PubSub deprecation plan

Overview

mod-pubsub deprecation will be implemented in two phases:

  1. Transport. mod-pubsub will be replaced with plain Kafka as the transport. This should be implemented in all affected modules at the same time and become part of a single FOLIO release to ensure a seamless transition from one transport to the other.
  2. Atomicity. In addition to changing the transport, a pattern like the Transactional Outbox/Inbox can be implemented to ensure atomicity of two operations: writing to the DB and sending a message to Kafka.

Skipping Phase 2 DOES NOT create a regression, because currently nothing ensures atomicity of DB writes and PubSub publishing operations. However, we believe that the majority of issues with PubSub event delivery are caused by PubSub itself, so completing Phase 1 alone should greatly improve the reliability of "Automated patron blocks". Phase 2 will improve it even further, but it requires more effort. For example, some message-sending functionality will need to be moved from modules that don't have a DB (like mod-circulation) to modules that do (like mod-circulation-storage).

Affected modules and event types

mod-circulation-storage

  Publications:
    • LOG_RECORD

  Subscriptions:
    • (none)

mod-circulation

  Publications:
    • ITEM_CHECKED_OUT
    • ITEM_CHECKED_IN
    • ITEM_DECLARED_LOST
    • ITEM_AGED_TO_LOST
    • ITEM_CLAIMED_RETURNED
    • LOAN_DUE_DATE_CHANGED
    • LOAN_CLOSED
    • LOG_RECORD

  Subscriptions:
    • LOAN_RELATED_FEE_FINE_CLOSED
    • FEE_FINE_BALANCE_CHANGED

mod-patron-blocks

  Publications:
    • ? ITEM_CHECKED_OUT
    • ? ITEM_CHECKED_IN
    • ? ITEM_DECLARED_LOST
    • ? LOAN_DUE_DATE_CHANGED
    • ? FEE_FINE_BALANCE_CHANGED

  Subscriptions:
    • FEE_FINE_BALANCE_CHANGED
    • ITEM_CHECKED_OUT
    • ITEM_CHECKED_IN
    • ITEM_DECLARED_LOST
    • ITEM_AGED_TO_LOST
    • ITEM_CLAIMED_RETURNED
    • LOAN_DUE_DATE_CHANGED
    • LOAN_CLOSED

mod-feesfines

  Publications:
    • FEE_FINE_BALANCE_CHANGED
    • LOAN_RELATED_FEE_FINE_CLOSED
    • LOG_RECORD

  Subscriptions:
    • (none)

mod-remote-storage

  Publications:
    • LOG_RECORD

  Subscriptions:
    • ? LOG_RECORD

mod-audit

  Publications:
    • ? LOG_RECORD

  Subscriptions:
    • LOG_RECORD


"?" means that, most likely, it was added due to a PubSub (fixed) bug that required a module to subscribe to the same event types that it produces. Because of that, developers used to make publications and subscription lists identical.

Phase 1 implementation plan

See the feature UXPROD-4328.

For each of the affected modules:

  • Add Kafka default settings to the ModuleDescriptor.
  • Create Kafka topics using KafkaAdminClientService (refer to mod-inventory-storage). Most likely, this should be part of the _tenant API.
  • Create services that allow the module to act as a Kafka producer or consumer (or both, as required). Make sure they are testable; this will make it easier to rewrite tests that depend on PubSub event matching.
  • Remove all PubSub dependencies (mod-pubsub-client).
  • Remove the MessagingDescriptor file.
  • Remove the endpoints that handle PubSub events from the ModuleDescriptor (that's a breaking change!).
  • Remove all the code that registers or unregisters the module as a publisher or subscriber of PubSub event types. Most likely, this code is part of the _tenant API. Replace it with code that creates Kafka topics using KafkaAdminClientService.
  • Replace the code that publishes PubSub events with Kafka message producing.
  • Replace the code that consumes PubSub events with Kafka message consuming.
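The "testable services" point above can be sketched as a thin publisher abstraction: business code depends on an interface, the production implementation delegates to a folio-kafka-wrapper producer, and tests substitute an in-memory double. The names below (EventPublisher, InMemoryEventPublisher) are illustrative, not taken from any module:

```java
import java.util.ArrayList;
import java.util.List;

// Abstraction the business code depends on. The production implementation
// would delegate to a Kafka producer from folio-kafka-wrapper.
interface EventPublisher {
  void publish(String topic, String key, String payload);
}

// Test double: records events instead of sending them to Kafka, so tests
// that previously matched PubSub events don't need a running broker.
class InMemoryEventPublisher implements EventPublisher {
  final List<String[]> published = new ArrayList<>();

  @Override
  public void publish(String topic, String key, String payload) {
    published.add(new String[] { topic, key, payload });
  }
}
```

A check-out service would then accept an EventPublisher in its constructor, and tests can assert on the recorded events directly.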

Phase 2 implementation plan

See the feature UXPROD-4381.

The effort required to implement the Transactional Outbox pattern varies depending on the module and the event type. The affected modules and events can be split into three categories based on complexity:

  1. Modules that publish events and have their own database (e.g. mod-feesfines). Publishing stays in the same module, but the Transactional Outbox pattern needs to be implemented.
  2. Modules that publish events but don't have their own database (e.g. mod-circulation). Publishing of these events has to be moved to the corresponding storage modules so that the Transactional Outbox pattern can be implemented.
  3. Log events. The publishing modules can fall into either category 1 or 2, but whether log events should also go through a Transactional Outbox should be discussed separately: the only functionality that can suffer from these messages not being delivered is the Circulation Log.

Event types that need to be published in mod-circulation-storage instead of mod-circulation:
ITEM_CHECKED_OUT
ITEM_CHECKED_IN
ITEM_DECLARED_LOST
ITEM_AGED_TO_LOST
ITEM_CLAIMED_RETURNED
LOAN_DUE_DATE_CHANGED
LOAN_CLOSED

Event types that need to be published from mod-feesfines (the same module that is publishing them now to PubSub):
FEE_FINE_BALANCE_CHANGED
LOAN_RELATED_FEE_FINE_CLOSED

Module name              Has a DB
mod-circulation-storage  Y
mod-circulation          N
mod-patron-blocks        Y
mod-feesfines            Y
mod-remote-storage       Y
mod-audit                Y

Pattern implementation:

  • Create a message_outbox table with fields: id, kafka_topic, payload, status.
  • Every process that wants to publish a message creates a new entry in the message_outbox table with status NEW. The insert must be part of the same transaction as the change the message relates to (e.g. the loan update during check-out).
  • Create a timed process that checks for new entries in the message_outbox table and publishes the corresponding messages.
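A minimal in-memory sketch of the flow above. In production, message_outbox is a real DB table and enqueue() happens inside the business transaction; the class names and the NEW/SENT status values are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// One row of the message_outbox table.
class OutboxEntry {
  final UUID id = UUID.randomUUID();
  final String kafkaTopic;
  final String payload;
  String status = "NEW";

  OutboxEntry(String kafkaTopic, String payload) {
    this.kafkaTopic = kafkaTopic;
    this.payload = payload;
  }
}

class MessageOutbox {
  private final List<OutboxEntry> table = new ArrayList<>();

  // Called inside the same transaction as the business change.
  void enqueue(String kafkaTopic, String payload) {
    table.add(new OutboxEntry(kafkaTopic, payload));
  }

  // The timed process: pick up NEW entries, hand them to the Kafka
  // producer, and mark them SENT only after the send succeeds.
  List<OutboxEntry> drain() {
    List<OutboxEntry> batch = new ArrayList<>();
    for (OutboxEntry e : table) {
      if ("NEW".equals(e.status)) {
        // kafkaProducer.send(e.kafkaTopic, e.payload) would go here
        e.status = "SENT";
        batch.add(e);
      }
    }
    return batch;
  }
}
```

Note that if the process crashes between the Kafka send and the status update, the entry stays NEW and is sent again on the next run, which is why delivery is at-least-once.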

Note: an additional complication is that the Transactional Outbox pattern doesn't guarantee exactly-once delivery (a message could be sent more than once). This means we need to introduce a unique message identifier and check it on the consumer side.
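The consumer-side check can be as simple as tracking already-processed message IDs. In practice the set of seen IDs would live in the consumer's DB so it survives restarts; the names here are illustrative:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Idempotent consumer guard: process a message only if its unique
// identifier has not been seen before.
class DeduplicatingHandler {
  private final Set<UUID> processed = new HashSet<>();

  // Returns true if the message was processed, false if it was a duplicate.
  boolean handle(UUID messageId, String payload) {
    if (!processed.add(messageId)) {
      return false; // duplicate delivery from the outbox relay
    }
    // ... actual event handling goes here ...
    return true;
  }
}
```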

Technical details and examples

folio-kafka-wrapper will be used for all Kafka interactions.

Example of Kafka default settings in the ModuleDescriptor:      

{ "name": "DB_MAXPOOLSIZE", "value": "5" },
{ "name": "KAFKA_HOST", "value": "kafka" },
{ "name": "KAFKA_PORT", "value": "9092" },
{ "name": "REPLICATION_FACTOR", "value": "1" },
{ "name": "ENV", "value": "folio" }

Example of defining topics and specifying the number of partitions:

public enum CirculationStorageKafkaTopic implements KafkaTopic {
  REQUEST("request", 10),
  LOAN("loan", 10),
  CHECK_IN("check-in", 10);

  private final String topic;
  private final int partitions;

  CirculationStorageKafkaTopic(String topic, int partitions) {
    this.topic = topic;
    this.partitions = partitions;
  }

  @Override
  public String moduleName() {
    return "circulation";
  }

  @Override
  public String topicName() {
    return topic;
  }

  @Override
  public int numPartitions() {
    return partitions;
  }
}
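From these values folio-kafka-wrapper derives the physical topic name. Assuming the usual {ENV}.{tenantId}.{moduleName}.{topicName} convention, the derivation looks like this (illustrative sketch, not the wrapper's actual class):

```java
// Shows how the full topic name is composed from the ENV setting, the
// tenant id, and the enum's moduleName()/topicName() values.
class TopicNames {
  static String fullTopicName(String env, String tenantId,
                              String moduleName, String topicName) {
    return String.join(".", env, tenantId, moduleName, topicName);
  }
}
```

For example, with ENV=folio and tenant diku, the REQUEST topic above becomes folio.diku.circulation.request.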

Example of topics creation:

.compose(r -> new KafkaAdminClientService(vertxContext.owner())
  .createKafkaTopics(CirculationStorageKafkaTopic.values(), tenantId))