Chained futures and rx-java overview

MODDATAIMP-19 - Getting issue details... STATUS

MODLOGIN-73 - Getting issue details... STATUS


The goal of investigation was to find a way to chain async operations avoiding nesting the callbacks. For example, if we consider a case in which we would like to adopt a specific pet, the flow would be something like that:

find pet by id → vacate shelter place (delete pet by id in the shelter) → take the pet home (save the pet among adopted)

We also want to make sure that in case something goes wrong on the stage of adoption our pet does not end up on the street, so we want to secure it's place back in the shelter. Therefore, it is important to execute this operation in a transaction.

Applying the traditional approach of nesting the callbacks the implementation of the mentioned case would look like that:

@Override
public void postPetsAdoptById(String id, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      pgClient.startTx(connection -> {
        try {
          Criteria idCrit = constructCriteria("'id'", id);
          pgClient.get(connection, HOMELESS_PETS_TABLE_NAME, Pet.class, new Criterion(idCrit), true, false, getReply -> {
            if (getReply.failed()) {
              asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
            } else if (getReply.result().getResults().isEmpty()) {
              asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase())));
            } else {
              List<Pet> pets = getReply.result().getResults();
              Pet petForAdoption = new Pet();
              petForAdoption.setGenus(pets.get(0).getGenus());
              petForAdoption.setQuantity(pets.get(0).getQuantity());
              try {
                pgClient.delete(connection, HOMELESS_PETS_TABLE_NAME, new Criterion(idCrit), deleteReply -> {
                  if (deleteReply.failed()) {
                    pgClient.rollbackTx(connection, rollback -> {
                      asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
                    });
                  } else if (deleteReply.result().getUpdated() == 0) {
                    asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase())));
                  } else {
                    try {
                      pgClient.save(connection, ADOPTED_PETS_TABLE_NAME, petForAdoption, postReply -> {
                        if (postReply.failed()) {
                          pgClient.rollbackTx(connection, rollback -> {
                            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
                          });
                        } else {
                          petForAdoption.setId(postReply.result());
                          pgClient.endTx(connection, done -> {
                            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond201WithApplicationJson(petForAdoption)));
                          });
                        }
                      });
                    } catch (Exception e) {
                      pgClient.rollbackTx(connection, rollback -> {
                        asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
                      });
                    }
                  }
                });
              } catch (Exception e) {
                asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
              }
            }
          });
        } catch (Exception e) {
          asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
        }
      });
    });
  } catch (Exception e) {
    asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

Even with excluded logging the code is hard to read and follow. Such code with growing complexity would result in an incomprehensive sequence of nested callbacks often called be the developers a 'pyramid of doom' or 'callback hell'. There are two approaches that can solve this issue - the usage of composed futures or rx-java.

Chained futures

Vertx futures support sequential composition allowing to chain async operations. More info on the subject can be found by the link. For example, the code above can be refactored as following:

@Override
public void postPetsAdoptById(String id, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      Pet entity = new Pet();
      entity.setId(id);
      PgTransaction<Pet> pgTransaction = new PgTransaction<>(entity);
      Future.succeededFuture(pgTransaction)
        .compose(this::startTx)
        .compose(this::findPet)
        .compose(this::vacateShelterPlace)
        .compose(this::adoptPet)
        .compose(this::endTx)
        .setHandler(res -> {
          if (res.failed()) {
            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
          } else if (res.result().entity == null) {
            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase())));
          } else {
            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond201WithApplicationJson(res.result().entity)));
          }
        });
    });
  } catch (Exception e) {
    asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

private Future<PgTransaction<Pet>> startTx(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  pgClient.startTx(sqlConnection -> {
    tx.sqlConnection = sqlConnection;
    future.complete(tx);
  });
  return future;
}

private Future<PgTransaction<Pet>> findPet(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  try {
    Criteria idCrit = constructCriteria("'id'", tx.entity.getId());
    pgClient.get(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, Pet.class, new Criterion(idCrit), true, false, reply -> {
      if (reply.failed()) {
        future.fail(reply.cause());
      } else if (reply.result().getResults().isEmpty()) {
        tx.entity = null;
        future.complete(tx);
      } else {
        List<Pet> pets = reply.result().getResults();
        tx.entity.setGenus(pets.get(0).getGenus());
        tx.entity.setQuantity(pets.get(0).getQuantity());
        future.complete(tx);
      }
    });
  } catch (Exception e) {
    future.fail(e);
  }
  return future;
}

private Future<PgTransaction<Pet>> vacateShelterPlace(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  try {
    if (tx.entity != null) {
      Criteria idCrit = constructCriteria("'id'", tx.entity.getId());
      pgClient.delete(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, new Criterion(idCrit), reply -> {
        if (reply.succeeded()) {
          future.complete(tx);
        } else {
          pgClient.rollbackTx(tx.sqlConnection, res -> future.fail(reply.cause()));
        }
      });
    } else {
      future.complete(tx);
    }
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection, reply -> future.fail(e));
  }
  return future;
}

