Reindex Improvements

Arch ticket: https://folio-org.atlassian.net/browse/ARCH-273

Summary

Implementation of the classification browse feature required the introduction of a new search index. Adding the new index negatively impacted the performance of the reindexing procedure: for tenants with large datasets, reindexing now exceeds the maintenance time window.

The impact is especially significant in ECS environments because of the need to aggregate data across multiple tenants' inventory storage. In the reindexing event model, mod-search receives "create/update" domain events that contain only the identifiers of the related instances, and the module then fetches the full entity through HTTP requests. This is the root cause of the slowness of the reindexing procedure. The proposed solution addresses the issue by using database schema-to-schema communication instead of HTTP communication.

Requirements

Functional requirements

  1. There should be no impact on the current behavior of search capabilities.

  2. The event model for indexing newly created/updated documents should remain as-is.

Non-functional requirements

  1. Performance

  2. ECS Support

Baseline Architecture

The baseline architecture is described here:

  1. Search indexing procedure architecture

  2. ECS indexing procedure

Drawbacks of existing solution:

  1. HTTP calls to inventory impact the latency of indexing of a single instance.

  2. Slow-running “upsert scripts” for partial updates in OpenSearch/Elasticsearch.

  3. The need to aggregate instances across multiple tenants in an ECS environment requires multiple updates for every instance.

  4. Data duplication in the consortium_instance table might cause additional Postgres performance overhead for large datasets.

Solution Options

Option 0: Existing architecture
Description: The reindexing procedure is based on the domain event model.
Pros: (none listed)
Cons:
  • Reaches the limits of a maintenance window
  • A change of the search mapping requires full reindexing
  • Domain events during reindexing contain only the instance id and require an additional HTTP request per instance to fetch the whole entity
Decision: (none recorded)

Option 1: Database-to-database query
Description: The reindexing is split into “merge” and “indexing” stages, and the “merge” stage is done with db-to-db queries.
Pros:
  • Stages can be started independently
  • Supports both ECS and standalone mode
  • Improved performance
Cons:
  • The “merge” stage requires data duplication in mod-search’s database
  • Dependency on the internal database structure between mod-search and mod-inventory-storage
  • Requires cross-tenant and cross-schema queries
  • Cannot be applied when the modules' databases are located on different database servers
Decision: Rejected due to the dependencies it introduces

Option 2: Reindexing through Kafka
Description: The reindexing is split into “merge” and “indexing” stages, and the “merge” stage is done by sending all entities through a Kafka topic.
Pros:
  • Stages can be started independently
  • Supports both ECS and standalone mode
  • Improved performance
  • Module and tenant boundaries are not affected
Cons:
  • The “merge” stage requires data duplication in mod-search’s database
Decision: (none recorded)

Target Architecture Option 1

Assumptions

  1. The solution is aimed at ECS and non-ECS environments using a schema-per-tenant database approach. For other cases the existing implementation of the reindexing procedure is available.

  2. Schema-to-schema communication is not a user-facing use case and is limited to the initial load, since it is a background maintenance process.

Key aspects of the solution

The solution should address both single-tenant environments and ECS. Key aspects of the proposed solution are:

  1. The whole reindexing process should be split into two steps.

    1. Aggregate all the information related to instances, holdings, and items in the mod-search database

    2. Bulk upload prepared documents from the mod-search database to OpenSearch/Elasticsearch

  2. Split the consortium_instance table into instances, holdings, and items tables to reduce data
    duplication

  3. Create a table for tracking chunk processing progress (a sketch follows the DDL in Solution Details)

  4. For initial inventory indexing, or when the inventory data model changes, all entities from the inventory
    should be copied through INSERT INTO ... SELECT with cross-tenant (cross-schema) SQL queries. To allow parallel execution of the queries, it is proposed to split the data transfer into chunks.

  5. Bulk upload of documents should happen whenever there are search index mapping changes. This should be a separate procedure from step 1

  6. The existing event model should support the new data model in the mod-search database

Sequence Diagram

ERD

Solution Details

In a single-tenant environment or in ECS mode, the mod-search database should contain the following tables, which act as aggregates for the entities that will be posted to the search index.

create table instances
(
    instance_id   varchar(255) not null,
    shared        boolean      not null default false,
    isBoundWith   boolean      not null default false,
    instance_json jsonb        not null,
    primary key (instance_id)
);

create table holdings
(
    holding_id   varchar(255) not null,
    tenant_id    varchar(255) not null,
    instance_id  varchar(255) not null,
    holding_json jsonb        not null,
    primary key (holding_id, tenant_id)
);

create index holdings_instances_idx on holdings (instance_id);

create table items
(
    item_id    varchar(255) not null,
    tenant_id  varchar(255) not null,
    holding_id varchar(255) not null,
    item_json  jsonb        not null,
    primary key (item_id, tenant_id)
);

create index items_holdings_idx on items (holding_id);

create table classifications
(
    classification_number  varchar(255) not null,
    classification_type_id varchar(255) not null,
    instance_id            varchar(255) not null,
    shared                 boolean      not null default false,
    primary key (classification_number, classification_type_id, instance_id)
);

create table subjects
(
    subject_id    varchar(255) not null,
    subject_value varchar(255) not null,
    authority_id  varchar(255) not null default '',
    instance_id   varchar(255) not null,
    shared        boolean      not null default false,
    primary key (subject_id, subject_value, instance_id)
);

create table contributors
(
    contributor_id           varchar(255) not null,
    contributor_name         varchar(255) not null,
    contributor_name_type_id varchar(255) not null,
    authority_id             varchar(255) not null default '',
    instance_id              varchar(255) not null,
    shared                   boolean      not null default false,
    primary key (contributor_id, contributor_name, contributor_name_type_id, instance_id)
);

-- trigger function that extracts classifications, subjects, and contributors from the instance JSON
create or replace function classification_trigger() returns trigger as
$$
declare
    i jsonb;
begin
    -- extract classifications
    for i in select * from jsonb_array_elements(new.instance_json -> 'classifications')
        loop
            insert into classifications (classification_number, classification_type_id, instance_id, shared)
            values (i ->> 'classificationNumber', i ->> 'classificationTypeId', new.instance_id, new.shared)
            on conflict (classification_number, classification_type_id, instance_id)
                do update set shared = excluded.shared;
        end loop;

    -- extract subjects
    for i in select * from jsonb_array_elements(new.instance_json -> 'subjects')
        loop
            insert into subjects (subject_id, subject_value, authority_id, instance_id, shared)
            values (encode(digest(i ->> 'value' || coalesce(i ->> 'authorityId', ''), 'sha1'), 'hex'), -- requires pgcrypto enabled
                    i ->> 'value', coalesce(i ->> 'authorityId', ''), new.instance_id, new.shared)
            on conflict (subject_id, subject_value, instance_id)
                do update set shared = excluded.shared;
        end loop;

    -- extract contributors
    for i in select * from jsonb_array_elements(new.instance_json -> 'contributors')
        loop
            insert into contributors (contributor_id, contributor_name, contributor_name_type_id, authority_id, instance_id, shared)
            values (encode(digest(i ->> 'name' || coalesce(i ->> 'authorityId', ''), 'sha1'), 'hex'), -- requires pgcrypto enabled
                    i ->> 'name', i ->> 'contributorNameTypeId', coalesce(i ->> 'authorityId', ''), new.instance_id, new.shared)
            on conflict (contributor_id, contributor_name, contributor_name_type_id, instance_id)
                do update set shared = excluded.shared;
        end loop;

    return new;
end;
$$ language plpgsql;

create trigger instance_trigger
    after insert or update
    on instances
    for each row
execute function classification_trigger();

-- aggregated view joining instances with their holdings and items
create or replace view instances_view as
select h.tenant_id,
       i.instance_id,
       jsonb_build_object('instanceId', i.instance_id,
                          'instance', i.instance_json,
                          'holdingsRecords', h.holding_json,
                          'items', it.item_json,
                          'isBoundWith', i.isBoundWith) as json
from instances i
         left join holdings h on i.instance_id = h.instance_id
         left join items it on h.holding_id = it.holding_id;
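
Key aspect 3 above also calls for a table to track chunk processing progress, which is not part of the DDL above. A minimal sketch of what such a tracking table could look like; the table and column names are illustrative assumptions, not part of the current design:

-- illustrative chunk/range tracking table; names and columns are assumptions
create table merge_ranges
(
    range_id    int          not null,
    entity_type varchar(30)  not null,                   -- instance / holding / item
    tenant_id   varchar(255) not null,
    lower       uuid         not null,
    upper       uuid         not null,
    status      varchar(20)  not null default 'PENDING', -- PENDING / IN_PROGRESS / COMPLETED / FAILED
    finished_at timestamp,
    primary key (range_id, entity_type, tenant_id)
);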

A sample of the initial data upload function, copying data from the mod-inventory-storage database into the mod-search database, is shown below:

create or replace function copy_data_from_inventory(range_to_process int) returns void as
$$
begin
    -- temporary helper table with the UUID chunk boundaries (16 ranges)
    drop table if exists uuid_ranges;
    create temp table uuid_ranges
    (
        range_id int,
        lower    uuid,
        upper    uuid
    ) on commit drop;

    insert into uuid_ranges (range_id, lower, upper)
    values (1, '00000000-0000-0000-0000-000000000000', '10000000-0000-0000-0000-000000000000'),
           (2, '10000000-0000-0000-0000-000000000001', '20000000-0000-0000-0000-000000000000'),
           (3, '20000000-0000-0000-0000-000000000001', '30000000-0000-0000-0000-000000000000'),
           (4, '30000000-0000-0000-0000-000000000001', '40000000-0000-0000-0000-000000000000'),
           (5, '40000000-0000-0000-0000-000000000001', '50000000-0000-0000-0000-000000000000'),
           (6, '50000000-0000-0000-0000-000000000001', '60000000-0000-0000-0000-000000000000'),
           (7, '60000000-0000-0000-0000-000000000001', '70000000-0000-0000-0000-000000000000'),
           (8, '70000000-0000-0000-0000-000000000001', '80000000-0000-0000-0000-000000000000'),
           (9, '80000000-0000-0000-0000-000000000001', '90000000-0000-0000-0000-000000000000'),
           (10, '90000000-0000-0000-0000-000000000001', 'a0000000-0000-0000-0000-000000000000'),
           (11, 'a0000000-0000-0000-0000-000000000001', 'b0000000-0000-0000-0000-000000000000'),
           (12, 'b0000000-0000-0000-0000-000000000001', 'c0000000-0000-0000-0000-000000000000'),
           (13, 'c0000000-0000-0000-0000-000000000001', 'd0000000-0000-0000-0000-000000000000'),
           (14, 'd0000000-0000-0000-0000-000000000001', 'e0000000-0000-0000-0000-000000000000'),
           (15, 'e0000000-0000-0000-0000-000000000001', 'f0000000-0000-0000-0000-000000000000'),
           (16, 'f0000000-0000-0000-0000-000000000001', 'ffffffff-ffff-ffff-ffff-ffffffffffff');

    -- copy instances that fall into the requested UUID range
    insert into fs09000000_mod_search.instances (instance_id, shared, isBoundWith, instance_json)
    select i.id,
           true, -- for member tenant
           exists (select 1
                   from fs09000000_mod_inventory_storage.bound_with_part bw
                            join fs09000000_mod_inventory_storage.item it on it.id = bw.itemid
                            join fs09000000_mod_inventory_storage.holdings_record hr on hr.id = bw.holdingsrecordid
                   where hr.instanceid = i.id), -- isBoundWith: the instance has at least one bound-with item
           i.jsonb
    from fs09000000_mod_inventory_storage.instance i
             inner join uuid_ranges r on i.id between r.lower and r.upper
    where r.range_id = range_to_process;

    -- copy holdings of the instances in the requested UUID range
    insert into fs09000000_mod_search.holdings (holding_id, tenant_id, instance_id, holding_json)
    select h.id,
           'fs09000000', -- or central
           i.id,         -- instance the holdings record belongs to
           h.jsonb
    from fs09000000_mod_inventory_storage.holdings_record h
             inner join fs09000000_mod_inventory_storage.instance i on h.instanceid = i.id
             inner join uuid_ranges r on i.id between r.lower and r.upper
    where r.range_id = range_to_process;

    -- copy items of the instances in the requested UUID range
    insert into fs09000000_mod_search.items (item_id, tenant_id, holding_id, item_json)
    select i.id,
           'fs09000000', -- or central
           h.id,
           i.jsonb
    from fs09000000_mod_inventory_storage.holdings_record h
             inner join fs09000000_mod_inventory_storage.item i on h.id = i.holdingsrecordid
             inner join uuid_ranges r on h.instanceid between r.lower and r.upper
    where r.range_id = range_to_process;
end;
$$ language plpgsql;
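
As a usage sketch for the sample function above (an assumption about how it would be driven, not part of the design): the merge stage invokes it once per range; in practice each call would run on a separate connection so the chunks are copied in parallel, but sequentially it looks like:

-- invoke the copy once per UUID range; run each call on a separate connection to parallelize
do
$$
begin
    for n in 1..16 loop
        perform copy_data_from_inventory(n);
    end loop;
end;
$$;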

Target Architecture Option 2

Key aspects of the solution

The solution should address both single-tenant environments and ECS. Key aspects of the proposed solution are:

  1. The whole reindexing process should be split into two steps.

    1. Aggregate all the information related to instances, holdings, and items in the mod-search database

    2. Bulk upload prepared documents from the mod-search database to OpenSearch/Elasticsearch

  2. Split the consortium_instance table into instances, holdings, and items tables to reduce data
    duplication

  3. Create a table for tracking range processing progress

  4. For initial inventory indexing, or when the inventory data model changes, all entities from the inventory
    should be copied through a Kafka topic.

  5. Bulk upload of documents should happen whenever there are search index mapping changes. This should be a separate procedure from step 1

  6. The existing event model should support the new data model in the mod-search database

Sequence Diagram

ERD

WBS

Story: Merge documents in Postgres

  1. Counts REST endpoint (mod-inventory-storage)
    Create a REST endpoint that returns the counts of instances, holdings, and items in the current tenant's database.

  2. Database changes (mod-search)

    1. Create migration scripts for the Instances, Holdings, and Items tables

    2. Create migration scripts for Instance insert/update triggers for Subjects, Contributors, and Classifications

    3. Create migration scripts for the Ranges table

  3. Initialize reindex (mod-search)

    1. Create a REST endpoint that triggers either a full reindex or the merge phase of reindex only

    2. Truncate the Instances, Holdings, Items, and Ranges tables

    3. Get the tenants list [ECS]

    4. Request counts from inventory storage

    5. Calculate UUID ranges and fill the Ranges table (see the sketch after this WBS)

    6. Call the mod-inventory-storage endpoint

  4. Send range of entities (mod-inventory-storage)
    Create a REST endpoint that receives a UUID range and an entity type and sends the related entity ranges into a Kafka topic (create the topic with a small retention time and an increased message size).
    NOTE:

    1. If a message is larger than the maximum message size, it must be split into parts

    2. Each entity type should be sent separately (instances/items/holdings)

    3. On member tenants in ECS, send only local instances but all items and holdings

  5. Entities Kafka listener (mod-search)
    Create a Kafka listener that receives a list of entities related to a range, inserts the entities into the related tables, and updates the Ranges table.
    NOTE: If all ranges are filled, then trigger the bulk upload.

  6. Reindex status endpoint
    Create a REST endpoint that allows tracking progress through the Ranges table.

Story: Bulk upload documents to Search Index

  7. Initialize upload of merged documents (mod-search)
    Create a REST endpoint that triggers the upload phase of reindex only.

  8. Create ranges (mod-search)

    1. Calculate UUID ranges and fill the Upload-Ranges table for each index (instances, subjects, contributors, classifications)

    2. Send entity ranges to Kafka

  9. Consume ranges (mod-search)
    Create a Kafka listener that receives a list of entities related to a range, selects the related entities from the database, bulk uploads the documents to the search index, and updates the Ranges table.

  10. Reindex status endpoint
    Create a REST endpoint that allows tracking progress through the Ranges table.

Story: Align ongoing indexing for the new database structure

  11. Align the ongoing (event-based) indexing with the new database structure (mod-search)
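
For the "Calculate UUID ranges" tasks above, a minimal sketch of how evenly spaced UUID range boundaries could be computed in Postgres; the choice of 16 ranges (one per leading hex digit) mirrors the sample copy function above and is an assumption, not a requirement:

-- compute 16 evenly spaced UUID range boundaries; the result set can be inserted into the Ranges / Upload-Ranges table
select g + 1 as range_id,
       (to_hex(g) || '0000000-0000-0000-0000-000000000000')::uuid as lower,
       (to_hex(g) || 'fffffff-ffff-ffff-ffff-ffffffffffff')::uuid as upper
from generate_series(0, 15) as g;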

Further Improvements

  1. Use a procedure to populate the secondary tables after the main table load is complete, within the same transaction

    • use a temp table to track the modified instances

    • compute the list of rows (e.g. subjects) to be inserted into the subjects table; the list of subjects should be deduplicated before inserting (unique clause?)

  2. Perform the insert into the subjects table as a single batch insert statement, not in a loop (see the sketch after this list)

    • no WAL logging (UNLOGGED) on the db tables

    • is there a separate code path for the initial load vs. ongoing updates? we should consider this

    • if so, new tables for each reindex job could be created as UNLOGGED
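
To illustrate item 2, a minimal set-based sketch that replaces the per-row loop of the trigger with a single deduplicated insert. It assumes the instances and subjects tables from Target Architecture Option 1 and the pgcrypto extension; the filter condition and the sample instance id are illustrative only:

-- set-based extraction of subjects for the (re)indexed instances, deduplicated before insert
insert into subjects (subject_id, subject_value, authority_id, instance_id, shared)
select distinct encode(digest(s ->> 'value' || coalesce(s ->> 'authorityId', ''), 'sha1'), 'hex'),
                s ->> 'value',
                coalesce(s ->> 'authorityId', ''),
                i.instance_id,
                i.shared
from instances i
         cross join lateral jsonb_array_elements(i.instance_json -> 'subjects') as s
where i.instance_id = '00000000-0000-0000-0000-000000000001' -- illustrative: restrict to the instances being processed
on conflict (subject_id, subject_value, instance_id) do update set shared = excluded.shared;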