Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

To solve the described challenge, the Transactional Outbox approach is proposed. With this pattern, the module providing a domain event will obtain a guarantee that the domain event has been recorded in the event queue before committing its transaction. This will ensure data integrity and properly transfers the responsibility for data integrity to the event queue.

Additional links:

...

2. Create DAO for CRUD operation with outbox_event_log table - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/audit/AuditOutboxEventsLogRepository.java

3. Create a new table internal_lock to be used as lock (. This is necessary to avoid (minimize) the possibility of sending duplicates to Kafka and to preserve order to allow execution only from single instance, locking just of particular rows in outbox_event_log does not guarantee order, so we need this separate table internal_lock for locking)the original order of events.

Code Block
CREATE TABLE IF NOT EXISTS internal_lock (
  lock_name text NOT NULL PRIMARY KEY
);
INSERT INTO internal_lock(lock_name) VALUES ('audit_outbox') ON CONFLICT DO NOTHING;

4. Create a repository to work with the internal_lock table. This table uses SELECT FOR UPDATE construction for locking - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/InternalLockRepository.java

5. Leverage folio-kafka-wrapper and create Kafka producer to send for sending audit events - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditEventProducer.java

6. Create AuditOutboxService to fetch data from outbox_event_log and send events to Kafka - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditOutboxService.java#L56. This implementation does the following:

  • starts new transaction,
  • acquires DB lock using internal_lock table,
  • grabs outbox objects,
  • constructs Kafka events from these objects and send sends them to Kafka,
  • deletes processed outbox events in batch by event ids,
  • commits transaction,
  • releases DB lock.

7. Use RMB .withTrans(..) implementation that automatically commits or rollbacks transaction if future was failed fails - https://github.com/folio-org/raml-module-builder/blob/b35.0/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java#L3754

...

  • starts new transaction,
  • creates or edits business entity,
  • saves audit outbox event with full entity snapshot,
  • commits transaction,
  • subscribe subscribes to .onCompleted of .withTrans(..) and after transaction finishes, invokes processing of audit outbox logs, see #6 step.

Examples of usages:
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/PurchaseOrdersAPI.java#L80 
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/services/lines/PoLinesService.java#L107

8. Add a timer interface to invoke each 30 mins and process existed that will be called every 30 minutes and handle existing outbox audit events. Timer interface was added Adding the timer interface is needed to be on the safe side during failures, for example such as when a transaction is committed , but the module was destroyed during the call to .onComplete(..) invocation and there is no any other edits events from the user to pick up receive all remaining audit events after the module restartis restarted. In this case, the timer invocation call will process the remaining events that have not sent yet eventsbeen handled. This timer interface invokes calls the AuditOutboxService, see step #6 step.

https://github.com/folio-org/mod-orders-storage/blob/master/descriptors/ModuleDescriptor-template.json#L544
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java

...