Skip to end of banner
Go to start of banner

Chained futures and rx-java overview

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

MODDATAIMP-19 - Getting issue details... STATUS

MODLOGIN-73 - Getting issue details... STATUS


The goal of investigation was to find a way to chain async operations avoiding the so-called "pyramid of doom" or "callback hell" caused by nesting the callbacks. In the most innocent version of two nested callbacks the code can look something like the following:

@Override
public void getPets(String query, int offset, int limit, String lang, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      try {
        String[] fieldList = {"*"};
        CQLWrapper cql = new CQLWrapper(new CQL2PgJSON(PETS_TABLE_NAME + ".jsonb"), query)
          .setLimit(new Limit(limit))
          .setOffset(new Offset(offset));
        try {
          pgClient.get(PETS_TABLE_NAME, Pet.class, fieldList, cql, true, false, reply -> {
            if (reply.succeeded()) {
              List<Pet> petsList = reply.result().getResults();
              int totalRecords = petsList.size();
              PetsCollection petsCollection = new PetsCollection();
              petsCollection.setPets(petsList);
              petsCollection.setTotalRecords(totalRecords);
              asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond200WithApplicationJson(petsCollection)));
            } else {
              asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
            }
          });
        } catch (Exception e) {
          asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
        }
      } catch (Exception e) {
        asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
      }
    });
  } catch (Exception e) {
    asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

It might look OK so long as there are only two levels of nesting, but once the complexity of functionality grows, the flow will become unclear and hard to read. There are two approaches that can solve this issue - the usage of composed futures or rx-java.

Chained futures

Vertx futures support sequential composition allowing to chain async operations. More info on the subject can be found by the link. For example, the code above can be refactored as following:

@Override
public void getPets(String query, int offset, int limit, String lang, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
  try {
    vertxContext.runOnContext(v -> {
      PgQuery.PgQueryBuilder queryBuilder = new PgQuery.PgQueryBuilder(ALL_FIELDS, PETS_TABLE_NAME).query(query).offset(offset).limit(limit);
      Future.succeededFuture(queryBuilder)
        .compose(this::runGetQuery)
        .compose(this::parseGetResults)
        .setHandler(res -> {
          if (res.succeeded()) {
            asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond200WithApplicationJson(res.result())));
          } else {
            asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
          }
        });
    });
  } catch (Exception e) {
    asyncResultHandler.handle(Future.succeededFuture(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())));
  }
}

private Future<Results<Pet>> runGetQuery(PgQuery.PgQueryBuilder queryBuilder) {
  Future<Results<Pet>> future = Future.future();
  try {
    PgQuery query = queryBuilder.build();
    pgClient.get(query.getTable(), Pet.class, query.getFields(), query.getCql(), true, false, future.completer());
  } catch (Exception e) {
    future.fail(e);
  }
  return future;
}

private Future<PetsCollection> parseGetResults(Results<Pet> resultSet) {
  List<Pet> petsList = resultSet.getResults();
  int totalRecords = petsList.size();
  PetsCollection petsCollection = new PetsCollection();
  petsCollection.setPets(petsList);
  petsCollection.setTotalRecords(totalRecords);
  return Future.succeededFuture(petsCollection);
}

In this way the code can be decomposed on the actual querying the db and parsing the results, the handler could also be defined in a separate method if it contained some additional logic.

The sample project can be found on github - mod-sample-composable-future

RxJava2

RxJava allows to compose flows and sequences of asynchronous data. More information on how to use rx-java2 with vertx can be found by the link. The refactored example would look something like the following:

@Override
public void getPets(String query, int offset, int limit, String lang, Map<String, String> okapiHeaders, SingleObserver<Response> observer, Context vertxContext) {
  try {
    PgQuery.PgQueryBuilder queryBuilder = new PgQuery.PgQueryBuilder(ALL_FIELDS, PETS_TABLE_NAME).query(query).offset(offset).limit(limit);
    runGetQuery(queryBuilder)
      .flatMap(this::constructGetResponse)
      .subscribe(observer);
  } catch (Exception e) {
    observer.onSuccess(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()));
  }
}

private Single<Results<Pet>> runGetQuery(PgQuery.PgQueryBuilder queryBuilder) {
  try {
    PgQuery query = queryBuilder.build();
    return pgClient.get(query.getTable(), Pet.class, query.getFields(), query.getCql(), true, false);
  } catch (Exception e) {
    return Single.just(new Results<>());
  }
}

private Single<Response> constructGetResponse(Results<Pet> results) {
  if(results.getResults() != null) {
    List<Pet> petsList = results.getResults();
    int totalRecords = petsList.size();
    PetsCollection petsCollection = new PetsCollection();
    petsCollection.setPets(petsList);
    petsCollection.setTotalRecords(totalRecords);
    return Single.just(GetPetsResponse.respond200WithApplicationJson(petsCollection));
  }
  return Single.just(GetPetsResponse.respond500WithTextPlain(Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()));
}


The approach is basically the same - decomposition on separate methods in order to improve code readability. The sample module can be found on github - mod-sample-rx.

Conclusion

Both approaches allow to decompose the code and make it more readable. However, using rx-java would require significant changes in raml-module-builder. Therefore, using of composed futures might be the "go-to" solution while rx-java might be further investigated in case of discovering some additional benefits of using it.

sequential_composition

vert_x_api_for_rxjava2

accessing-data-reactive-way

vertx-reactive-extensions


  • No labels