Ongoing Indexing with Merged Documents in Postgres

Problem statement

Issue: MSEARCH-899: mod-search queries performance optimisation (In Progress)

Description: The Reindex improvements split the procedure into two phases. The “merge” phase preprocesses all documents from the inventory and stores them in the Postgres database. The “upload” phase loads the preprocessed search documents into OpenSearch. This split makes it possible to run only the upload phase when there are no major changes to the inventory.

However, ongoing indexing causes the issue described in the ticket: the “merge” and “upload” phases have to run sequentially for each document, and the aggregate query is run on every insert/delete operation. This puts a heavy load on the Postgres database because a large number of simultaneous read/write operations hit the same tables.

Solution options

| Option | Description | Pros & Cons | Decision |
|---|---|---|---|
| 1. Query optimization | Create additional indexes and/or reorganize the query | Pros: small effort | Declined: little to no impact on performance |
| 2. Extracting upload into a separate thread | Extracting the upload operation reduces the number of identical or similar aggregate queries, which currently run on each insert/delete operation | Pros: medium effort. Cons: eventual consistency for sub-resources (subjects, contributors, classifications) | In progress |
| 3. Aggregation of counts on update/insert | To cope with the large number of persisted records, persist only the counts related to each sub-resource instead of relations such as contributor-instances. This approach can be combined with option 2, since the two do not conflict | Pros: less data to store; less load on Postgres CPU. Cons: same as option 2 | Draft |

Extracting upload into a separate thread

The approach is as follows (using contributors as an example):

  1. Add a last_updated_date column to the contributor table

  2. Create the sub_resources_locks table (id, entity_type, locked_flag, last_updated_date) and add one row for each sub-resource type

  3. Create a job that runs every minute and gets the list of tenants with select t.tenantjson -> 'descriptor' ->> 'id' from public.tenants t where t.tenantjson -> 'descriptor' ->> 'id' <> 'supertenant'; then, for each tenant, the job:

    1. Tries to acquire the lock in a transaction: update sub_resources_locks set locked_flag=true where entity_type='contributor' and locked_flag = false returning id, last_updated_date

    2. If an id is returned, runs the contributors aggregate query with the clause where last_updated_date >= ? order by last_updated_date limit 100. The limit of 100 is the number of records expected to be processed within a minute; if a run takes longer, the lock prevents the next run from starting.

    3. Loads the aggregates to OpenSearch

    4. Runs update sub_resources_locks set locked_flag=false, last_updated_date=:timestamp where entity_type='contributor' and locked_flag = true, where :timestamp is the latest timestamp among the records fetched in sub-step 2

NB! In ECS mode the job should run only for the central tenant

  • Query to check if there are any unprocessed contributors:

SELECT * FROM contributor WHERE last_updated_date > (SELECT last_updated_date FROM sub_resources_locks WHERE entity_type='contributor') ORDER BY last_updated_date DESC

Aggregation of counts on update/insert

The approach consists of the following steps (using contributors as an example):

  • Create table contributor_counts with the following structure:

| Column | Type | PK |
|---|---|---|
| contributor_id | varchar(40) | yes |
| tenant_id | varchar(100) | yes |
| type_id | varchar(40) | yes |
| count | bigint | no |

  • On create/update/delete operations on an instance, apply the following actions:

| Operations | Sub-resources | Insert query | Delete query |
|---|---|---|---|
| instance inserted | list to insert | insert into contributor_counts... on conflict update count=count+1 | N/A |
| instance updated | list to insert + list to delete | insert into contributor_counts... on conflict update count=count+1 | update contributor_counts set count=count-1 where contributor_id = ? and tenant_id = ? and type_id = ? and count > 0 |
| instance deleted | list to delete | N/A | update contributor_counts set count=count-1 where contributor_id = ? and tenant_id = ? and type_id = ? and count > 0 |

  • The aggregation query:

SELECT c.id, c.name, c.name_type_id, c.authority_id,
       json_agg(
         json_build_object(
           'count', sub.instance_count,
           'typeId', sub.type_ids,
           'shared', false,
           'tenantId', sub.tenant_id
         )
       ) AS instances
FROM (
  SELECT ins.contributor_id,
         ins.tenant_id,
         sum(ins.count) AS instance_count,
         array_agg(DISTINCT ins.type_id) FILTER (WHERE ins.type_id <> '') AS type_ids
  FROM contributor_counts ins
  WHERE ins.contributor_id IN (?, ?)
  GROUP BY ins.contributor_id, ins.tenant_id
) sub
JOIN contributor c ON c.id = sub.contributor_id
WHERE c.id IN (?, ?)
GROUP BY c.id;
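To make the shape of the aggregated result easier to see, here is a small pure-Python equivalent of the grouping done by the query. It is a sketch, not the service's code: aggregate is a hypothetical helper, its input rows mimic the count table as (contributor_id, tenant_id, type_id, count) tuples, and the join to the contributor table (name, name type, authority) is left out.

```python
from collections import defaultdict


def aggregate(count_rows):
    """Group count rows by contributor and tenant, like the inner subquery."""
    groups = defaultdict(lambda: {"count": 0, "type_ids": set()})
    for contributor_id, tenant_id, type_id, count in count_rows:
        group = groups[(contributor_id, tenant_id)]
        group["count"] += count            # sum(ins.count)
        if type_id:                        # FILTER (WHERE ins.type_id <> '')
            group["type_ids"].add(type_id)  # array_agg(DISTINCT ins.type_id)

    # Outer query: one "instances" list per contributor, one entry per tenant.
    instances = defaultdict(list)
    for (contributor_id, tenant_id), group in groups.items():
        instances[contributor_id].append({
            "count": group["count"],
            # An aggregate over zero rows is NULL in SQL, hence None here.
            "typeId": sorted(group["type_ids"]) or None,
            "shared": False,
            "tenantId": tenant_id,
        })
    return dict(instances)
```

For example, two rows for the same contributor and tenant with different type ids collapse into a single entry whose count is the sum of both rows and whose typeId lists both types.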