Implementation of the classification browse feature required the introduction of a new search index. Adding a new index negatively impacted the performance of the reindexing procedure: for tenants with large datasets, reindexing now exceeds the maintenance time window.
The impact is especially significant in ECS environments because of the need to aggregate data across multiple tenants' inventory storage. In the reindexing event model, mod-search receives "create/update" domain events that contain only the identifiers of the related instances, and the module then fetches the full entity through HTTP requests. This is the root cause of the slowness of the reindexing procedure. The proposed solution describes an approach that addresses the issue with database schema-to-schema communication instead of HTTP communication.
Requirements
Functional requirements
There should be no impact on the current behavior of search capabilities.
The event model for indexing newly created/updated documents should remain as-is.
HTTP calls to inventory impact the latency of indexing a single instance.
Slow-running “upsert scripts” for partial updates in OpenSearch/Elasticsearch.
The need to aggregate instances across multiple tenants in an ECS environment requires multiple updates for every instance.
Data duplication in the consortium_instance table might cause additional overhead in Postgres performance for big datasets.
Solution Options
Option 0: Existing architecture
Description: The reindexing procedure is based on the domain event model.
Cons: reaches the limits of a maintenance window; a change of search mapping requires a full reindexing; domain events during reindexing contain only the instance id and require an additional HTTP request for each instance to get the whole entity.

Option 1: Database-to-database query
Description: The reindexing is split into “merge” and “indexing” stages, and the “merge” stage is done with a db-to-db query.
Pros: stages can be started independently; supports both ECS and standalone mode; performance is improved.
Cons: the “merge” stage requires data duplication in mod-search’s database; introduces a dependency on the internal database structure between mod-search and mod-inventory-storage; requires cross-tenant and cross-schema queries; cannot be applied when the modules' databases are located on different database servers.
Decision: rejected due to the introduction of dependencies.

Option 2: Reindexing through Kafka
Description: The reindexing is split into “merge” and “indexing” stages, and the “merge” stage is done by sending all entities through a Kafka topic.
Pros: stages can be started independently; supports both ECS and standalone mode; performance is improved; the boundaries of modules and tenants are not affected.
Cons: the “merge” stage requires data duplication in mod-search’s database.
Target Architecture Option 1
Assumptions
The solution is aimed at ECS and non-ECS environments using a schema-per-tenant database approach. For other cases the existing implementation of the reindexing procedure is available.
Schema-to-schema communication is not a user-facing use case; it is limited to the initial load, which is a background maintenance process.
Key aspects of the solution
The solution should address both single-tenant environments and ECS. Key aspects of the proposed solution are:
The whole reindexing process should be split into two steps.
Aggregate all the information related to instances, holdings, and items in the mod-search database
Bulk upload prepared documents from the mod-search database to Opensearch/Elasticsearch
Split the consortium_instance table into instances, holdings, and items tables to reduce data duplication
Create a table for tracking chunk processing progress (see the sketch after this list)
For initial inventory indexing or when the inventory data model changes, all entities from the inventory should be copied through INSERT INTO ... SELECT with cross-tenant (cross-schema) SQL queries. To allow parallel execution of queries, it is proposed to split the data transfer into chunks.
Bulk upload of documents should happen when there are any search index mapping changes. This should be a separate procedure from step 1 (the merge).
The existing event model should support the new data model in the mod-search database
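A minimal sketch of the chunk-tracking table mentioned above is shown below; the table and column names are assumptions for illustration, not a final design.
-- hypothetical chunk/range tracking table (names are assumptions, not part of the final design)
create table merge_ranges
(
range_id int not null,
entity_type varchar(30) not null, -- 'instance' / 'holding' / 'item'
lower uuid not null,
upper uuid not null,
status varchar(20) not null default 'PENDING', -- PENDING / IN_PROGRESS / DONE / FAILED
finished_at timestamp,
primary key (range_id, entity_type)
);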
Sequence Diagram
ERD
Solution Details
The database of mod-search in a single-tenant environment or ECS mode should contain the following tables, which act as aggregates for the entities that will be posted to the search index.
create table instances
(
instance_id varchar(255) not null,
shared boolean not null default false,
isBoundWith boolean not null default false,
instance_json jsonb not null,
primary key (instance_id)
);
create table holdings
(
holding_id varchar(255) not null,
tenant_id varchar(255) not null,
instance_id varchar(255) not null,
holding_json jsonb not null,
primary key (holding_id, tenant_id)
);
create index holdings_instances_idx on holdings (instance_id);
create table items
(
item_id varchar(255) not null,
tenant_id varchar(255) not null,
holding_id varchar(255) not null,
item_json jsonb not null,
primary key (item_id, tenant_id)
);
create index items_holdings_idx on items (holding_id);
create table classifications
(
classification_number varchar(255) not null,
classification_type_id varchar(255) not null,
instance_id varchar(255) not null,
shared boolean not null default false,
primary key (classification_number, classification_type_id, instance_id)
);
create table subjects
(
subject_id varchar(255) not null,
subject_value varchar(255) not null,
authority_id varchar(255) not null default '',
instance_id varchar(255) not null,
shared boolean not null default false,
primary key (subject_id, subject_value, instance_id)
);
create table contributors
(
contributor_id varchar(255) not null,
contributor_name varchar(255) not null,
contributor_name_type_id varchar(255) not null,
authority_id varchar(255) not null default '',
instance_id varchar(255) not null,
shared boolean not null default false,
primary key (contributor_id, contributor_name, contributor_name_type_id, instance_id)
);
-- create triggers for classifications
create or replace function classification_trigger() returns trigger as
$$
declare
i jsonb;
begin
-- extract classifications
for i in
select *
from jsonb_array_elements(new.instance_json -> 'classifications')
loop
insert
into classifications(classification_number, classification_type_id, instance_id, shared)
values (i ->> 'classificationNumber', i ->> 'classificationTypeId', new.instance_id, new.shared)
on conflict (classification_type_id, classification_number, instance_id)
do update set shared = excluded.shared;
end loop;
-- extract subjects
for i in
select *
from jsonb_array_elements(new.instance_json -> 'subjects')
loop
insert
into subjects(subject_id, subject_value, authority_id, instance_id, shared)
values (encode(digest(coalesce(i ->> 'value', '') || coalesce(i ->> 'authorityId', ''), 'sha1'),
'hex'), -- requires pgcrypto; digest(text, text) is used because text cannot be cast to bytea directly
i ->> 'value',
coalesce(i ->> 'authorityId', ''),
new.instance_id, new.shared)
on conflict (subject_id, subject_value, instance_id)
do update set shared = excluded.shared;
end loop;
-- extract contributors
for i in
select *
from jsonb_array_elements(new.instance_json -> 'contributors')
loop
insert
into contributors(contributor_id, contributor_name, contributor_name_type_id, authority_id, instance_id,
shared)
values (encode(digest(coalesce(i ->> 'name', '') || coalesce(i ->> 'authorityId', ''), 'sha1'),
'hex'), -- requires pgcrypto; digest(text, text) is used because text cannot be cast to bytea directly
i ->> 'name',
i ->> 'contributorNameTypeId',
coalesce(i ->> 'authorityId', ''), -- authority_id was missing from the original values list
new.instance_id, new.shared)
on conflict (contributor_id, contributor_name, contributor_name_type_id, instance_id)
do update set shared = excluded.shared;
end loop;
return new;
end ;
$$ language plpgsql;
create trigger instance_trigger
after insert or update
on instances
for each row
execute function classification_trigger();
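For illustration only (the UUIDs below are made up), inserting an instance row with a classifications array should make the trigger populate the classifications table:
-- illustrative only: the ids are fabricated sample values
insert into instances (instance_id, shared, isBoundWith, instance_json)
values ('11111111-1111-1111-1111-111111111111', false, false,
'{"classifications": [{"classificationNumber": "QA76.9", "classificationTypeId": "22222222-2222-2222-2222-222222222222"}], "subjects": [], "contributors": []}'::jsonb);
-- the trigger should have produced one classifications row for this instance
select * from classifications where instance_id = '11111111-1111-1111-1111-111111111111';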
-- aggregated view
create or replace view instances_view
as
select h.tenant_id,
i.instance_id,
jsonb_build_object('instanceId', i.instance_id,
'instance', i.instance_json,
'holdingsRecords', h.holding_json,
'items', it.item_json,
'isBoundWith', i.isBoundWith) as json
from instances i
left join holdings h
on i.instance_id = h.instance_id
left join items it
on h.holding_id = it.holding_id;
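For illustration, the “indexing” stage could read rows from this view one instance id range at a time and build bulk requests from them; the range bounds and the batch size below are arbitrary examples.
-- illustrative only: fetch one id range of pre-merged rows for bulk upload to OpenSearch/Elasticsearch
select tenant_id, instance_id, json
from instances_view
where instance_id >= '00000000-0000-0000-0000-000000000000'
and instance_id <= '10000000-0000-0000-0000-000000000000'
order by instance_id
limit 1000;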
A sample of the initial data upload function, copying from the mod-inventory-storage database schema to the mod-search schema, is shown below:
create or replace function copy_data_from_inventory(range_to_process int) returns void as
$$
begin
-- PL/pgSQL does not support table variables, so the range boundaries are kept in a temporary table
drop table if exists pg_temp.ranges;
create temporary table ranges
(
range_id int,
lower uuid,
upper uuid
) on commit drop;
insert into ranges(range_id, lower, upper)
values (1::int, '00000000-0000-0000-0000-000000000000'::uuid, '10000000-0000-0000-0000-000000000000'::uuid),
(2, '10000000-0000-0000-0000-000000000001', '20000000-0000-0000-0000-000000000000'),
(3, '20000000-0000-0000-0000-000000000001', '30000000-0000-0000-0000-000000000000'),
(4, '30000000-0000-0000-0000-000000000001', '40000000-0000-0000-0000-000000000000'),
(5, '40000000-0000-0000-0000-000000000001', '50000000-0000-0000-0000-000000000000'),
(6, '50000000-0000-0000-0000-000000000001', '60000000-0000-0000-0000-000000000000'),
(7, '60000000-0000-0000-0000-000000000001', '70000000-0000-0000-0000-000000000000'),
(8, '70000000-0000-0000-0000-000000000001', '80000000-0000-0000-0000-000000000000'),
(9, '80000000-0000-0000-0000-000000000001', '90000000-0000-0000-0000-000000000000'),
(10, '90000000-0000-0000-0000-000000000001', 'a0000000-0000-0000-0000-000000000000'),
(11, 'a0000000-0000-0000-0000-000000000001', 'b0000000-0000-0000-0000-000000000000'),
(12, 'b0000000-0000-0000-0000-000000000001', 'c0000000-0000-0000-0000-000000000000'),
(13, 'c0000000-0000-0000-0000-000000000001', 'd0000000-0000-0000-0000-000000000000'),
(14, 'd0000000-0000-0000-0000-000000000001', 'e0000000-0000-0000-0000-000000000000'),
(15, 'e0000000-0000-0000-0000-000000000001', 'f0000000-0000-0000-0000-000000000000'),
(16, 'f0000000-0000-0000-0000-000000000001', 'ffffffff-ffff-ffff-ffff-ffffffffffff');
insert into fs09000000_mod_search.instances (instance_id, shared, isBoundWith, instance_json)
select id,
true, -- for member tenant
(select (exists (select 1
from fs09000000_mod_inventory_storage.bound_with_part bw
join fs09000000_mod_inventory_storage.item it on it.id = bw.itemid
join fs09000000_mod_inventory_storage.holdings_record hr
on hr.id = bw.holdingsrecordid
where hr.instanceid = i.id
limit 1)) as "exists"), -- isBoundWith: true when the instance has at least one bound-with item
i.jsonb
from fs09000000_mod_inventory_storage.instance i
inner join ranges r
on i.id between r.lower and r.upper
where r.range_id = range_to_process;
insert
into fs09000000_mod_search.holdings (holding_id, tenant_id, instance_id, holding_json)
select h.id,
'fs09000000', -- or central
i.id, -- instance_id of the holdings record
h.jsonb
FROM fs09000000_mod_inventory_storage.holdings_record h
inner join fs09000000_mod_inventory_storage.instance i
on h.instanceid = i.id
inner join ranges r
on i.id between r.lower and r.upper
where r.range_id = range_to_process;
insert into fs09000000_mod_search.items (item_id, tenant_id, holding_id, item_json)
select i.id,
'fs09000000', -- or central
h.id,
i.jsonb
FROM fs09000000_mod_inventory_storage.holdings_record h
inner join fs09000000_mod_inventory_storage.item i
on h.id = i.holdingsrecordid
inner join ranges r
on h.instanceid between r.lower and r.upper
where r.range_id = range_to_process;
end;
$$ language plpgsql;
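A possible way to run the sample function is to issue one call per range from separate connections so the chunks are processed in parallel (illustrative only; the function above hard-codes the fs09000000 schema names):
-- illustrative only: each call can be executed from its own connection/worker
select copy_data_from_inventory(1);
select copy_data_from_inventory(2);
-- ... one call per range ...
select copy_data_from_inventory(16);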
Target Architecture Option 2
Key aspects of the solution
The solution should address both single-tenant environments and ECS. Key aspects of the proposed solution are:
The whole reindexing process should be split into two steps.
Aggregate all the information related to instances, holdings, and items in the mod-search database
Bulk upload prepared documents from the mod-search database to Opensearch/Elasticsearch
Split the consortium_instance table into instances, holdings, and items tables to reduce data duplication
Create a table for tracking range processing progress
For initial inventory indexing or when the inventory data model changes, all entities from the inventory should be copied through the Kafka topic.
Bulk upload of documents should happen when there are any search index mapping changes. This should be a separate procedure from step 1 (the merge).
The existing event model should support the new data model in the mod-search database
Sequence Diagram
ERD
WBS
Story 1: Merge documents in Postgres
1. Counts REST endpoint: create a REST endpoint that returns the counts of instances, holdings, and items in the current tenant's database. (mod-inventory-storage)
2. Database changes: create migration scripts for the Instances, Holdings, and Items tables; for the Instance insert/update triggers for Subjects, Contributors, and Classifications; and for the Ranges table. (mod-search)
3. Initialize reindex: create a REST endpoint that triggers a full reindex or only the merge phase of the reindex; create a REST endpoint that receives a UUID range and an entity type and sends the related entity ranges into a Kafka topic (create the topic with a small retention time and an increased message size). Notes: if a message exceeds the maximum message size, it must be split into parts; each entity type should be sent separately (instances/items/holdings); on member tenants in ECS, send only local instances but all items and holdings. (mod-inventory-storage)
5. Entities Kafka listener: create a Kafka listener that receives a list of entities related to a range, inserts the entities into the related table, and updates the Ranges table. Note: when all ranges are filled, trigger the bulk upload. (mod-search)
6. Reindex status endpoint: create a REST endpoint that allows tracking progress through the Ranges table.

Story 7: Bulk upload documents to the search index
7. Initialize upload of merged documents: create a REST endpoint that triggers only the upload phase of the reindex. (mod-search)
8. Create ranges: calculate UUID ranges and fill the Upload-Ranges table for each index (instances, subjects, contributors, classifications); send the entity ranges to Kafka. A possible SQL sketch for the range calculation follows this list. (mod-search)
9. Consume ranges: create a Kafka listener that receives a list of entities related to a range, selects the related entities from the database, bulk uploads the documents to the search index, and updates the Ranges table. (mod-search)
10. Reindex status endpoint: create a REST endpoint that allows tracking progress through the Ranges table.
11. Align ongoing indexing with the new database structure. (mod-search)
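A possible SQL sketch for the range calculation in task 8: splitting the UUID key space into 16 equal ranges by the first hex digit (the Upload-Ranges table itself is not defined in this document, so only the calculation of the bounds is shown).
-- illustrative only: generate 16 equal UUID ranges by the first hex digit
select gs as range_id,
(to_hex(gs - 1) || '0000000-0000-0000-0000-000000000000')::uuid as lower,
(to_hex(gs - 1) || 'fffffff-ffff-ffff-ffff-ffffffffffff')::uuid as upper
from generate_series(1, 16) gs;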
Further Improvements
Use a procedure to populate the secondary tables after the main table is complete, within the transaction.
Use a temp table to track the instances modified.
Compute the list of, for example, subjects to be inserted into the subjects table; the list of subjects should be deduplicated before inserting (unique clause?).
Perform the insert into the subjects table in a batch (one insert statement), not in a loop; see the sketch after this list.
Use no-logging (unlogged) database tables.
Is there a separate code path for the initial load vs. updates? This should be considered.
If so, new tables for each reindex job could be created with nologging.
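As an illustration of the batch-insert idea above, a set-based, deduplicated extraction of subjects that could replace the per-row loop in the trigger (a sketch against the tables defined earlier; it assumes the pgcrypto extension is enabled):
-- illustrative only: batch, deduplicated extraction of subjects from all instances
insert into subjects (subject_id, subject_value, authority_id, instance_id, shared)
select distinct encode(digest(coalesce(s ->> 'value', '') || coalesce(s ->> 'authorityId', ''), 'sha1'), 'hex'),
s ->> 'value',
coalesce(s ->> 'authorityId', ''),
i.instance_id,
i.shared
from instances i
cross join lateral jsonb_array_elements(i.instance_json -> 'subjects') s
where s ->> 'value' is not null
on conflict (subject_id, subject_value, instance_id) do update set shared = excluded.shared;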