Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The goal of investigation was to find a way to chain async operations avoiding the so-called "pyramid of doom" or "callback hell" caused by nesting the callbacks. In the most innocent version of two nested callbacks the code can look Let's consider a case in which we would like to adopt a specific pet, the flow would be something like the following:

...

find pet by id → vacate shelter place (delete pet by id in the shelter) → take the pet home (save the pet among adopted)

We also want to make sure that in case something goes wrong on the stage of adoption our pet does not end up on the street, so we want to secure it's place back in the shelter. Therefore, it is important to execute this operation in a transaction.

Applying the traditional approach of nesting the callbacks the implementation of the mentioned case would look like that:

Code Block
@Override
public void postPetsAdoptById(String id, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      pgClient.startTx(connection -> {
        try {
          Criteria idCrit = constructCriteria("'id'", id);
          pgClient.get(connection, HOMELESS_PETS_TABLE_NAME, Pet.class, new Criterion(idCrit), true, false, getReply -> {
            if (getReply.failed()) {
              asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
            } else if (getReply.result().getResults().isEmpty()) {
              asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase())));
            } else {
              List<Pet> pets = getReply.result().getResults();
              Pet petForAdoption = new Pet();
              petForAdoption.setGenus(pets.get(0).getGenus());
              petForAdoption.setQuantity(pets.get(0).getQuantity());
              try {
                pgClient.delete(connection, HOMELESS_PETS_TABLE_NAME, new Criterion(idCrit), deleteReply -> {
                  if (deleteReply.failed()) {
                    pgClient.rollbackTx(connection, rollback -> {
                      asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
                    });
                  } else if (deleteReply.result().getUpdated() == 0) {
                    asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase())));
                  } else {
                    try {
                      pgClient.save(connection, ADOPTED_PETS_TABLE_NAME, petForAdoption, postReply -> {
                        if (postReply.failed()) {
                          pgClient.rollbackTx(connection, rollback -> {
                            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
                          });
                        } else {
                          petForAdoption.setId(postReply.result());
                          pgClient.endTx(connection, done -> {
                            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond201WithApplicationJson(petForAdoption)));
                          });
                        }
                      });
                    } catch (Exception e) {
                      pgClient.rollbackTx(connection, rollback -> {
                        asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
                      });
                    }
                  }
                });
              } catch (Exception e) {
                asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
              }
            }
          });
        } catch (Exception e) {
          asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
        }
      });
    });
  } catch (Exception e) {
    asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

Even with excluded logging the code is hard to read and follow. Such code with growing complexity would result in an incomprehensive sequence of nested callbacks often called be the developers a 'pyramid of doom' or 'callback hell'. There are two approaches that can solve this issue - the usage of composed futures or rx-java.

Chained futures

Vertx futures support sequential composition allowing to chain async operations. More info on the subject can be found by the link. For example, the code above can be refactored as following:

Code Block
@Override
public void postPetsAdoptById(String id, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      Pet entity = new Pet();
      entity.setId(id);
      PgTransaction<Pet> pgTransaction = new PgTransaction<>(entity);
      Future.succeededFuture(pgTransaction)
        .compose(this::startTx)
        .compose(this::findPet)
        .compose(this::vacateShelterPlace)
        .compose(this::adoptPet)
        .compose(this::endTx)
        .setHandler(res -> {
          if (res.failed()) {
            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
          } else if (res.result().entity == null) {
            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond404WithTextPlain(Response.Status.NOT_FOUND.getReasonPhrase())));
          } else {
            asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond201WithApplicationJson(res.result().entity)));
          }
        });
    });
  } catch (Exception e) {
    asyncResultHandler.handle(Future.succeededFuture(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

private Future<PgTransaction<Pet>> startTx(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  pgClient.startTx(sqlConnection -> {
    tx.sqlConnection = sqlConnection;
    future.complete(tx);
  });
  return future;
}

private Future<PgTransaction<Pet>> findPet(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  try {
    Criteria idCrit = constructCriteria("'id'", tx.entity.getId());
    pgClient.get(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, Pet.class, new Criterion(idCrit), true, false, reply -> {
      if (reply.failed()) {
        future.fail(reply.cause());
      } else if (reply.result().getResults().isEmpty()) {
        tx.entity = null;
        future.complete(tx);
      } else {
        List<Pet> pets = reply.result().getResults();
        tx.entity.setGenus(pets.get(0).getGenus());
        tx.entity.setQuantity(pets.get(0).getQuantity());
        future.complete(tx);
      }
    });
  } catch (Exception e) {
    future.fail(e);
  }
  return future;
}

private Future<PgTransaction<Pet>> vacateShelterPlace(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  try {
    if (tx.entity != null) {
      Criteria idCrit = constructCriteria("'id'", tx.entity.getId());
      pgClient.delete(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, new Criterion(idCrit), reply -> {
        if (reply.succeeded()) {
          future.complete(tx);
        } else {
          pgClient.rollbackTx(tx.sqlConnection, res -> future.fail(reply.cause()));
        }
      });
    } else {
      future.complete(tx);
    }
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection, reply -> future.fail(e));
  }
  return future;
}

private Future<PgTransaction<Pet>> adoptPet(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  try {
    if (tx.entity != null) {
      Pet entity = new Pet();
      entity.setGenus(tx.entity.getGenus());
      entity.setQuantity(tx.entity.getQuantity());
      pgClient.get(save(tx.sqlConnection, ADOPTED_PETS_TABLE_NAME, Pet.class, fieldList, cql, true, false, reply ->entity, postReply -> {
        if (postReply.succeeded()) {
            if (reply.succeededentity.setId(postReply.result());
  {        tx.entity = entity;
      List<Pet>  petsList = replyfuture.result().getResults(complete(tx);
        } else {
          int totalRecords = petsList.size(pgClient.rollbackTx(tx.sqlConnection, reply -> future.fail(postReply.cause()));
         }
    PetsCollection petsCollection = new PetsCollection( });
    } else {
       petsCollectionfuture.setPetscomplete(petsListtx);
    }
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection, reply -> petsCollectionfuture.setTotalRecordsfail(totalRecordse));
  }
  return future;
}

private Future<PgTransaction<Pet>>    asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond200WithApplicationJson(petsCollection)));
endTx(PgTransaction<Pet> tx) {
  Future<PgTransaction<Pet>> future = Future.future();
  pgClient.endTx(tx.sqlConnection, v  }-> else {
    future.complete(tx);
  });
  return    asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
            }
          });
        } catch (Exception efuture;
}

In this way the code can be broken into separate methods, sequence of execution is easy to follow and it is easier to spot any problems. The sample project with this and other examples can be found on github - mod-sample-composable-future

RxJava2

RxJava allows to compose flows and sequences of asynchronous data. More information on how to use rx-java2 with vertx can be found by the link. Provided that new PostgresClient is created the refactored example might look something like the following:

Code Block
@Override
public void postPetsAdoptById(String id, Map<String, String> okapiHeaders, SingleObserver<Response> observer, Context vertxContext) {
  try {
    vertxContext.runOnContext(v  asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));-> {
      Pet entity }= new Pet();
    } catch (Exception e) {entity.setId(id);
      PgTransaction<Pet> pgTransaction = new asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()))PgTransaction<>(entity);
      }startTx(pgTransaction)
     });   } catch (Exception e) {.flatMap(this::findPet)
        asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

It might look OK so long as there are only two levels of nesting, but once the complexity of functionality grows, the flow will become unclear and hard to read. There are two approaches that can solve this issue - the usage of composed futures or rx-java.

Chained futures

Vertx futures support sequential composition allowing to chain async operations. More info on the subject can be found by the link. For example, the code above can be refactored as following:

Code Block
@Override
public void getPets(String query, int offset, int limit, String lang, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContextflatMap(this::vacateShelterPlace)
        .flatMap(this::adoptPet)
        .flatMap(this::endTx)
        .flatMap(this::constructAdoptResponse)
        .subscribe(observer);
    });
  } catch (Exception e) {
    observer.onSuccess(PostPetsAdoptByIdResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()));
  }
}

