Current approach for working with transactions in RMB
Database access on the FOLIO project is implemented as a custom solution on top of the Vert.x Postgres client. The defining feature of working with RMB and Vert.x is the asynchronous approach. Sequential execution of operations requires verifying that each operation has finished and that all errors have been handled: only if the first operation succeeded can the subsequent operation be executed. As business logic develops, the need arises for transactionality, i.e. rolling back all operations if any one of them fails. This is necessary to maintain data consistency. At the moment, this capability is implemented as follows:
- *The database connection object is created and the SQL command “BEGIN” is executed
- The connection object is passed as a parameter to the Postgres client's methods and, accordingly, all commands are executed within a single connection
- Handle all errors and verify that each Future succeeded
- If an error occurs, you must explicitly call rollback
- At the end of a transaction, you must explicitly call the endTransaction() method
- *After the end of the transaction - execute the SQL command “COMMIT”
RMB's PostgresClient performs the first and last operations (marked with *) automatically.
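The BEGIN → operations → COMMIT/ROLLBACK decision flow above can be sketched outside of RMB with plain Java: a fake "transaction" records which commands ran, and a failure in either operation leads to a rollback instead of a commit. All names here (TxSketch, op1, op2) are illustrative only, not RMB API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a fake "transaction" that records commands,
// mirroring the BEGIN -> operations -> COMMIT/ROLLBACK sequence above.
class TxSketch {
    final List<String> log = new ArrayList<>();

    void begin()    { log.add("BEGIN"); }    // done automatically by startTx in RMB
    void commit()   { log.add("COMMIT"); }   // done automatically by endTx in RMB
    void rollback() { log.add("ROLLBACK"); }

    /** Runs two operations; commits only if both succeed, otherwise rolls back. */
    static List<String> run(boolean firstOk, boolean secondOk) {
        TxSketch tx = new TxSketch();
        tx.begin();
        if (firstOk) {
            tx.log.add("op1");
            if (secondOk) {
                tx.log.add("op2");
                tx.commit();
            } else {
                tx.rollback();    // second operation failed
            }
        } else {
            tx.rollback();        // first operation failed, second never runs
        }
        return tx.log;
    }
}
```

Note how the second operation is reachable only from the success branch of the first — exactly the nesting the asynchronous handlers below have to express.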
Example method with two operations in the scope of one transaction:
```java
public Future<Void> example() {
  Future<Void> future = Future.future();
  PostgresClient client = PostgresClient.getInstance(vertx, tenantId);
  // start tx
  client.startTx(tx -> {
    // first operation
    client.get(tx, "upload_definition", UploadDefinition.class, new Criterion(), true, false, getHandler -> {
      if (getHandler.succeeded()) {
        // second operation
        client.save(tx, "upload_definition", UUID.randomUUID().toString(), getHandler.result(), saveHandler -> {
          if (saveHandler.succeeded()) {
            client.endTx(tx, endHandler -> {
              if (endHandler.succeeded()) {
                future.complete();
              } else {
                client.rollbackTx(tx, rollbackHandler -> future.fail(endHandler.cause()));
              }
            });
          } else {
            client.rollbackTx(tx, rollbackHandler -> future.fail(saveHandler.cause()));
          }
        });
      } else {
        client.rollbackTx(tx, rollbackHandler -> future.fail(getHandler.cause()));
      }
    });
  });
  return future;
}
```
Locking records in the database
When developing slightly more complex business logic, a difficulty arises: certain operations may take some time, and in the meantime another such request may need to be processed. Without locking the record in the database there is a high probability of "lost changes", where the second request overwrites the changes made by the first. But since Vert.x is asynchronous, in-process locks and synchronous code execution are unacceptable, and there is no Persistence Context. The most obvious option is to lock the record in the database itself using the "SELECT ... FOR UPDATE" statement. Accordingly, to perform a safe update of a record in the database, you must:
- Create Transaction Object
- Select data from database for update (using “SELECT FOR UPDATE”)
- Do some kind of business logic operation
- Update entity in database
- Complete transaction
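A minimal illustration of why the lock matters: two concurrent read-modify-write sequences on a shared value. Holding a lock around the whole read-modify-write (the role "SELECT ... FOR UPDATE" plays inside the database) ensures no update is lost. Plain Java threads stand in for concurrent requests here; nothing in this sketch is RMB or Vert.x API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch: a lock around the read-modify-write cycle prevents "lost changes",
// playing the role that SELECT ... FOR UPDATE plays inside the database.
class LockedRecord {
    private int value = 0;
    private final ReentrantLock rowLock = new ReentrantLock();

    // Analogue of the transaction steps: lock, select, mutate, update, release.
    void safeIncrement() {
        rowLock.lock();               // SELECT ... FOR UPDATE takes the row lock
        try {
            int read = value;         // select data
            value = read + 1;         // business logic + update
        } finally {
            rowLock.unlock();         // completing the transaction releases the lock
        }
    }

    int get() { return value; }

    /** Runs several "requests" concurrently; with the lock, no increment is lost. */
    static int runConcurrently(int threads, int perThread) {
        LockedRecord record = new LockedRecord();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) record.safeIncrement();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return record.get();
    }
}
```

Without the lock, two requests could both read the same starting value and one increment would be silently overwritten — the "lost changes" described above.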
This kind of scenario was needed in mod-data-import for the file upload functionality.
Steps for file upload example:
- Load Upload Definition from database
- Check for FileDefinition
- Save file to the local storage
- Update Upload Definition status
- Save changed Upload Definition entity into the database
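The status update in step 4 follows a simple rule, visible in the uploadFile method further below: the Upload Definition becomes LOADED only when every file definition has been loaded, otherwise it stays IN_PROGRESS. A standalone sketch of just that rule (the names StatusSketch and statusFor are illustrative):

```java
import java.util.List;

// Sketch of the status rule: LOADED only when every file has been loaded.
class StatusSketch {
    enum Status { LOADED, IN_PROGRESS }

    static Status statusFor(List<Boolean> filesLoaded) {
        return filesLoaded.stream().allMatch(Boolean::booleanValue)
            ? Status.LOADED
            : Status.IN_PROGRESS;
    }
}
```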
Upload Definition mutator interface for callback usage
```java
/**
 * Functional interface for changing an UploadDefinition in a blocking update statement
 */
@FunctionalInterface
public interface UploadDefinitionMutator {
  /**
   * @param definition - UploadDefinition loaded from the DB
   * @return - changed UploadDefinition ready to be saved into the database
   */
  Future<UploadDefinition> mutate(UploadDefinition definition);
}
```
Method for updating an entity with record locking
```java
public Future<UploadDefinition> updateBlocking(String uploadDefinitionId, UploadDefinitionMutator mutator) {
  Future<UploadDefinition> future = Future.future();
  String rollbackMessage = "Rollback transaction. Error during upload definition update. uploadDefinitionId: " + uploadDefinitionId;
  pgClient.startTx(tx -> {
    try {
      StringBuilder selectUploadDefinitionQuery = new StringBuilder("SELECT jsonb FROM ")
        .append(schema)
        .append(".")
        .append(UPLOAD_DEFINITION_TABLE)
        .append(" WHERE _id ='")
        .append(uploadDefinitionId)
        .append("' LIMIT 1 FOR UPDATE;");
      pgClient.execute(tx, selectUploadDefinitionQuery.toString(), selectResult -> {
        if (selectResult.failed() || selectResult.result().getUpdated() != 1) {
          pgClient.rollbackTx(tx, r -> {
            logger.error(rollbackMessage, selectResult.cause());
            future.fail(new NotFoundException(rollbackMessage));
          });
        } else {
          Criteria idCrit = new Criteria();
          idCrit.addField(UPLOAD_DEFINITION_ID_FIELD);
          idCrit.setOperation("=");
          idCrit.setValue(uploadDefinitionId);
          pgClient.get(tx, UPLOAD_DEFINITION_TABLE, UploadDefinition.class, new Criterion(idCrit), false, true, uploadDefResult -> {
            if (uploadDefResult.failed()
              || uploadDefResult.result() == null
              || uploadDefResult.result().getResultInfo() == null
              || uploadDefResult.result().getResultInfo().getTotalRecords() < 1) {
              pgClient.rollbackTx(tx, r -> {
                logger.error(rollbackMessage);
                future.fail(new NotFoundException(rollbackMessage));
              });
            } else {
              try {
                UploadDefinition definition = uploadDefResult.result().getResults().get(0);
                mutator.mutate(definition)
                  .setHandler(onMutate -> {
                    if (onMutate.succeeded()) {
                      try {
                        CQLWrapper filter = new CQLWrapper(new CQL2PgJSON(UPLOAD_DEFINITION_TABLE + ".jsonb"), "id==" + definition.getId());
                        pgClient.update(tx, UPLOAD_DEFINITION_TABLE, onMutate.result(), filter, true, updateHandler -> {
                          if (updateHandler.succeeded() && updateHandler.result().getUpdated() == 1) {
                            pgClient.endTx(tx, endTx -> {
                              if (endTx.succeeded()) {
                                future.complete(definition);
                              } else {
                                logger.error(rollbackMessage);
                                future.fail("Error during updating UploadDefinition with id: " + uploadDefinitionId);
                              }
                            });
                          } else {
                            pgClient.rollbackTx(tx, r -> {
                              logger.error(rollbackMessage, updateHandler.cause());
                              future.fail(updateHandler.cause());
                            });
                          }
                        });
                      } catch (Exception e) {
                        pgClient.rollbackTx(tx, r -> {
                          logger.error(rollbackMessage, e);
                          future.fail(e);
                        });
                      }
                    } else {
                      pgClient.rollbackTx(tx, r -> {
                        logger.error(rollbackMessage, onMutate.cause());
                        future.fail(onMutate.cause());
                      });
                    }
                  });
              } catch (Exception e) {
                pgClient.rollbackTx(tx, r -> {
                  logger.error(rollbackMessage, e);
                  future.fail(e);
                });
              }
            }
          });
        }
      });
    } catch (Exception e) {
      pgClient.rollbackTx(tx, r -> {
        logger.error(rollbackMessage, e);
        future.fail(e);
      });
    }
  });
  return future;
}
```
Method with file upload logic
```java
@Override
public Future<UploadDefinition> uploadFile(String fileId, String uploadDefinitionId, InputStream data, OkapiConnectionParams params) {
  return uploadDefinitionService.updateBlocking(uploadDefinitionId, uploadDefinition -> {
    Future<UploadDefinition> future = Future.future();
    Optional<FileDefinition> optionalFileDefinition = uploadDefinition.getFileDefinitions().stream()
      .filter(fileFilter -> fileFilter.getId().equals(fileId))
      .findFirst();
    if (optionalFileDefinition.isPresent()) {
      FileDefinition fileDefinition = optionalFileDefinition.get();
      FileStorageServiceBuilder
        .build(vertx, tenantId, params)
        .map(service -> service.saveFile(data, fileDefinition, params)
          .setHandler(onFileSave -> {
            if (onFileSave.succeeded()) {
              uploadDefinition.setFileDefinitions(replaceFile(uploadDefinition.getFileDefinitions(), onFileSave.result()));
              uploadDefinition.setStatus(uploadDefinition.getFileDefinitions().stream().allMatch(FileDefinition::getLoaded)
                ? UploadDefinition.Status.LOADED
                : UploadDefinition.Status.IN_PROGRESS);
              future.complete(uploadDefinition);
            } else {
              future.fail("Error during file save");
            }
          }));
    } else {
      future.fail("FileDefinition not found. FileDefinition ID: " + fileId);
    }
    return future;
  });
}
```
Problems of the current approach
When using the asynchronous approach to work with a database, there are a number of limitations and difficulties in writing sequential or transactional business logic.
- The inability to simultaneously run multiple operations within a single connection
- A transaction lives within one connection, so all subsequent actions must be performed inside handlers, with every error processed manually. The result is callback hell
- There is no support for any kind of persistence context; all locks must be taken manually in the database with separate requests
- There is no possibility to easily manage the transaction isolation level at the application level
- Since all objects are stored in the database as JSON, there is no way to be sure the data is stored correctly: at the moment any JSON can be saved into any table
- Because objects are stored as JSON, it is not possible to build reliable relationships between entities
- It is necessary either to store all the links in one big JSON object in one table, or to put them into other tables with keys that do not guarantee referential integrity with the other entity; loading data by those keys and inserting it into the parent object then takes several requests
- RMB does not have all the methods needed for working with transactions and queries
- RMB has no variant of the get() method that runs a custom SQL script with transaction support
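The callback-hell point can be seen by contrast: with composable futures, the same sequential logic flattens into a chain where each step starts only after the previous one succeeded. This is a JDK analogy using java.util.concurrent.CompletableFuture, not RMB or Vert.x code; the names ChainSketch and op are illustrative.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: sequential "operations" chained flat with thenCompose,
// instead of nesting each step inside the previous handler.
class ChainSketch {
    // Stands in for one asynchronous database operation.
    static CompletableFuture<String> op(String in, String step) {
        return CompletableFuture.completedFuture(in + step);
    }

    static String runChain() {
        return op("", "begin;")
            .thenCompose(s -> op(s, "get;"))      // runs only after "begin" succeeded
            .thenCompose(s -> op(s, "save;"))     // runs only after "get" succeeded
            .thenCompose(s -> op(s, "commit;"))   // runs only after "save" succeeded
            .join();
    }
}
```

A failure anywhere in the chain skips the remaining steps and propagates to a single error handler, instead of requiring a rollback branch inside every nested callback.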
Possible solutions
- Refactor RMB's PostgresClient and add new methods for updating with blocking, for transactions, and for loading records with a lock on the record itself in the scope of a transaction
- Investigate the possibility of using other database tools. Conduct a study of the compatibility of the Vert.x framework with existing database solutions, perhaps even synchronous ones