Context
Some FOLIO architectural proposals (in particular, FOLIO Cross-Application Data Sync Solution, Acquisition Event Log) are based on the Domain Event approach, in which every change to data in one part of the platform generates and publishes a corresponding event. The FOLIO Tech Council has approved this approach in general.
However, one challenge of the Domain Event approach is ensuring the atomicity of two actions that are different in nature: changing an object in the database and publishing an event about that change to the notification channel.
To solve the described challenge, the Transactional Outbox approach is proposed.
Additional links:
- https://microservices.io/patterns/data/transactional-outbox.html
- https://medium.com/engineering-varo/event-driven-architecture-and-the-outbox-pattern-569e6fba7216
This document is an attempt to describe the vision for a concrete implementation of this approach on FOLIO using the Acquisition Event Log as an example.
Implementation Detail
- Add the Timer Interface https://github.com/folio-org/okapi/blob/master/doc/guide.md#timer-interface to mod-orders-storage. This is needed so that Okapi calls the relevant logic in mod-orders-storage at a given frequency. Usage examples can be found in mod-users, mod-licenses, mod-data-export, and mod-circulation-storage.
- Implement a listener in mod-orders-storage for this Timer API call from Okapi.
- Create an internal_lock table with one row to coordinate concurrent threads through row locking.
- When a call comes from Okapi, run the "do whatever need to do to process the outbox table" logic: execute a select for update query on internal_lock. If it fails, do nothing. If it succeeds, continue with the main logic of reading events from the outbox_table and sending them to Kafka, and repeat until outbox_table has no more events. On completion, commit or roll back the transaction on internal_lock to release the lock.
The sequence of these steps is shown in the diagram below.
The above sequence implements the required transactional outbox functionality, but it has some disadvantages. In particular:
- it requires a regular call to the _timer API from Okapi (moreover, the faster the information must pass through the transactional outbox and reach Kafka, the more frequent the API calls must be, which increases network load at the API level),
- when processing each call, the module makes at least 3 calls to the database (to acquire the lock, poll the outbox table, and release the lock), which again increases the load on the network and on the database.
To mitigate the described shortcomings, the following approach is proposed: upon successful insertion of a new event into the outbox_table, perform an asynchronous call to the "do whatever need to do to process the outbox table" logic.
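The lock-guarded processing described in this section can be sketched in plain Java. This is a minimal in-memory illustration, not the mod-orders-storage code: the class and method names are hypothetical, an AtomicBoolean stands in for the single internal_lock row (a failed compareAndSet plays the role of a select for update that cannot obtain the lock, assuming the query fails rather than waits), a queue stands in for outbox_table, and a Consumer stands in for the Kafka producer.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the lock-guarded outbox processing.
class OutboxPollingSketch {
  // Plays the role of the single row in internal_lock.
  private final AtomicBoolean internalLock = new AtomicBoolean(false);
  // Plays the role of outbox_table; events leave in insertion order.
  private final Queue<String> outboxTable = new ArrayDeque<>();

  synchronized void insertEvent(String event) {
    outboxTable.add(event);
  }

  private synchronized String pollEvent() {
    return outboxTable.poll();
  }

  // Mimics a select for update on internal_lock that fails instead of waiting.
  boolean tryLock() {
    return internalLock.compareAndSet(false, true);
  }

  // Mimics the commit / rollback that releases the row lock.
  void unlock() {
    internalLock.set(false);
  }

  /**
   * Entry point for both the timer call and the call made after an insert.
   * Returns the number of events sent, or -1 if another worker holds the lock.
   */
  int processOutbox(Consumer<String> kafkaSender) {
    if (!tryLock()) {
      return -1; // lock not obtained: do nothing
    }
    try {
      int sent = 0;
      String event;
      // Repeat until the outbox has no more events.
      while ((event = pollEvent()) != null) {
        kafkaSender.accept(event);
        sent++;
      }
      return sent;
    } finally {
      unlock();
    }
  }
}
```

Because a worker that cannot obtain the lock returns immediately, the same method can be called from the timer and right after an insert without two workers sending events at the same time.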
Key points to implement transactional outbox pattern
1. Create a new table to save outbox object
CREATE TABLE IF NOT EXISTS outbox_event_log (
  event_id uuid NOT NULL PRIMARY KEY,
  event_date timestamp NOT NULL,
  entity_type text NOT NULL,
  action text NOT NULL,
  payload jsonb
);
2. Create DAO for CRUD operation with outbox_event_log table - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/audit/AuditOutboxEventsLogRepository.java
3. Create a new table to be used as a lock. The lock allows processing from only a single instance at a time, which avoids sending duplicates to Kafka and preserves event order. Locking only particular rows in outbox_event_log does not guarantee order, so a separate internal_lock table is needed for locking.
CREATE TABLE IF NOT EXISTS internal_lock (
  lock_name text NOT NULL PRIMARY KEY
);
INSERT INTO internal_lock(lock_name) VALUES ('audit_outbox') ON CONFLICT DO NOTHING;
4. Create a repository to work with the lock table. The repository uses the SELECT FOR UPDATE construction for locking - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/InternalLockRepository.java
5. Leverage folio-kafka-wrapper and create a Kafka producer to send audit events - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditEventProducer.java
6. Create AuditOutboxService to fetch data from outbox_event_log and send events to Kafka - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditOutboxService.java#L56. This implementation does the following:
- starts new transaction,
- acquires DB lock using internal_lock table,
- grabs outbox objects,
- constructs Kafka events from these objects and sends them to Kafka,
- deletes processed outbox events in batch by event ids,
- commits transaction,
- releases DB lock.
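The fetch, send, and delete part of this sequence can be sketched as follows. This is an in-memory illustration under stated assumptions, not the AuditOutboxService code: the class and method names are hypothetical, a LinkedHashMap stands in for outbox_event_log, a Consumer stands in for the Kafka producer, and locking is left out for brevity. The point it tries to show is that rows are deleted only after every send has succeeded, so a failed pass leaves all events in the table for a later retry.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for one pass over outbox_event_log.
class AuditOutboxDrainSketch {
  // Plays the role of outbox_event_log: event_id -> payload, in insertion order.
  private final Map<String, String> outboxEventLog = new LinkedHashMap<>();

  void insert(String eventId, String payload) {
    outboxEventLog.put(eventId, payload);
  }

  int pending() {
    return outboxEventLog.size();
  }

  /**
   * Sends every pending event, then deletes the sent rows by id.
   * Returns the number of deleted rows, or 0 if sending failed
   * (the in-memory equivalent of a rollback: no row is removed).
   */
  int processOnce(Consumer<String> kafkaSender) {
    // Grab the outbox objects that exist at the start of the pass.
    List<String> eventIds = new ArrayList<>(outboxEventLog.keySet());
    try {
      for (String eventId : eventIds) {
        kafkaSender.accept(outboxEventLog.get(eventId));
      }
    } catch (RuntimeException e) {
      return 0; // "rollback": all rows stay and are retried on the next pass
    }
    // Delete the processed events in one batch by event id ("commit").
    outboxEventLog.keySet().removeAll(eventIds);
    return eventIds.size();
  }
}
```

One consequence visible in this sketch: if a pass fails after some events were already sent, those events are sent again on the next pass, so consumers should be prepared to see the same event more than once.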
7. Use the RMB .withTrans(..) implementation, which automatically commits the transaction, or rolls it back if the future fails - https://github.com/folio-org/raml-module-builder/blob/b35.0/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java#L3754
All methods that create or edit entities should add a step that saves the outbox event in the same DB transaction. This flow does the following:
- starts new transaction,
- creates or edits business entity,
- saves audit outbox event with full entity snapshot,
- commits transaction,
- subscribes to .onComplete of .withTrans(..) and, after the transaction finishes, invokes processing of the audit outbox logs (see step 6).
Examples of usages:
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/PurchaseOrdersAPI.java#L80
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/services/lines/PoLinesService.java#L107
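The write flow from step 7 can be sketched in plain Java as follows. This is an illustration under assumptions, not the RMB API: the withTrans method here is a hypothetical in-memory imitation that stages changes and applies them only if the work succeeds, CompletableFuture stands in for the Vert.x future, and all class, field, and method names are made up for the example. The sketch triggers outbox processing when the future completes, on either outcome.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for "save entity + outbox event in one transaction".
class OutboxWriteFlowSketch {
  final Map<String, String> entities = new LinkedHashMap<>(); // business table
  final List<String> outboxEventLog = new ArrayList<>();      // outbox_event_log

  // Staged changes of one imitation transaction.
  static final class Tx {
    final Map<String, String> stagedEntities = new LinkedHashMap<>();
    final List<String> stagedEvents = new ArrayList<>();
  }

  // Imitates commit-on-success / rollback-on-failure.
  CompletableFuture<Void> withTrans(Consumer<Tx> work) {
    Tx tx = new Tx();
    try {
      work.accept(tx);
    } catch (RuntimeException e) {
      // Rollback: staged changes are dropped, neither table is touched.
      return CompletableFuture.failedFuture(e);
    }
    synchronized (this) {
      // Commit: the entity and its outbox event become visible together.
      entities.putAll(tx.stagedEntities);
      outboxEventLog.addAll(tx.stagedEvents);
    }
    return CompletableFuture.completedFuture(null);
  }

  CompletableFuture<Void> saveOrder(String id, String snapshot, Runnable processOutbox) {
    return withTrans(tx -> {
      tx.stagedEntities.put(id, snapshot);                  // create or edit the entity
      tx.stagedEvents.add("Edit:Order:" + id + ":" + snapshot); // full entity snapshot
    }).whenComplete((ok, err) -> processOutbox.run());      // then process the outbox
  }
}
```

Since the event is written in the same transaction as the entity, there is no state in which the entity is saved but its event is missing, or the reverse.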
8. Add a timer interface that is invoked every 30 minutes and processes existing outbox audit events. The timer interface was added as a safety net for failures, for example when the transaction was committed but the module was destroyed during the .onComplete() invocation, and no other user edits arrive after the module restarts to pick up the remaining audit events. In this case, the timer invocation processes the events that have not been sent yet. The timer interface invokes AuditOutboxService (see step 6).
https://github.com/folio-org/mod-orders-storage/blob/master/descriptors/ModuleDescriptor-template.json#L544
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java
Recommended Tests
How does Postgres handle a sudden loss of connection between an application and a database when using the select for update construct?
Assumption: if the application executed a select for update query and locked a row in the table, then if the connection between this application and the database is closed incorrectly (i.e. without an explicit commit or rollback), the database engine itself releases the lock.
Test: connect to a Postgres database from a test application and execute select for update on the outbox_table_blocker table to lock a row. Break the connection without an explicit commit / rollback (for example, stop the instance, terminate the executing thread, or break the connection using a test proxy). Then check whether the row in outbox_table_blocker remains locked: can another test application execute select for update on the same row?
What is the expected performance of the proposed implementation of the Transactional Outbox pattern?
Assumption: performance, measured by the number of events that can be transferred from the table to Kafka per unit of time, is acceptable.
Important: in fact, the acceptance criterion is quite vague. Are 10 events per second acceptable? 100? 1000 or more? Probably the expected performance depends on the usage scenario. Therefore, the task of the test is to measure the performance of the proposed implementation of the Transactional Outbox pattern and formulate it as a baseline metric.
Test: Prepare N events in outbox_table (for example, 100 thousand). Enable the main logic of reading events from the outbox_table and sending them to Kafka. Measure how much time it will take to process all 100K events.
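A measurement harness for this test could look roughly like the sketch below. It is only an outline under assumptions: the class and method names are hypothetical, an in-memory queue replaces outbox_table, and the sender argument is where a real Kafka producer call would go, so the numbers it produces say nothing about the real implementation until those parts are swapped in.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical harness that turns "N events in T time" into an events-per-second baseline.
class OutboxThroughputSketch {

  // Converts a processed count and an elapsed time in nanoseconds to events per second.
  static double eventsPerSecond(int processed, long elapsedNanos) {
    return processed / (elapsedNanos / 1_000_000_000.0);
  }

  // Prepares n events, drains them through the sender, and returns the measured rate.
  static double measure(int n, Consumer<String> sender) {
    Queue<String> outbox = new ArrayDeque<>();
    for (int i = 0; i < n; i++) {
      outbox.add("event-" + i);
    }
    long start = System.nanoTime();
    int processed = 0;
    String event;
    while ((event = outbox.poll()) != null) {
      sender.accept(event); // a real test would send to Kafka here
      processed++;
    }
    // Guard against a zero interval on very fast runs.
    long elapsed = Math.max(1L, System.nanoTime() - start);
    return eventsPerSecond(processed, elapsed);
  }
}
```

For example, 100 thousand events processed in 50 seconds correspond to a baseline of 2000 events per second.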