Reference JIRA issue: ARCH-14
The problem
In the current implementation, all Data Import (DI) tasks/messages are queued in Kafka topics and processed by modules at the maximum possible speed. This fully utilizes the system's resources.
On the other hand, all other processes suffer from resource starvation during imports, and overall system latency increases. There are also manual operations that rely on DI mechanisms; these have to wait for all previously submitted DI jobs to complete before they can be processed.
Proposed solution
To properly prioritize manual operations over long-running import jobs, the following logic should be implemented.
Jobs priority
All DI jobs must have a corresponding priority: highest for manual operations and lowest for data import jobs with a "large" number of records.
Prioritization should be done in mod-source-record-manager, as it is the current entry point for all import processes.
Number of priorities at the current stage: 2 (high/low).
All incoming job data (arriving via REST or from Kafka) must be placed into a priority queue and picked from it for processing.
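The priority queue described above could be sketched as follows. This is a minimal illustration, not the actual mod-source-record-manager implementation: the class and record names (DiJobQueue, DiJob, JobPriority) are hypothetical, and it assumes the two-level high/low scheme with FIFO ordering within a priority level.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch of a two-level priority queue for incoming DI jobs.
// HIGH = manual operations, LOW = bulk data import jobs.
public class DiJobQueue {

    public enum JobPriority { HIGH, LOW } // enum order: HIGH sorts first

    public record DiJob(String jobId, JobPriority priority, long enqueuedAt)
            implements Comparable<DiJob> {
        @Override
        public int compareTo(DiJob other) {
            // Higher priority first; within the same priority, FIFO by arrival time
            int byPriority = this.priority.compareTo(other.priority);
            return byPriority != 0 ? byPriority
                                   : Long.compare(this.enqueuedAt, other.enqueuedAt);
        }
    }

    private final PriorityBlockingQueue<DiJob> queue = new PriorityBlockingQueue<>();

    // Called when job data arrives via REST or from Kafka
    public void submit(DiJob job) {
        queue.put(job);
    }

    // Called by the flow-control logic when there is capacity in the pipeline;
    // returns null if the queue is empty
    public DiJob next() {
        return queue.poll();
    }
}
```

With this ordering, a manual operation submitted after a large bulk import is still dequeued first, while jobs of equal priority keep their arrival order.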
Flow control
Job priority alone will not solve the issue, since the underlying Kafka topics can still be filled up with messages. A flow control mechanism therefore needs to be implemented.
The implementation is done in mod-source-record-manager:
- Introduce two configuration parameters (names may be altered):
  - di.flow.max_simultaneous_records – defines how many records the system can process simultaneously. The proposed default value is 100.
  - di.flow.records_threshold – defines how many records from the previous batch must be processed before new records are pushed to the pipeline. The proposed default value is 50.
- Implement a flow control mechanism based on the parameters above:
  - mod-srm counts the total number of records it has passed to the DI pipeline. This count must not exceed di.flow.max_simultaneous_records at any time.
  - When DI_COMPLETE or DI_ERROR is received, the record counter is decremented.
  - When the counter drops to di.flow.records_threshold or below, more records are pushed to the pipeline (from the priority queue). The number of records pushed must still respect di.flow.max_simultaneous_records.
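The counter-based flow control above can be sketched as a small thread-safe component. This is an illustrative sketch only: the class name FlowControl and its method names are hypothetical, the field comments map them to the proposed di.flow.* parameters, and the interaction with the priority queue and Kafka is stubbed out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposed flow control counter in mod-srm.
public class FlowControl {

    private final int maxSimultaneousRecords; // di.flow.max_simultaneous_records
    private final int recordsThreshold;       // di.flow.records_threshold
    private final AtomicInteger inFlight = new AtomicInteger(0);

    public FlowControl(int maxSimultaneousRecords, int recordsThreshold) {
        this.maxSimultaneousRecords = maxSimultaneousRecords;
        this.recordsThreshold = recordsThreshold;
    }

    // How many more records may be pushed to the pipeline right now,
    // so that in-flight records never exceed the maximum
    public int availableSlots() {
        return Math.max(0, maxSimultaneousRecords - inFlight.get());
    }

    // Called when records are taken from the priority queue and sent downstream
    public void onRecordsPushed(int count) {
        inFlight.addAndGet(count);
    }

    // Called for each DI_COMPLETE / DI_ERROR event; returns true when the
    // in-flight count has dropped to the threshold, i.e. it is time to
    // pull the next batch from the priority queue
    public boolean onRecordFinished() {
        return inFlight.decrementAndGet() <= recordsThreshold;
    }
}
```

With the proposed defaults (100/50), mod-srm would push up to 100 records, wait until 50 of them have produced DI_COMPLETE or DI_ERROR events, and only then refill the pipeline from the priority queue up to the 100-record ceiling.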
- In the current implementation, actions from manual op