private Single<PgTransaction<Pet>> startTx(PgTransaction<Pet> tx) {
  try {
    vertxContexttx.runOnContext(v -> {
      PgQuery.PgQueryBuilder queryBuilder = new PgQuery.PgQueryBuilder(ALL_FIELDS, PETS_TABLE_NAME).query(query).offset(offset).limit(limitsqlConnection = pgClient.startTx().blockingGet();
    return Single.just(tx);
  } catch (Exception  Future.succeededFuture(queryBuilder)
        .compose(this::runGetQuery)e) {
    return Single.error(e);
  }
}

private Single<PgTransaction<Pet>> findPet(PgTransaction<Pet> tx) {
  try {
    Criteria idCrit = constructCriteria("'id'", tx.compose(this::parseGetResults)entity.getId());
    Results<Pet> results =  pgClient.setHandler(res -> {
     get(tx.sqlConnection, HOMELESS_PETS_TABLE_NAME, Pet.class, new Criterion(idCrit), true, false).blockingGet();
    if (results.getResults(res).succeededisEmpty()) {
      tx.entity      asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond200WithApplicationJson(res.result())))= null;
    } else {
   } else { List<Pet> pets   = results.getResults();
       asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())))tx.entity.setGenus(pets.get(0).getGenus());
      tx.entity.setQuantity(pets.get(0).getQuantity());
    }
        });
    }return Single.just(tx);
  } catch (Exception e) {
    return asyncResultHandlerSingle.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()))error(e);
  }
}

