Ongoing Indexing with Merged Documents in Postgres
Problem statement
Issue: MSEARCH-899: mod-search queries performance optimisation (Closed)
Description: With the introduction of the Reindex improvements, the procedure was split into two phases. The first, the "merge" phase, preprocesses all documents from the inventory and stores them in the Postgres database. The second, the "upload" phase, loads the preprocessed search documents into the OpenSearch database. This split makes it possible to run only the second phase when there were no major changes to the inventory.
However, ongoing indexing creates the issue mentioned in the ticket: the "merge" and "upload" phases must happen sequentially for each document, so the aggregate query runs on every insert/delete operation. This puts a heavy load on the Postgres database because of the large number of simultaneous read/write operations on the same tables.
Solution options
# | Option | Description | Pros & Cons | Decision |
---|---|---|---|---|
1 | Query optimization | Creation of additional indexes and/or reorganization of the query | Small effort | Declined due to low-to-no impact on performance |
2 | Extracting upload into a separate thread | Extracting the upload operation reduces the number of identical or similar aggregate queries run on each insert/delete operation | Pros: Cons: | In progress |
3 | Aggregation of counts on update/insert | To cope with the large number of persisted records, persist only the counts related to each sub-resource rather than relations such as contributor-instances. This approach can be combined with option 2, as the two do not contradict each other | Pros: Cons: | Draft |
Extracting upload into a separate thread
The approach is as follows (using contributors as an example):
1. Add last_updated_date to the contributor table.
2. Create the sub_resources_locks table (id, entity_type, locked_flag, last_updated_date) and fill it for all types of subresources.
3. Create a job that every minute gets the list of tenants
   select t.tenantjson -> 'descriptor' ->> 'id' from public.tenants t where t.tenantjson -> 'descriptor' ->> 'id' <> 'supertenant';
   and for each of the tenants:
   a. Tries to run (transactionally)
      update sub_resources_locks set locked_flag=true where entity_type='contributor' and locked_flag = false returning id, last_updated_date
   b. If an id is returned, fetches the aggregate query of contributors with the clause
      where last_updated_date >= ? order by last_updated_date limit 100
      The limit of 100 is the number of records expected to be processed within a minute. If processing takes longer, the lock prevents the job from starting again in the next minute.
   c. Loads the aggregates to OpenSearch.
   d. Runs
      update sub_resources_locks set locked_flag=false, last_updated_date=:timestamp where entity_type='contributor' and locked_flag = true
      where :timestamp is the latest timestamp among the records that were fetched in step (b).
NB! In ECS mode the job should run only for the central tenant.
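The job steps above can be sketched as follows. This is a minimal illustration, not the mod-search implementation: it uses Python's built-in sqlite3 with an in-memory database as a stand-in for one tenant's Postgres schema, integer timestamps instead of real ones, and row counts instead of the returning clause to detect whether the lock was taken. The names make_demo_db, run_cycle and the upload callback are hypothetical.

```python
import sqlite3

ENTITY = "contributor"

def make_demo_db():
    """In-memory stand-in for one tenant schema (hypothetical demo data)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE contributor (id TEXT PRIMARY KEY, last_updated_date INTEGER);
        CREATE TABLE sub_resources_locks (
            id INTEGER PRIMARY KEY, entity_type TEXT,
            locked_flag INTEGER, last_updated_date INTEGER);
        INSERT INTO sub_resources_locks VALUES (1, 'contributor', 0, 0);
        INSERT INTO contributor VALUES ('c1', 10), ('c2', 20), ('c3', 30);
    """)
    return conn

def run_cycle(conn, upload, batch_size=100):
    """One scheduled run; returns the number of rows handed to upload."""
    cur = conn.cursor()
    # Step (a): take the lock only if nobody else holds it.
    cur.execute(
        "UPDATE sub_resources_locks SET locked_flag = 1 "
        "WHERE entity_type = ? AND locked_flag = 0", (ENTITY,))
    acquired = cur.rowcount == 1
    conn.commit()
    if not acquired:
        return 0  # a previous run is still in progress
    watermark = cur.execute(
        "SELECT last_updated_date FROM sub_resources_locks "
        "WHERE entity_type = ?", (ENTITY,)).fetchone()[0]
    # Step (b): fetch the next batch, starting at the stored watermark.
    rows = cur.execute(
        "SELECT id, last_updated_date FROM contributor "
        "WHERE last_updated_date >= ? ORDER BY last_updated_date LIMIT ?",
        (watermark, batch_size)).fetchall()
    # Step (c): hand the batch to the uploader (OpenSearch in the real flow).
    upload(rows)
    # Step (d): release the lock and advance the watermark to the latest
    # timestamp in the batch (unchanged if the batch was empty).
    new_mark = rows[-1][1] if rows else watermark
    cur.execute(
        "UPDATE sub_resources_locks SET locked_flag = 0, last_updated_date = ? "
        "WHERE entity_type = ? AND locked_flag = 1", (new_mark, ENTITY))
    conn.commit()
    return len(rows)
```

Because the fetch uses >=, the record sitting exactly at the watermark is picked up again on the next run. The sketch assumes re-uploading it is harmless. It also leaves the lock set if upload raises, so a real job would need some recovery path for that case.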
Query to check if there are any unprocessed contributors:
SELECT * FROM contributor
WHERE last_updated_date > (SELECT last_updated_date
FROM sub_resources_locks WHERE entity_type='contributor')
ORDER BY last_updated_date DESC
Aggregation of counts on update/insert
The approach consists of the following steps (using contributors as an example):
Create the table contributor_count with the following structure:
Column | Type | PK |
---|---|---|
contributor_id | varchar(40) | yes |
tenant_id | varchar(100) | yes |
type_id | varchar(40) | yes |
count | bigint | no |
On create/update/delete operations on an instance, the following actions should be applied:
Operation | Sub-resources | Insert query | Delete query |
---|---|---|---|
instance inserted | list to insert | | N/A |
instance updated | list to insert + list to delete | | |
instance deleted | list to delete | N/A | |
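The insert and delete queries are not shown in the table above, so the following is only one possible shape for them, under stated assumptions: each sub-resource in the "list to insert" increments its counter via an upsert, each one in the "list to delete" decrements it, and rows that reach zero are removed. It runs against an in-memory sqlite3 database as a stand-in for Postgres. The names make_counts_db and apply_instance_change are hypothetical.

```python
import sqlite3

# Assumed insert query: create the counter row or bump it by one.
# In PostgreSQL the right-hand side must be qualified
# (contributor_count.count + 1) to avoid an ambiguous column reference.
UPSERT = """
INSERT INTO contributor_count (contributor_id, tenant_id, type_id, count)
VALUES (?, ?, ?, 1)
ON CONFLICT (contributor_id, tenant_id, type_id)
DO UPDATE SET count = count + 1
"""

# Assumed delete query: decrement the counter for one removed relation.
DECREMENT = """
UPDATE contributor_count SET count = count - 1
WHERE contributor_id = ? AND tenant_id = ? AND type_id = ?
"""

def make_counts_db():
    """In-memory table with the structure described above."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE contributor_count (
            contributor_id TEXT, tenant_id TEXT, type_id TEXT, count INTEGER,
            PRIMARY KEY (contributor_id, tenant_id, type_id))
    """)
    return conn

def apply_instance_change(conn, tenant_id, to_insert=(), to_delete=()):
    """to_insert / to_delete hold (contributor_id, type_id) pairs
    derived from a single instance event."""
    for contributor_id, type_id in to_insert:
        conn.execute(UPSERT, (contributor_id, tenant_id, type_id))
    for contributor_id, type_id in to_delete:
        conn.execute(DECREMENT, (contributor_id, tenant_id, type_id))
    # Assumption: counters that drop to zero are removed entirely.
    conn.execute("DELETE FROM contributor_count WHERE count <= 0")
    conn.commit()
```

An inserted instance passes only to_insert, a deleted one only to_delete, and an updated one passes both, matching the three rows of the table.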
The aggregation query:
SELECT
    c.id,
    c.name,
    c.name_type_id,
    c.authority_id,
    json_agg(
        json_build_object(
            'count', sub.instance_count,
            'typeId', sub.type_ids,
            'shared', false,
            'tenantId', sub.tenant_id
        )
    ) AS instances
FROM
    (
        SELECT
            ins.contributor_id,
            ins.tenant_id,
            sum(ins.count) as instance_count,
            array_agg(DISTINCT ins.type_id) FILTER (WHERE ins.type_id <> '') as type_ids
        FROM contributor_count ins
        WHERE
            ins.contributor_id in (?, ?)
        GROUP BY
            ins.contributor_id,
            ins.tenant_id
    ) sub
JOIN
    contributor c ON c.id = sub.contributor_id
WHERE
    c.id in (?, ?)
GROUP BY
    c.id;
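To make the result shape of the aggregation query easier to see, here is a small in-memory equivalent of its grouping logic. It is an illustrative sketch only: the function name aggregate_instances is hypothetical, the join to the contributor table (name, name_type_id, authority_id) is left out, and an empty list is used where the SQL array_agg with FILTER would yield NULL.

```python
from collections import defaultdict

def aggregate_instances(count_rows):
    """count_rows: iterable of (contributor_id, tenant_id, type_id, count)
    tuples, i.e. rows of contributor_count."""
    # Inner query: group by (contributor_id, tenant_id), sum the counts,
    # and collect the distinct non-empty type ids.
    groups = defaultdict(lambda: {"count": 0, "types": set()})
    for contributor_id, tenant_id, type_id, count in count_rows:
        group = groups[(contributor_id, tenant_id)]
        group["count"] += count
        if type_id:  # mirrors FILTER (WHERE ins.type_id <> '')
            group["types"].add(type_id)
    # Outer query: one "instances" list per contributor.
    result = defaultdict(list)
    for (contributor_id, tenant_id), group in sorted(groups.items()):
        result[contributor_id].append({
            "count": group["count"],
            "typeId": sorted(group["types"]),
            "shared": False,
            "tenantId": tenant_id,
        })
    return dict(result)
```

Each contributor ends up with one entry per tenant, which is why persisting counts per (contributor_id, tenant_id, type_id) is enough to build the search document without reading the contributor-instance relations.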