Reindex Improvements
| Status | DONE |
|---|---|
| Impact | high |
| Prod Ticket | |
| Arch Ticket | |
Summary
Implementation of the classification browse feature required the introduction of a new search index. Adding a new index negatively impacted the reindexing procedure performance. For tenants with large datasets, the reindexing procedure exceeds the maintenance time window.
The impact is especially significant in ECS environments because data must be aggregated across the inventory storage of multiple tenants. In the current reindexing event model, mod-search receives "create/update" domain events that contain only the identifiers of the related instances, and then fetches the full entity through HTTP requests. This is the root cause of the slow reindexing. The proposed solution addresses the issue by replacing HTTP communication with database schema-to-schema communication.
Requirements
Functional requirements
There should be no impact on the current behavior of search capabilities.
The event model for indexing newly created/updated documents should remain as-is
Non-functional requirements
Performance
ECS Support
Baseline Architecture
The baseline architecture is described here:
Drawbacks of existing solution:
HTTP calls to inventory impact the latency of indexing of a single instance.
Slow-running “upsert scripts” for partial updates in OpenSearch/Elasticsearch.
The need to aggregate instances across multiple tenants in an ECS environment requires multiple updates for every instance.
Data duplication in the consortium_instance table might cause additional Postgres performance overhead for large datasets.
Solution Options
| # | Option | Description | Pros | Cons | Decision |
|---|---|---|---|---|---|
| 0 | Existing architecture | The reindexing procedure is based on the domain event model. | | | |
| 1 | Database-to-Database query | The reindexing is split into “merge” and “indexing” stages and the “merge” stage is done in the db-to-db query | | | Rejected due to introduction of dependencies |
| 2 | Reindexing through Kafka | The reindexing is split into “merge” and “indexing” stages and the “merge” stage is done by sending all entities through the Kafka topic | | | |
Target Architecture Option 1
Assumptions
The solution is aimed at ECS and non-ECS environments using a schema-per-tenant database approach. For other cases the existing implementation of the reindexing procedure is available.
Schema-to-schema communication is not a user-facing use case. It is limited to the initial load, which is a background maintenance process.
Key aspects of the solution
The solution should address both single-tenant environments and ECS. Key aspects of the proposed solution are:
The whole reindexing process should be split into two steps:
1. Aggregate all the information related to instances, holdings, and items in the mod-search database.
2. Bulk upload prepared documents from the mod-search database to OpenSearch/Elasticsearch.
Split the consortium_instance table into instances, holdings, and items tables to reduce data duplication.
Create a table for tracking chunk processing progress.
For initial inventory indexing or when the inventory data model changes, all entities from the inventory should be copied through INSERT INTO ... SELECT with cross-tenant (cross-schema) SQL queries. To allow parallel execution of queries, it is proposed to split the data transfer into chunks.
Bulk upload of documents should happen when there are any changes to the search index mappings. This should be a separate procedure from step 1.
The existing event model should support the new data model in the mod-search database.
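The chunk tracking and parallel execution described above could look roughly like the following. This is a minimal sketch, not the module's actual schema: the table name reindex_chunk, its columns, and the status values are assumptions made for illustration, and the 16 ranges are simply the UUID space partitioned by the first hex digit.

```sql
-- Hypothetical progress table: one row per UUID range ("chunk") of the merge stage.
create table if not exists reindex_chunk
(
    chunk_id    int primary key,
    lower_id    uuid        not null,
    upper_id    uuid        not null,
    status      varchar(16) not null default 'PENDING', -- PENDING | IN_PROGRESS | DONE | FAILED
    started_at  timestamptz,
    finished_at timestamptz
);

-- Seed 16 contiguous, non-overlapping ranges: 0xxxxxxx-..., 1xxxxxxx-..., ... fxxxxxxx-...
insert into reindex_chunk (chunk_id, lower_id, upper_id)
select n + 1,
       (to_hex(n) || '0000000-0000-0000-0000-000000000000')::uuid,
       (to_hex(n) || 'fffffff-ffff-ffff-ffff-ffffffffffff')::uuid
from generate_series(0, 15) as n
on conflict (chunk_id) do nothing;

-- Each worker claims the next free chunk. SKIP LOCKED lets parallel workers
-- pick different rows without blocking each other.
update reindex_chunk
set status     = 'IN_PROGRESS',
    started_at = now()
where chunk_id = (select chunk_id
                  from reindex_chunk
                  where status = 'PENDING'
                  order by chunk_id
                  limit 1
                  for update skip locked)
returning chunk_id, lower_id, upper_id;

-- After the INSERT INTO ... SELECT for the claimed chunk succeeds
-- (chunk 1 is used here only as an example):
update reindex_chunk
set status      = 'DONE',
    finished_at = now()
where chunk_id = 1;
```

Keeping the status in the database means a restarted worker can resume from the chunks that are still PENDING or FAILED instead of repeating the whole copy.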
Sequence Diagram
ERD
Solution Details
The database of mod-search in a single-tenant environment or ECS mode should contain the following tables that would work as aggregates for entities that will be posted to the search index.
create table instances
(
instance_id varchar(255) not null,
shared boolean not null default false,
isBoundWith boolean not null default false,
instance_json jsonb not null,
primary key (instance_id)
);
create table holdings
(
holding_id varchar(255) not null,
tenant_id varchar(255) not null,
instance_id varchar(255) not null,
holding_json jsonb not null,
primary key (holding_id, tenant_id)
);
create index holdings_instances_idx on holdings (instance_id);
create table items
(
item_id varchar(255) not null,
tenant_id varchar(255) not null,
holding_id varchar(255) not null,
item_json jsonb not null,
primary key (item_id, tenant_id)
);
create index items_holdings_idx on items (holding_id);
create table classifications
(
classification_number varchar(255) not null,
classification_type_id varchar(255) not null,
instance_id varchar(255) not null,
shared boolean not null default false,
primary key (classification_number, classification_type_id, instance_id)
);
create table subjects
(
subject_id varchar(255) not null,
subject_value varchar(255) not null,
authority_id varchar(255) not null default '',
instance_id varchar(255) not null,
shared boolean not null default false,
primary key (subject_id, subject_value, instance_id)
);
create table contributors
(
contributor_id varchar(255) not null,
contributor_name varchar(255) not null,
contributor_name_type_id varchar(255) not null,
authority_id varchar(255) not null default '',
instance_id varchar(255) not null,
shared boolean not null default false,
primary key (contributor_id, contributor_name, contributor_name_type_id, instance_id)
);
-- create a trigger that extracts classifications, subjects, and contributors from the instance JSON
create or replace function classification_trigger() returns trigger as
$$
declare
i jsonb;
begin
-- extract classifications
for i in
select *
from jsonb_array_elements(new.instance_json -> 'classifications')
loop
insert
into classifications(classification_number, classification_type_id, instance_id, shared)
values (i ->> 'classificationNumber', i ->> 'classificationTypeId', new.instance_id, new.shared)
on conflict (classification_type_id, classification_number, instance_id)
do update set shared = excluded.shared;
end loop;
-- extract subjects
for i in
select *
from jsonb_array_elements(new.instance_json -> 'subjects')
loop
insert
into subjects(subject_id, subject_value, authority_id, instance_id, shared)
values (encode(digest((i ->> 'value') || coalesce(i ->> 'authorityId', ''), 'sha1'),
'hex'), -- requires pgcrypto enabled; parentheses are needed because ->> and || share the same precedence
i ->> 'value',
coalesce(i ->> 'authorityId', ''), -- authority_id is not null, so a missing value becomes ''
new.instance_id, new.shared)
on conflict (subject_id, subject_value, instance_id)
do update set shared = excluded.shared;
end loop;
-- extract contributors
for i in
select *
from jsonb_array_elements(new.instance_json -> 'contributors')
loop
insert
into contributors(contributor_id, contributor_name, contributor_name_type_id, authority_id, instance_id,
shared)
values (encode(digest((i ->> 'name') || coalesce(i ->> 'authorityId', ''), 'sha1'),
'hex'), -- requires pgcrypto enabled; parentheses are needed because ->> and || share the same precedence
i ->> 'name',
i ->> 'contributorNameTypeId',
coalesce(i ->> 'authorityId', ''), -- authority_id is not null, so a missing value becomes ''
new.instance_id, new.shared)
on conflict (contributor_id, contributor_name, contributor_name_type_id, instance_id)
do update set shared = excluded.shared;
end loop;
return new;
end ;
$$ language plpgsql;
create trigger instance_trigger
after insert or
update
on instances
for each row
execute function classification_trigger();
-- aggregated view
create or replace view instances_view
as
select h.tenant_id,
i.instance_id,
jsonb_build_object('instanceId', i.instance_id,
'instance', i.instance_json,
'holdingsRecords', h.holding_json,
'items', it.item_json,
'isBoundWith', i.isBoundWith) as json
from instances i
left join holdings h
on i.instance_id = h.instance_id
left join items it
on h.holding_id = it.holding_id;
A sample function for the initial data upload from the mod-inventory-storage database to the mod-search database is shown below:
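create or replace function copy_data_from_inventory(range_to_process int) returns void as
$$
begin
-- plpgsql has no table variables, so the ranges are kept in a temporary table
create temp table if not exists ranges
(
range_id int,
lower uuid,
upper uuid
) on commit drop;
truncate ranges; -- allows repeated calls within one transaction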
insert into ranges(range_id, lower, upper)
values (1::int, '00000000-0000-0000-0000-000000000000'::uuid, '10000000-0000-0000-0000-000000000000'::uuid),
(2, '10000000-0000-0000-0000-000000000001', '20000000-0000-0000-0000-000000000000'),
(3, '20000000-0000-0000-0000-000000000001', '30000000-0000-0000-0000-000000000000'),
(4, '30000000-0000-0000-0000-000000000001', '40000000-0000-0000-0000-000000000000'),
(5, '40000000-0000-0000-0000-000000000001', '50000000-0000-0000-0000-000000000000'),
(6, '50000000-0000-0000-0000-000000000001', '60000000-0000-0000-0000-000000000000'),
(7, '60000000-0000-0000-0000-000000000001', '70000000-0000-0000-0000-000000000000'),
(8, '70000000-0000-0000-0000-000000000001', '80000000-0000-0000-0000-000000000000'),
(9, '80000000-0000-0000-0000-000000000001', '90000000-0000-0000-0000-000000000000'),
(10, '90000000-0000-0000-0000-000000000001', 'a0000000-0000-0000-0000-000000000000'),
(11, 'a0000000-0000-0000-0000-000000000001', 'b0000000-0000-0000-0000-000000000000'),
(12, 'b0000000-0000-0000-0000-000000000001', 'c0000000-0000-0000-0000-000000000000'),
(13, 'c0000000-0000-0000-0000-000000000001', 'd0000000-0000-0000-0000-000000000000'),
(14, 'd0000000-0000-0000-0000-000000000001', 'e0000000-0000-0000-0000-000000000000'),
(15, 'e0000000-0000-0000-0000-000000000001', 'f0000000-0000-0000-0000-000000000000'),
(16, 'f0000000-0000-0000-0000-000000000001', 'ffffffff-ffff-ffff-ffff-ffffffffffff');
insert into fs09000000_mod_search.instances (instance_id, shared, isBoundWith, instance_json)
select id,
true, -- for member tenant
(select (exists (select 1
from fs09000000_mod_inventory_storage.bound_with_part bw
join fs09000000_mod_inventory_storage.item it on it.id = bw.itemid
join fs09000000_mod_inventory_storage.holdings_record hr
on hr.id = bw.holdingsrecordid
where hr.instanceid = i.id
limit 1)) as "exists"), -- what is it?
i.jsonb
from fs09000000_mod_inventory_storage.instance i
inner join ranges r
on i.id between r.lower and r.upper
where r.range_id = range_to_process;
insert
into fs09000000_mod_search.holdings (holding_id, tenant_id, instance_id, holding_json)
select h.id,
'fs09000000', -- or central
i.id, -- what is it?
h.jsonb
FROM fs09000000_mod_inventory_storage.holdings_record h
inner join fs09000000_mod_inventory_storage.instance i
on h.instanceid = i.id
inner join ranges r
on i.id between r.lower and r.upper
where r.range_id = range_to_process;
insert into fs09000000_mod_search.items (item_id, tenant_id, holding_id, item_json)
select i.id,
'fs09000000', -- or central
h.id,
i.jsonb
FROM fs09000000_mod_inventory_storage.holdings_record h
inner join fs09000000_mod_inventory_storage.item i
on h.id = i.holdingsrecordid
inner join ranges r
on h.instanceid between r.lower and r.upper
where r.range_id = range_to_process;
end;
$$ language plpgsql;
Target Architecture Option 2
Key aspects of the solution
The solution should address both single-tenant environments and ECS. Key aspects of the proposed solution are:
The whole reindexing process should be split into two steps:
1. Aggregate all the information related to instances, holdings, and items in the mod-search database.
2. Bulk upload prepared documents from the mod-search database to OpenSearch/Elasticsearch.
Split the consortium_instance table into instances, holdings, and items tables to reduce data duplication.
Create a table for tracking range processing progress.
For initial inventory indexing or when the inventory data model changes, all entities from the inventory should be copied through the Kafka topic.
Bulk upload of documents should happen when there are any changes to the search index mappings. This should be a separate procedure from step 1.
The existing event model should support the new data model in the mod-search database.
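With this option each entity arrives as a message instead of through a cross-schema query, so the merge stage for one record can reduce to an idempotent upsert into the aggregate tables. A minimal sketch, assuming the holdings table from Option 1 is reused and that the consumer binds the message fields to the statement. The identifiers and JSON below are placeholder values, not real data.

```sql
-- Same definition as in Option 1, repeated so the example is self-contained.
create table if not exists holdings
(
    holding_id   varchar(255) not null,
    tenant_id    varchar(255) not null,
    instance_id  varchar(255) not null,
    holding_json jsonb        not null,
    primary key (holding_id, tenant_id)
);

-- Hypothetical statement run by the consumer for one holdings record read from the topic.
-- Replaying the same message leaves a single up-to-date row, so redelivery is safe.
insert into holdings (holding_id, tenant_id, instance_id, holding_json)
values ('11111111-1111-1111-1111-111111111111',
        'fs09000000',
        '22222222-2222-2222-2222-222222222222',
        '{"id": "11111111-1111-1111-1111-111111111111"}'::jsonb)
on conflict (holding_id, tenant_id)
    do update set instance_id  = excluded.instance_id,
                  holding_json = excluded.holding_json;
```

The same statement shape would also serve the regular "create/update" domain events, which is one way the existing event model could support the new data model.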