private Future<Results<Pet>>Single<PgTransaction<Pet>> runGetQueryvacateShelterPlace(PgQuery.PgQueryBuilder queryBuilder) {
  Future<Results<Pet>> future = Future.future();PgTransaction<Pet> tx) {
  try {
    PgQuery query = queryBuilder.build();if (tx.entity != null) {
       pgClient.get(query.getTable(), Pet.class, query.getFields(), query.getCql(), true, false, future.completerCriteria idCrit = constructCriteria("'id'", tx.entity.getId());
  } catch (Exception e) {UpdateResult result = pgClient.delete(tx.sqlConnection,  future.fail(eHOMELESS_PETS_TABLE_NAME, new Criterion(idCrit)).blockingGet();
  }     return future;
}

private Future<PetsCollection> parseGetResults(Results<Pet> resultSet) {
  List<Pet> petsList = resultSet.getResults();
  int totalRecords = petsList.size();
  PetsCollection petsCollection = new PetsCollection();
  petsCollection.setPets(petsList);
  petsCollection.setTotalRecords(totalRecords);
  return Future.succeededFuture(petsCollection);
}

In this way the code can be decomposed on the actual querying the db and parsing the results, the handler could also be defined in a separate method if it contained some additional logic.

The sample project can be found on github - mod-sample-composable-future

RxJava2

RxJava allows to compose flows and sequences of asynchronous data. More information on how to use rx-java2 with vertx can be found by the link. The refactored example would look something like the following:

Code Block
@Override
public void getPets(String query, int offset, int limit, String lang, Map<String, String> okapiHeaders, SingleObserver<Response> observer, Context vertxContext) {
  try {
    PgQuery.PgQueryBuilder queryBuilder = new PgQuery.PgQueryBuilder(ALL_FIELDS, PETS_TABLE_NAME).query(query).offset(offset).limit(limit);
    runGetQuery(queryBuilder)
      .flatMap(this::constructGetResponse)
      .subscribe(observer);
  } catch (Exception e) {
    observer.onSuccess(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()));
  }
}

private Single<Results<Pet>> runGetQuery(PgQuery.PgQueryBuilder queryBuilder) {
  try {
    PgQuery query = queryBuilder.build();
    return pgClient.get(query.getTable(), Pet.class, query.getFields(), query.getCql(), true, false);
  } catch (Exception e) {if (result.getUpdated() == 0) {
        tx.entity = null;
      }
    }
    return Single.just(tx);
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection);
    return Single.error(e);
  }
}

private Single<PgTransaction<Pet>> adoptPet(PgTransaction<Pet> tx) {
  try {
    if (tx.entity != null) {
      Pet entity = new Pet();
      entity.setGenus(tx.entity.getGenus());
      entity.setQuantity(tx.entity.getQuantity());
      Results<Pet> results = pgClient.save(tx.sqlConnection, ADOPTED_PETS_TABLE_NAME, entity).blockingGet();
      if (!results.getResults().isEmpty()) {
        entity.setId(results.getResults().get(0).getId());
        tx.entity = entity;
        return Single.just(tx);
      } else {
        pgClient.rollbackTx(tx.sqlConnection);
        return Single.justerror(new Results<>(RuntimeException("Couldn't adopt pet"));
      }
}   private Single<Response>}
constructGetResponse(Results<Pet> results) {  return if(resultsSingle.getResultsjust(tx) != null;
  } catch (Exception e) {
    pgClient.rollbackTx(tx.sqlConnection);
  List<Pet> petsList =return resultsSingle.getResultserror(e);
  }
}
int
totalRecordsprivate =Single<PgTransaction<Pet>> petsList.size();
endTx(PgTransaction<Pet> tx) {
   PetsCollection petsCollection = new PetsCollection(pgClient.endTx(tx.sqlConnection);
  return Single.just(tx);
}

private Single<Response> petsCollection.setPets(petsList);
constructAdoptResponse(PgTransaction<Pet> tx) {
  if petsCollection.setTotalRecords(totalRecords);(tx.entity != null) {
    return Single.just(GetPetsResponsePostPetsAdoptByIdResponse.respond200WithApplicationJsonrespond201WithApplicationJson(petsCollectiontx.entity));
  }
  return Single.just(GetPetsResponsePostPetsAdoptByIdResponse.respond500WithTextPlainrespond404WithTextPlain(Response.Status.INTERNALNOT_SERVER_ERRORFOUND.getReasonPhrase()));
}


The approach is basically the same similar - decomposition on separate methods in order to improve code readability. The sample module can be found on github - mod-sample-rx.

...

Both approaches allow to decompose the code and make it more readable. However, using rx-java would require significant changes in raml-module-builder. Therefore, using of composed futures might be the "go-to" solution for the time being while rx-java might be further investigated in case of discovering some additional benefits of using it.

sequential_composition

...