private Future<PgTransaction<Pet>> adoptPet(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  try {
    if (tx.entity != null) {
      Pet entity = new Pet();
      entity.setGenus(tx.entity.getGenus());
      entity.setQuantity(tx.entity.getQuantity());
      pgClient.save(tx.sqlConnection, ADOPTED_PETS_TABLE_NAME, entity, postReply -> {
        if (postReply.succeeded()) {
          entity.setId(postReply.result());
          tx.entity = entity;
          future.complete(tx);
        } else {
          pgClient.rollbackTx(tx.sqlConnection, reply -> future.fail(postReply.cause()));
        }
      });
    } else {
      future.complete(tx);
    }
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection, reply -> future.fail(e));
  }
  return future;
}

private Future<PgTransaction<Pet>> endTx(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  pgClient.endTx(tx.sqlConnection, v -> {
    future.complete(tx);
  });
  return future;
}

In this way the code can be broken into separate methods, sequence of execution is easy to follow and it is easier to spot any problems. The sample project with this and other examples can be found on github - mod-sample-composable-future

RxJava2

RxJava allows to compose flows and sequences of asynchronous data. More information on how to use rx-java2 with vertx can be found by the link. Provided that new PostgresClient is created the refactored example might look something like the following:

@Override
public void postPetsAdoptById(String id, Map<String, String> okapiHeaders, SingleObserver<Response> observer, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      Pet entity = new Pet();
      entity.setId(id);
      PgTransaction<Pet> pgTransaction = new PgTransaction<>(entity);
      startTx(pgTransaction)
        .flatMap(this::findPet)
        .flatMap(this::vacateShelterPlace)
        .flatMap(this::adoptPet)
        .flatMap(this::endTx)
        .flatMap(this::constructAdoptResponse)
        .subscribe(observer);
    });
  } catch (Exception e) {
    observer.onSuccess(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()));
  }
}

private Single<PgTransaction<Pet>> startTx(PgTransaction<Pet> tx) {
  try {
    tx.sqlConnection = pgClient.startTx().blockingGet();
    return Single.just(tx);
  } catch (Exception e) {
    return Single.error(e);
  }
}

private Single<PgTransaction<Pet>> findPet(PgTransaction<Pet> tx) {
  try {
    Criteria idCrit = constructCriteria("'id'", tx.entity.getId());
    Results<Pet> results = pgClient.get(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, Pet.class, new Criterion(idCrit), true, false).blockingGet();
    if (results.getResults().isEmpty()) {
      tx.entity = null;
    } else {
      List<Pet> pets = results.getResults();
      tx.entity.setGenus(pets.get(0).getGenus());
      tx.entity.setQuantity(pets.get(0).getQuantity());
    }
    return Single.just(tx);
  } catch (Exception e) {
    return Single.error(e);
  }
}

private Single<PgTransaction<Pet>> vacateShelterPlace(PgTransaction<Pet> tx) {
  try {
    if (tx.entity != null) {
      Criteria idCrit = constructCriteria("'id'", tx.entity.getId());
      UpdateResult result = pgClient.delete(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, new Criterion(idCrit)).blockingGet();
      if (result.getUpdated() == 0) {
        tx.entity = null;
      }
    }
    return Single.just(tx);
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection);
    return Single.error(e);
  }
}

private Single<PgTransaction<Pet>> adoptPet(PgTransaction<Pet> tx) {
  try {
    if (tx.entity != null) {
      Pet entity = new Pet();
      entity.setGenus(tx.entity.getGenus());
      entity.setQuantity(tx.entity.getQuantity());
      Results<Pet> results = pgClient.save(tx.sqlConnection, ADOPTED_PETS_TABLE_NAME, entity).blockingGet();
      if (!results.getResults().isEmpty()) {
        entity.setId(results.getResults().get(0).getId());
        tx.entity = entity;
        return Single.just(tx);
      } else {
        pgClient.rollbackTx(tx.sqlConnection);
        return Single.error(new RuntimeException("Couldn't adopt pet"));
      }
    }
    return Single.just(tx);
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection);
    return Single.error(e);
  }
}

private Single<PgTransaction<Pet>> endTx(PgTransaction<Pet> tx) {
  pgClient.endTx(tx.sqlConnection);
  return Single.just(tx);
}

private Single<Response> constructAdoptResponse(PgTransaction<Pet> tx) {
  if (tx.entity != null) {
    return Single.just(PostPetsAdoptByIdResponse.respond201WithApplicationJson(tx.entity));
  }
  return Single.just(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase()));
}


The approach is similar - decomposition on separate methods in order to improve code readability. The sample module can be found on github - mod-sample-rx.

Conclusion

Both approaches allow to decompose the code and make it more readable. However, using rx-java would require significant changes in raml-module-builder. Therefore, using of composed futures might be the "go-to" solution for the time being while rx-java might be further investigated.

sequential_composition

vert_x_api_for_rxjava2

accessing-data-reactive-way

vertx-reactive-extensions