MSEARCH-1057 Reindexing of Individual Member Tenants in ECS

Overview

This document describes the implementation of member tenant reindexing functionality in mod-search, designed to address the need for selective reindexing of specific consortium members without affecting shared consortium data or disrupting other tenants.

Below is a patch against v5.0.3 of mod-search containing some prototype code. It can be used as a reference.

Problem Statement

In consortium deployments, it's often necessary to reindex data for a specific member tenant due to:

  • Data corruption or indexing issues specific to one member

  • Maintenance operations requiring targeted updates

  • Testing and troubleshooting scenarios

The existing full reindex operation processes all consortium members, which is inefficient and potentially disruptive when only one tenant needs reindexing.

Solution Architecture

High-Level Design

The member tenant reindex feature introduces selective reindexing capabilities while preserving consortium data integrity through:

  1. Tenant-Specific Processing: Option to target a single consortium member for reindexing

  2. Staging Tables: High-performance buffer system for bulk data operations

  3. Shared Instance Preservation: Maintains consortium-wide shared instances during member-specific operations

  4. Child Resource Preservation: Maintains consortium-wide child resources

  5. Zero Disruption Operations: Other consortium members maintain uninterrupted service

1. API Enhancement

Endpoint: POST /search/index/instance-records/reindex/full

Enhanced Request Schema:

{ "tenantId": "optional_specific_tenant_id", "indexSettings": { "numberOfShards": 2, "numberOfReplicas": 4, "refreshInterval": 1 } }
  • tenantId: Optional parameter to specify which consortium member to reindex

  • If omitted, processes all consortium members (existing behavior)

2. Staging Tables Architecture

Performance-First Design: Database resource consumption should be kept to a minimum since other consortium members would be live.

  • Unlogged Tables: No WAL (Write-Ahead Log) overhead for maximum write throughput

  • Partitioned Structure: Enables parallel processing across multiple workers

  • No Indexes: Eliminates index maintenance overhead during bulk inserts
    The final design may have to include an index or two on the staging tables to support other operations.

  • Truncated, Not Dropped: Tables are reusable infrastructure, truncated after each operation

Tables Created:

-- Core staging tables
staging_call_number
staging_classification
staging_contributor
staging_holding
staging_instance
staging_instance_subject
staging_instance_contributor
staging_instance_classification
staging_instance_call_number
staging_item
staging_subject

-- sample
CREATE UNLOGGED TABLE diku_mod_search.staging_holding (
    id uuid NOT NULL,
    tenant_id varchar(100) NOT NULL,
    instance_id uuid NOT NULL,
    "json" jsonb NOT NULL,
    inserted_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
) PARTITION BY RANGE ("substring"((id)::text, 1, 1));

Benefits:

  • Eliminates contention on main operational tables

  • Supports concurrent writes from multiple processes

  • Provides data isolation during complex merge operations

  • Helps with diffing operations to determine data points that should be cleaned up in main tables and Opensearch index

3. Data Migration

Multiple parallel processes load raw data from inventory into staging tables without validation or deduplication for maximum throughput.
Once the merge phase completes, the tenant's data resides in the staging tables and must then be merged into the existing main tables. Entity and entity-relationship data is migrated with INSERT … ON CONFLICT DO NOTHING queries from the staging tables to the main tables, ordered by inserted_at DESC so that the most recent records are processed first, as sketched below.
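
A minimal sketch of one such migration query, assuming the main instance table's column list and a conflict target on id (both are assumptions; the actual column sets and constraints may differ):

-- Sketch: migrate staged instances into the main table, newest rows first
-- (column list and ON CONFLICT target are assumptions)
INSERT INTO schema.instance (id, tenant_id, shared, is_bound_with, "json")
SELECT id, tenant_id, shared, is_bound_with, "json"
FROM schema.staging_instance
ORDER BY inserted_at DESC
ON CONFLICT (id) DO NOTHING;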

Postgres Working Memory
The current design does not have indexes on the staging tables for performance reasons. When performing data migration, it will be important to have a configurable work_mem setting for the migration queries. This lets database operations occur in memory without spilling to disk. Check the git patch for reference.
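
A minimal sketch of scoping the setting to a single migration transaction (the value shown is a placeholder, not a recommendation):

-- Sketch: raise work_mem only for this transaction (value is a placeholder)
BEGIN;
SET LOCAL work_mem = '512MB';
-- run the INSERT … ON CONFLICT migration queries here
COMMIT;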

4. Child Resource Management

During migration of staging data, timestamps on the main child resource tables are updated based on data present in the staging tables so that the upload phase can pick up only fresh data. If a child resource is present in a staging table, it is migrated with INSERT … ON CONFLICT DO UPDATE, which sets the last updated date when a conflict occurs. Recently updated child resources are then indexed during the upload phase; see the sketches below.

-- Update subjects with new relationships
UPDATE schema.subject s
SET last_updated_date = CURRENT_TIMESTAMP
FROM schema.staging_instance_subject sis
WHERE s.id = sis.subject_id

-- Child resource upload with timestamp filtering
SELECT *
FROM schema.subject s
WHERE s.id >= ? AND s.id <= ?
  AND s.last_updated_date > ? -- mergeStartTime timestamp
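
For the insert path mentioned above, a possible shape of the upsert query (the column list and conflict target are assumptions; the actual subject table definition may differ):

-- Sketch: upsert subjects from staging and bump the timestamp on conflict
-- (column list and ON CONFLICT target are assumptions)
INSERT INTO schema.subject (id, value, authority_id, last_updated_date)
SELECT ss.id, ss.value, ss.authority_id, CURRENT_TIMESTAMP
FROM schema.staging_subject ss
ON CONFLICT (id) DO UPDATE
  SET last_updated_date = CURRENT_TIMESTAMP;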

Benefits

  • Avoids uploading unchanged child resources for better performance

  • Ensures only affected child resources are reindexed

  • Maintains data consistency while minimizing processing overhead

Stale Child Resources
Since not all child resources are re-indexed into OpenSearch, precise updates are needed to keep the data in OpenSearch consistent. It is mainly the child resource relationships that need to be considered. A diff should be performed between the staging and main child-relationship tables to find relationships that are present in the main tables but no longer present in the staging tables (see the query in the Data Cleanup section below). Timestamps on the affected child resources should be updated so that they are picked up during the upload phase.

5. Instance Management

In consortium environments, some instances are shared across multiple tenants. During member-specific reindex, these shared instances must be preserved. All local instances and shared instances with holdings from the member tenant are refreshed during the upload phase.

-- Local instances belonging to the member tenant
SELECT i.id, i.tenant_id, i.shared, i.is_bound_with, i.json
FROM instance i
WHERE i.tenant_id = 'university_library'
UNION ALL
-- Shared instances that have holdings for the member tenant
SELECT i.id, i.tenant_id, i.shared, i.is_bound_with, i.json
FROM instance i
WHERE i.shared = true
  AND EXISTS (
    SELECT 1
    FROM holding h
    WHERE h.instance_id = i.id
      AND h.tenant_id = 'university_library'
  )

The query above would be qualified by ID ranges in the filter.

Stale Holdings/Items in Shared Instances
Since not all shared instances are reloaded into OpenSearch, precise updates need to occur to keep data consistent. There is a case where a holding is no longer associated with a shared instance. A diff between the main and staging tables for holdings and items should be performed so that the affected shared instances can be refreshed to remove stale data. The diff finds holdings that are in the main table for the tenant but not in the staging table, as sketched below.
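
A minimal sketch of such a diff for holdings, assuming the main and staging holding tables can be joined on id and tenant_id (join keys and schema name are assumptions):

-- Sketch: instance ids whose member-tenant holdings exist in the main table
-- but are missing from staging; these instances can then be refreshed
SELECT DISTINCT h.instance_id
FROM schema.holding h
LEFT JOIN schema.staging_holding sh
  ON sh.id = h.id AND sh.tenant_id = h.tenant_id
WHERE h.tenant_id = 'university_library'
  AND sh.id IS NULL;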

6. Data Cleanup

Stale child resource relationships are identified so that the affected child resources can be flagged for refresh at a later stage.

-- stale child relationships
UPDATE schema.subject s
SET last_updated_date = CURRENT_TIMESTAMP
FROM (
  SELECT main.subject_id
  FROM schema.instance_subject AS main
  LEFT JOIN schema.staging_instance_subject AS staging
    ON main.instance_id = staging.instance_id
    AND main.subject_id = staging.subject_id
    AND main.tenant_id = staging.tenant_id
  WHERE main.tenant_id = 'university_library'
    AND staging.instance_id IS NULL
) AS stale
WHERE s.id = stale.subject_id

Entity data in the main tables is removed by tenant id. An index on tenant id should be added to the main tables to support these deletes; a sketch follows the queries below.

-- For tables that support tenant-specific deletion (instances, holdings, items)
DELETE FROM schema_name.instance WHERE tenant_id = ?
DELETE FROM schema_name.holding WHERE tenant_id = ?
DELETE FROM schema_name.item WHERE tenant_id = ?
DELETE FROM schema_name.instance_subject WHERE tenant_id = ?
DELETE FROM schema_name.instance_contributor WHERE tenant_id = ?
DELETE FROM schema_name.instance_classification WHERE tenant_id = ?
DELETE FROM schema_name.instance_call_number WHERE tenant_id = ?
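
A sketch of the supporting tenant id indexes (index names are illustrative; the final set of indexed tables is still to be decided):

-- Illustrative tenant_id indexes to support the tenant-scoped deletes above
CREATE INDEX IF NOT EXISTS idx_instance_tenant_id ON schema_name.instance (tenant_id);
CREATE INDEX IF NOT EXISTS idx_holding_tenant_id ON schema_name.holding (tenant_id);
CREATE INDEX IF NOT EXISTS idx_item_tenant_id ON schema_name.item (tenant_id);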

Data is also removed in OpenSearch using the delete-by-query API, targeting documents in the entity-data indices whose tenant id equals the member tenant being refreshed.

{ "index": "folio_instance_central", "conflicts": "proceed", "query": { "bool": { "must": [{"term": {"tenantId": "university_library"}}], "must_not": [{"term": {"shared": true}}] } }, "scroll_size": 10000, "scroll": "5m", "timeout": "60m", "refresh": true }


Member Tenant Reindex Flow

  1. Reindex request is submitted with the tenantId parameter.

  2. Member tenant context is stored in the database.

  3. Standard Kafka events are published for the one member tenant.

  4. Existing data is compared against fresh data to identify stale child resources.

  5. Existing tenant data is cleared from the main tables.

  6. Data is migrated from the staging tables to the main tables.

  7. Tenant data is deleted from OpenSearch.

  8. Targeted document refreshes are performed in OpenSearch for shared instances with holdings belonging to the member tenant and ALL local instances.

  9. Targeted document refreshes are performed in OpenSearch for child resources that were recently updated.

Design Decisions and Rationale

1. Staging Tables Over Direct Processing

Decision: Use staging tables as intermediate storage instead of direct operational table processing

Rationale:

  • Eliminates Lock Contention: Operational tables remain available for real-time queries while staging tables handle bulk writes

  • Maximum Write Throughput: UNLOGGED, unindexed staging tables achieve 5-10x faster write performance compared to indexed operational tables

  • Enables Data Validation: Staging tables allow inspection and validation before committing to operational tables

  • Parallel Processing: Partitioned staging tables enable concurrent processing across multiple worker threads

  • Failure Recovery: Staging data preserved on failure for analysis and potential retry capabilities

  • Diffing Capabilities: Enables comparison between staging and main tables for identification of stale data.

Trade-offs:

  • Additional storage overhead during reindex operations

  • Increased complexity in full reindex logic

  • Temporary disk space requirements for staging data

3. Index Preservation for Member Reindex

Decision: Skip index recreation for tenant-specific reindex operations

Rationale:

  • Preserves Shared Documents: Shared consortium instances remain searchable during member tenant operations

  • Faster Operations: Eliminates index rebuild time (can be hours for large datasets)

  • Maintains Service Availability: Other consortium members experience zero downtime

  • Selective Updates: Only affected documents are updated rather than recreating entire indices

  • Resource Efficiency: Avoids unnecessary CPU and I/O overhead for unchanged data

Implementation:

  • Uses targeted document deletion: DELETE WHERE tenant_id = ? AND shared != true

  • Bulk document updates only for affected records

4. Selective Document Cleanup Strategy

Decision: Remove only target tenant documents from OpenSearch while preserving shared instances

Rationale:

  • Consortium Data Integrity: Shared instances critical for cross-tenant functionality remain untouched

  • Referential Integrity: Maintains relationships between shared instances and local holdings

  • Precision: Targets only the data requiring refresh

Technical Implementation:

-- OpenSearch deletion query
DELETE FROM index
WHERE (
    tenant_id = 'university_library'
    AND shared != true
)
OR (
    shared = true
    AND NOT EXISTS (
      SELECT 1
      FROM holdings
      WHERE instance_id = document.id
        AND tenant_id != 'university_library'
    )
)

5. Timestamp-Based Upload for Child Resources

Decision: Use last_updated_date filtering instead of range-based processing for child resources

Rationale:

  • Selective Processing: Only child resources with new/updated relationships are uploaded

  • Performance: Avoids processing unchanged child resources that don't need reindexing

  • Consistency: Uses merge start time as baseline to capture all affected resources

Implementation Details:

-- Child resource timestamp update during migration
UPDATE subject
SET last_updated_date = CURRENT_TIMESTAMP
FROM staging_instance_subject
WHERE subject.id = staging_instance_subject.subject_id

-- Upload query with timestamp filter
SELECT *
FROM subject
WHERE id >= ? AND id <= ?
  AND last_updated_date > ? -- mergeStartTime

7. Conditional Instance Selection for Upload

Decision: Use SQL UNION query to fetch both local and relevant shared instances

Rationale:

  • Data Completeness: Ensures member tenant gets all instances they should have access to

  • Performance: Single query handles both local and shared instance selection

Query Structure:

-- Local instances belonging to member tenant
SELECT *
FROM instance
WHERE tenant_id = 'university_library'
UNION ALL
-- Shared instances with member tenant holdings
SELECT *
FROM instance
WHERE shared = true
  AND EXISTS (
    SELECT 1
    FROM holding
    WHERE holding.instance_id = instance.id
      AND holding.tenant_id = 'university_library'
  )

10. Partitioned Table Architecture

Decision: Partition staging tables by UUID first character (0-9, a-f)

Rationale:

  • Parallel Processing: 16 partitions enable concurrent processing across multiple workers

  • I/O Distribution: Increases the chance of cache hits since related data is localized in the same pages.

Partition Strategy:

  • Range partitioning: ('0') TO ('1'), ('1') TO ('2'), etc.

  • Uniform distribution based on UUID randomness
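
A minimal sketch of how the partitions could be declared for one staging table, following the range strategy above (partition names are illustrative):

-- Illustrative partition declarations for the staging_holding table
CREATE UNLOGGED TABLE diku_mod_search.staging_holding_p0
  PARTITION OF diku_mod_search.staging_holding FOR VALUES FROM ('0') TO ('1');
CREATE UNLOGGED TABLE diku_mod_search.staging_holding_p1
  PARTITION OF diku_mod_search.staging_holding FOR VALUES FROM ('1') TO ('2');
-- ... and so on for the remaining hex characters up to 'f'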