Implementing the Transactional Outbox pattern

Context

Some FOLIO architectural proposals (in particular, FOLIO Cross-Application Data Sync Solution and Acquisition Event Log) are based on the Domain Event approach, in which any action on data in one part of the platform generates and publishes a corresponding event. In general, this approach has been approved by the FOLIO Tech Council.

However, one of the challenges of the Domain Event approach is ensuring the atomicity of two actions that are different in nature: changing an object in the database and publishing an event about that change in the notification channel.

To solve the described challenge, the Transactional Outbox approach is proposed.

Additional links:

This document describes the vision for a concrete implementation of this approach in FOLIO, using the Acquisition Event Log as an example.

Implementation Detail

  • Add the Timer Interface https://github.com/folio-org/okapi/blob/master/doc/guide.md#timer-interface to mod-orders-storage, so that Okapi invokes the required logic in mod-orders-storage at a configured interval. Usage examples can be found in mod-users, mod-licenses, mod-data-export, mod-circulation-storage
  • Implement a handler in mod-orders-storage for this Timer API call from Okapi
  • Create an internal_lock table with one row to coordinate concurrent threads through row locking
  • When a call comes from Okapi, run the "do whatever need to do to process the outbox table" logic: execute a select for update query on internal_lock. If the lock cannot be acquired, do nothing. If it is acquired, continue with the main logic of reading events from the outbox_table and sending them to Kafka, and repeat until outbox_table has no more events. On completion, commit or roll back the transaction on internal_lock to release the lock
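
The control flow of the last step can be illustrated with a minimal, language-agnostic sketch. It is written in Python for brevity and is not the module's code: a process-local lock stands in for the row lock on internal_lock, a deque stands in for outbox_table, and the send callable stands in for the Kafka producer. The names process_outbox, relay_lock and batch_size are hypothetical.

```python
import itertools
import threading
from collections import deque

# Assumptions: a process-local lock stands in for the row lock on
# internal_lock, a deque for outbox_table, and send() for the Kafka producer.
relay_lock = threading.Lock()


def process_outbox(outbox, send, batch_size=2):
    """Drain the outbox in batches; return the number of events sent."""
    # Like a failed select for update: another run holds the lock, do nothing.
    if not relay_lock.acquire(blocking=False):
        return 0
    sent = 0
    try:
        while outbox:  # repeat until the outbox has no more events
            batch = list(itertools.islice(outbox, batch_size))
            try:
                send(batch)
            except ConnectionError:
                break  # exit the loop; unsent events stay for the next run
            for _ in batch:
                outbox.popleft()  # remove events only after a successful send
            sent += len(batch)
    finally:
        relay_lock.release()  # like the commit / rollback that frees the lock
    return sent
```

Calling process_outbox(deque(["e1", "e2", "e3"]), published.extend) with an empty list named published moves all three events into it in order. A call made while another run holds the lock returns immediately without touching the outbox.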

The sequence of these steps is shown in the diagram below, given as PlantUML source.

@startuml Implementing the Transactional Outbox - Timer API driven

title Implementing the Transactional Outbox pattern - Timer API driven

skinparam sequence {
    ParticipantBorderColor Black
    ParticipantBackgroundColor White
    ParticipantFontColor Black
}

participant "OkApi\n" as OkApi
participant "mod-orders-storage\nMessage Relay logic" as MOS
participant "Database\ntable for locks" as OTB
participant "Database\noutbox table" as OT
participant "Kafka\n" as Kafka

OkApi -> MOS ++ : call Timer API
MOS --> OkApi : 200 OK

group do whatever need to do to process the outbox table
MOS -> OTB ++ : try to acquire lock via select for update
alt
    OTB --x MOS : lock failed
end

return lock succeeded

loop while there are new events
    MOS -> OT ++ : query table
    return N new events
    MOS -> MOS : transform events to messages
    MOS -> Kafka ++ : send messages
    alt
        Kafka --x MOS : failed
        MOS -> MOS : exit loop
    end
    return succeeded
    MOS -> OT ++ : remove sent events from table
    return succeeded
end

MOS -> OTB ++ : release lock
return succeeded

end

@enduml

The above sequence implements the required transactional outbox functionality, but it has some disadvantages. In particular:

  • it requires regular calls to the _timer API from Okapi; moreover, the faster the information must pass through the transactional outbox and reach Kafka, the more frequent the API calls must be, which increases network load at the API level,
  • when processing each call, the module makes at least 3 calls to the database (to acquire the lock, poll the outbox table and release the lock), which again increases the load on the network and on the database.

To mitigate the described shortcomings, the following approach is proposed: upon successful insertion of a new event into the outbox_table, perform an asynchronous call to the "do whatever need to do to process the outbox table" logic.

PlantUML source for the corresponding diagram:

@startuml Implementing the Transactional Outbox pattern - Async call driven

title Implementing the Transactional Outbox pattern - Async call driven

skinparam sequence {
    ParticipantBorderColor Black
    ParticipantBackgroundColor White
    ParticipantFontColor Black
}

participant "mod-orders-storage\nmain logic" as mos
participant "Database\norders and outbox tables" as dbo
participant "mod-orders-storage\nMessage Relay logic" as mr

-> mos: make a change
group RDBMS transaction
    mos -> dbo : start trx
    activate dbo
    mos -> dbo : make a change\nin orders data
    mos -> dbo : save an event to Outbox table
    mos -> dbo: commit trx
    deactivate dbo
    alt error while saving an event
        mos -> dbo : rollback trx
    end
end
mos -> mr: processOutbox (async call)
<- mos: done
mr -> mr: "do whatever need to do\nto process the outbox table"

@enduml

Key points to implement the transactional outbox pattern

1. Create a new table to store outbox events

CREATE TABLE IF NOT EXISTS outbox_event_log (
  event_id uuid NOT NULL PRIMARY KEY,
  event_date timestamp NOT NULL,
  entity_type text NOT NULL,
  action text NOT NULL,
  payload jsonb
);

2. Create a DAO for CRUD operations on the outbox_event_log table - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/audit/AuditOutboxEventsLogRepository.java

3. Create a new table to be used as a lock. The lock allows only a single instance to process the outbox at a time, which avoids sending duplicates to Kafka and preserves event order. Locking only particular rows in outbox_event_log does not guarantee order, so a separate internal_lock table is needed for locking.

CREATE TABLE IF NOT EXISTS internal_lock (
  lock_name text NOT NULL PRIMARY KEY
);
INSERT INTO internal_lock(lock_name) VALUES ('audit_outbox') ON CONFLICT DO NOTHING;

4. Create a repository to work with the lock table. The repository locks the row with a SELECT FOR UPDATE query - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/dao/InternalLockRepository.java

5. Leverage folio-kafka-wrapper and create a Kafka producer to send audit events - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditEventProducer.java

6. Create AuditOutboxService to fetch data from outbox_event_log and send events to Kafka - https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/event/service/AuditOutboxService.java#L56. This implementation does the following:

  • starts new transaction,
  • acquires DB lock using internal_lock table,
  • grabs outbox objects,
  • constructs Kafka events from these objects and sends them to Kafka,
  • deletes processed outbox events in batch by event ids,
  • commits transaction,
  • releases DB lock (a SELECT FOR UPDATE row lock is released when the transaction ends).
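
The fetch, send, delete and commit steps above can be sketched as follows. This is an illustration under stated assumptions, not the AuditOutboxService code: it uses Python with an in-memory SQLite database as a stand-in for PostgreSQL, the send callable stands in for the Kafka producer, and the lock step is left out. The helper names open_demo_db, add_event and relay_batch and the 'Order' / 'Create' values are hypothetical.

```python
import json
import sqlite3


def open_demo_db():
    """In-memory SQLite stand-in for the PostgreSQL outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE outbox_event_log (event_id TEXT PRIMARY KEY, "
        "event_date TEXT NOT NULL, entity_type TEXT NOT NULL, "
        "action TEXT NOT NULL, payload TEXT)")
    return conn


def add_event(conn, event_id, event_date, payload):
    """Insert one committed outbox event (illustrative values)."""
    with conn:
        conn.execute(
            "INSERT INTO outbox_event_log VALUES (?, ?, 'Order', 'Create', ?)",
            (event_id, event_date, json.dumps(payload)))


def relay_batch(conn, send, batch_size=100):
    """Send up to batch_size events in event_date order; return how many."""
    with conn:  # commits on success; nothing is deleted if send() raises
        rows = conn.execute(
            "SELECT event_id, payload FROM outbox_event_log "
            "ORDER BY event_date, event_id LIMIT ?", (batch_size,)).fetchall()
        if not rows:
            return 0
        send([json.loads(payload) for _, payload in rows])
        # Delete the processed events in one batch, by event id.
        conn.executemany(
            "DELETE FROM outbox_event_log WHERE event_id = ?",
            [(event_id,) for event_id, _ in rows])
    return len(rows)
```

Because the events are deleted only after a successful send, a failure before the commit leaves them in the table for the next run. If the process stops after sending but before committing, the same events are sent again, so consumers should tolerate duplicates.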

7. Use the RMB .withTrans(..) implementation, which automatically commits the transaction, or rolls it back if the future failed - https://github.com/folio-org/raml-module-builder/blob/b35.0/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java#L3754

All methods that create or edit entities should save an outbox event in the same DB transaction. This flow does the following:

  • starts new transaction,
  • creates or edits business entity,
  • saves audit outbox event with full entity snapshot,
  • commits transaction,
  • subscribes to .onComplete of .withTrans(..) and, after the transaction finishes, invokes processing of audit outbox logs (see step 6)

Usage examples:
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/PurchaseOrdersAPI.java#L80 
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/services/lines/PoLinesService.java#L107
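
The write path of step 7 can be sketched in the same spirit. This is an illustration in Python with in-memory SQLite, not the RMB .withTrans(..) code: the table layout, the create_tables and save_with_outbox helpers, the 'Order' / 'Create' values and the on_committed callback are all assumptions, and the callback runs synchronously here, whereas the module triggers the relay asynchronously.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone


def create_tables(conn):
    """Illustrative tables; the real schema lives in PostgreSQL."""
    conn.execute("CREATE TABLE purchase_order (id TEXT PRIMARY KEY, body TEXT)")
    conn.execute(
        "CREATE TABLE outbox_event_log (event_id TEXT PRIMARY KEY, "
        "event_date TEXT NOT NULL, entity_type TEXT NOT NULL, "
        "action TEXT NOT NULL, payload TEXT)")


def save_with_outbox(conn, order, on_committed):
    """Write the order and its audit event in one transaction."""
    event = (
        str(uuid.uuid4()),
        datetime.now(timezone.utc).isoformat(),
        "Order",
        "Create",
        json.dumps(order),  # full entity snapshot
    )
    with conn:  # both inserts commit together or roll back together
        conn.execute(
            "INSERT INTO purchase_order (id, body) VALUES (?, ?)",
            (order["id"], json.dumps(order)))
        conn.execute(
            "INSERT INTO outbox_event_log VALUES (?, ?, ?, ?, ?)", event)
    on_committed()  # reached only after a successful commit
```

If either insert fails, the exception propagates, neither row is stored, and on_committed is never called, so the relay is never triggered for a change that was not saved.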

8. Add a timer interface that is invoked every 30 minutes and processes existing outbox audit events. The timer interface was added as a safety net for failures, for example when the transaction was committed but the module was shut down during the .onComplete() invocation, and no further user edits arrive after the module restarts to pick up the remaining audit events. In this case the timer invocation processes the events that have not been sent yet. The timer interface invokes AuditOutboxService (see step 6).

https://github.com/folio-org/mod-orders-storage/blob/master/descriptors/ModuleDescriptor-template.json#L544
https://github.com/folio-org/mod-orders-storage/blob/master/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java

Recommended Tests

How does Postgres handle a sudden loss of connection between an application and a database when using the select for update construct?

Assumption: if the application has executed a select for update query and locked a row in the table, and the connection between the application and the database is then closed abnormally (i.e. without an explicit commit or rollback), the lock is released by the database engine itself.

Test: connect to a Postgres database from a test application, execute select for update on a row of the outbox_table_blocker table to lock it, then break the connection without an explicit commit / rollback (for example, stop the instance, terminate the executing thread, or break the connection using a test proxy). Then check whether the row in outbox_table_blocker remains locked: can another test application execute select for update on the same row?

What is the expected performance of the proposed implementation of the Transactional Outbox pattern?

Assumption: performance, measured by the number of events that can be transferred from the table to Kafka per unit of time, is acceptable.

Important: in fact, the acceptance criterion is quite vague. Are 10 events per second acceptable? 100? 1000 or more? Probably the expected performance depends on the usage scenario. Therefore, the task of the test is to measure the performance of the proposed implementation of the Transactional Outbox pattern and formulate it as a baseline metric.

Test: prepare N events in outbox_table (for example, 100 thousand). Enable the main logic of reading events from the outbox_table and sending them to Kafka. Measure how long it takes to process all N events.
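
The measurement itself can be as simple as the sketch below, shown in Python for illustration. The measure_throughput helper is hypothetical and expects a drain callable that processes the whole outbox and returns the number of events sent. The fake_drain function only counts events, so any figures it produces say nothing about the real performance of PostgreSQL or Kafka.

```python
import time


def measure_throughput(drain):
    """Time one full drain of the outbox and report events per second."""
    started = time.perf_counter()
    processed = drain()  # must return the number of events it sent
    elapsed = time.perf_counter() - started
    rate = processed / elapsed if elapsed > 0 else float("inf")
    return {"events": processed, "seconds": elapsed, "events_per_second": rate}


def fake_drain(n_events=100_000, batch_size=1_000):
    """Hypothetical stand-in for the real relay: only counts events."""
    pending, sent = n_events, 0
    while pending:
        batch = min(batch_size, pending)
        pending -= batch
        sent += batch
    return sent
```

In a real test, drain would wrap the module's outbox processing against a database seeded with N events, and the resulting events_per_second value would serve as the baseline metric.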

