Data Import Test Report (Kiwi) +DI topics with partitions > 1

Overview

This document presents the results of testing Data Import in Kiwi with optimistic locking (OL) enabled and with an increased number of partitions in the Kafka MQ topics.



Infrastructure 

  • 6 m5.xlarge EC2 instances 

  • 2 db.r6.xlarge database instances: one reader and one writer

  • MSK

    • 4 m5.2xlarge brokers in 2 zones

    • auto.create.topics.enable=true

    • log.retention.minutes=120

  • mod-inventory

    • 256 CPU units, 1814MB mem

    • inventory.kafka.DataImportConsumerVerticle.instancesNumber=10

    • inventory.kafka.MarcBibInstanceHridSetConsumerVerticle.instancesNumber=10

    • kafka.consumer.max.poll.records=10

  • mod-inventory-storage

    • 128 CPU units, 544MB mem

  • mod-source-record-storage

    • 128 CPU units, 908MB mem

  • mod-source-record-manager

    • 128 CPU units, 1292MB mem

  • mod-data-import

    • 128 CPU units, 1024MB mem



Software versions

  • mod-data-import:2.2.0

  • mod-inventory:18.0.4

  • mod-inventory-storage:22.0.2-optimistic-locking.559

  • mod-source-record-storage:5.2.5

  • mod-source-record-manager:3.2.6



Results

Tests performed:

| Test | Profile | KIWI | KIWI (with OL) | KIWI with 2 partitions | KIWI with 4 partitions |
|---|---|---|---|---|---|
| 5K MARC Create | PTF - Create 2 | 5 min, 8 min | 8 min | 5 min | 5,7 min |
| 5K MARC Update | PTF - Updates Success - 1 | 11 min, 13 min | 6 min | 7,6 min | 6 min |
| 10K MARC Create | PTF - Create 2 | 11 min, 14 min | 12 min | 10,12 min | 16 min |
| 10K MARC Update | PTF - Updates Success - 1 | 22 min, 24 min | 15 min | 11 min | failed |
| 25K MARC Create | PTF - Create 2 | 23 min, 25 min, 26 min | 24 min | 23,26 min | 25 min |
| 25K MARC Update | PTF - Updates Success - 1 | 1 hr 20 min (completed with errors) *, 56 min | 40 min | failed | failed |
| 50K MARC Create | PTF - Create 2 | Completed with errors, 1 hr 40 min | 43 min | failed | failed |
| 50K Update | PTF - Updates Success - 1 | 2 hr 32 min (job stuck at 76% completion) | 1 hr 4 min | failed | failed |

Increasing the number of partitions produced no noticeable change in performance. It did, however, show a negative trend: the number of errors increased and data import jobs failed more often.

Number of partitions: 2

This table shows the results of a series of sequential data import tests with 2 partitions. For runs that completed with errors, the database was queried to determine how many entities were missing.

| Test and start time | End time | Record counts from DB |
|---|---|---|
| CREATE 5,000 records, began 8:20 AM | 1/28/2022, 8:26 AM | |
| CREATE 10,000 records, began 8:48 AM | 1/28/2022, 9:00 AM | |
| UPDATE 10,000 records, began 9:17 AM | FAILED | |
| CREATE 25,000 records, began 9:37 AM | Completed with errors, 1/28/2022, 10:04 AM | fs09000000_mod_inventory_storage.item - 24996; fs09000000_mod_inventory_storage.holdings_record - 24996; fs09000000_mod_inventory_storage.instance - 25000; fs09000000_mod_source_record_storage.records_lb - 25000 |
| (restart modules and clean Kafka MQ) | | |
| CREATE 25,000 records, began 10:31 AM | 1/28/2022, 10:57 AM | |
| (restart modules and clean Kafka MQ) | | |
| 50,000 records, began 11:18 AM | Completed with errors, 1/28/2022, 12:43 PM | fs09000000_mod_inventory_storage.item - 49284; fs09000000_mod_inventory_storage.holdings_record - 48684; fs09000000_mod_source_record_storage.records_lb - 50000; fs09000000_mod_inventory_storage.instance - 50000 |
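The shortfall per table follows directly from the counts listed above. The sketch below is only an illustration of that arithmetic; it assumes that each imported record is expected to produce exactly one row in every listed table (an assumption about the job profile, not something stated in this report), and the helper name `missing_counts` is hypothetical.

```python
# Row counts reported above for the two runs that completed with errors.
# Key: number of records in the import file; value: rows found per table.
runs = {
    25_000: {
        "fs09000000_mod_inventory_storage.item": 24_996,
        "fs09000000_mod_inventory_storage.holdings_record": 24_996,
        "fs09000000_mod_inventory_storage.instance": 25_000,
        "fs09000000_mod_source_record_storage.records_lb": 25_000,
    },
    50_000: {
        "fs09000000_mod_inventory_storage.item": 49_284,
        "fs09000000_mod_inventory_storage.holdings_record": 48_684,
        "fs09000000_mod_source_record_storage.records_lb": 50_000,
        "fs09000000_mod_inventory_storage.instance": 50_000,
    },
}


def missing_counts(expected, found_by_table):
    """Return {table: expected - found} for tables short of the expected count.

    Assumption: one row per imported record is expected in every table.
    """
    return {
        table: expected - found
        for table, found in found_by_table.items()
        if found < expected
    }


for expected, found_by_table in runs.items():
    print(expected, missing_counts(expected, found_by_table))
```

Under that assumption, the 25,000-record run is short by 4 items and 4 holdings records, and the 50,000-record run is short by 716 items and 1,316 holdings records; instances and SRS records are complete in both runs.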



Resource utilization (CPU load and memory) showed no change compared to 1 partition. The failures appear to be caused by the way the data import modules (mod-inventory, mod-source-record-storage) process Kafka MQ messages, and possibly by bugs in that processing.



[Figure: memory usage]



Example of errors from the logs for the "CREATE 25,000" run

filtered by mod-source-record-manager + error



| Field | Value |
|---|---|
| @ingestionTime | 1643361849833 |
| @log | 054267740449:kcp1-folio-eis |
| @logStream | kcp1/mod-source-record-manager/d7b8533172d04fbeace7953e139e48eb |
| @message | 09:24:06.835 [vert.x-worker-thread-18] ERROR PostgresClient [4721195eqId] Timeout |
| @timestamp | 1643361846844 |
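The @timestamp and @ingestionTime values above appear to be milliseconds since the Unix epoch. A minimal sketch for converting them to readable UTC times, so they can be lined up with the HH:MM:SS.mmm prefixes in the log lines, could look like this. The helper name `epoch_ms_to_utc` is hypothetical, and the millisecond-epoch interpretation is an assumption.

```python
from datetime import datetime, timedelta, timezone


def epoch_ms_to_utc(epoch_ms):
    """Convert an integer millisecond epoch timestamp to an aware UTC datetime.

    Assumption: the value counts milliseconds since 1970-01-01T00:00:00Z.
    Integer arithmetic is used so the millisecond part is preserved exactly.
    """
    seconds, millis = divmod(epoch_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


timestamp = epoch_ms_to_utc(1643361846844)       # @timestamp
ingestion = epoch_ms_to_utc(1643361849833)       # @ingestionTime

print(timestamp.isoformat(timespec="milliseconds"))  # 2022-01-28T09:24:06.844+00:00
print(ingestion.isoformat(timespec="milliseconds"))  # 2022-01-28T09:24:09.833+00:00
print(ingestion - timestamp)                         # ingestion lag
```

Read this way, the @timestamp falls at 09:24:06.844 UTC on 1/28/2022, within the same second as the error lines shown here, which suggests the module log timestamps are in UTC.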

09:24:06.835 [vert.x-worker-thread-18] ERROR PostgresClient [4721195eqId] Timeout

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.844 [vert.x-worker-thread-18] ERROR utionProgressDaoImpl [4721204eqId] Rollback transaction. Failed to update jobExecutionProgress for jobExecution with id 'a3244651-2474-47bb-8d69-54520570bedd

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.845 [vert.x-worker-thread-18] ERROR tHandlingServiceImpl [4721205eqId] Failed to handle DI_COMPLETED event

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.846 [vert.x-eventloop-thread-1] ERROR JournalRecordDaoImpl [4721206eqId] Error saving JournalRecord entity

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.969 [vert.x-worker-thread-11] ERROR PostgresClient [4721329eqId] Timeout

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.969 [vert.x-worker-thread-11] ERROR utionProgressDaoImpl [4721329eqId] Rollback transaction. Failed to update jobExecutionProgress for jobExecution with id 'a3244651-2474-47bb-8d69-54520570bedd

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.970 [vert.x-worker-thread-11] ERROR tHandlingServiceImpl [4721330eqId] Failed to handle DI_COMPLETED event

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.970 [vert.x-eventloop-thread-1] ERROR JournalRecordDaoImpl [4721330eqId] Error saving JournalRecord entity

io.vertx.core.impl.NoStackTraceThrowable: Timeout

09:24:06.991 [vert.x-worker-thread-9] DEBUG KafkaConsumerWrapper [4721351eqId] Consumer - id: 20 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=kcp1\.Default\.\w{1,}\.DI_COMPLETED) a Record has been received. key: 29 currentLoad: 1 globalLoad: 549

09:24:06.991 [vert.x-worker-thread-8] DEBUG KafkaConsumerWrapper [4721351eqId] Threshold is exceeded, preparing to pause, globalLoad: 550, currentLoad: 264, requestNo: -8728

09:24:06.991 [vert.x-worker-thread-8] DEBUG KafkaConsumerWrapper [4721351eqId] Consumer - id: 65 subscriptionPattern: SubscriptionDefinition(eventType=DI_COMPLETED, subscriptionPattern=kcp1\.Default\.\w{1,}\.DI_COMPLETED) a Record has been received. key: 29 currentLoad: 264 globalLoad: 550

09:24:06.993 [vert.x-worker-thread-8] INFO taImportKafkaHandler [4721353eqId] Event was received with recordId: 30004133-ba4d-468c-8dab-77c9b0357b07 event type: DI_COMPLETED

09:24:06.994 [vert.x-eventloop-thread-1] ERROR JournalRecordDaoImpl [4721354eqId] Error saving JournalRecord entity
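To see which components produce most of the errors in an excerpt like the one above, the lines can be tallied by logger name. The following is a rough sketch: the regular expression is inferred from the line layout visible in this excerpt (time, thread, level, truncated logger name, sequence id, message) and is an assumption, not a documented log format. The function name `count_by_logger` is hypothetical.

```python
import re
from collections import Counter

# Pattern inferred from the lines shown in this report (an assumption):
# HH:MM:SS.mmm [thread] LEVEL logger [<n>eqId] message
LINE_RE = re.compile(
    r"^(?P<time>\d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[(?P<thread>[^\]]+)\] "
    r"(?P<level>[A-Z]+)\s+"
    r"(?P<logger>\S+) "
    r"\[(?P<seq>\d+)eqId\] "
    r"(?P<message>.*)$"
)


def count_by_logger(lines, level="ERROR"):
    """Count log lines of the given level per logger; non-matching lines are skipped."""
    counts = Counter()
    for line in lines:
        match = LINE_RE.match(line.strip())
        if match and match.group("level") == level:
            counts[match.group("logger")] += 1
    return counts


# A few lines copied from the excerpt above.
sample = [
    "09:24:06.835 [vert.x-worker-thread-18] ERROR PostgresClient [4721195eqId] Timeout",
    "io.vertx.core.impl.NoStackTraceThrowable: Timeout",
    "09:24:06.845 [vert.x-worker-thread-18] ERROR tHandlingServiceImpl [4721205eqId] Failed to handle DI_COMPLETED event",
    "09:24:06.846 [vert.x-eventloop-thread-1] ERROR JournalRecordDaoImpl [4721206eqId] Error saving JournalRecord entity",
    "09:24:06.969 [vert.x-worker-thread-11] ERROR PostgresClient [4721329eqId] Timeout",
    "09:24:06.993 [vert.x-worker-thread-8] INFO taImportKafkaHandler [4721353eqId] Event was received with recordId: 30004133-ba4d-468c-8dab-77c9b0357b07 event type: DI_COMPLETED",
]

error_counts = count_by_logger(sample)
print(error_counts.most_common())
```

Stack-trace lines such as `io.vertx.core.impl.NoStackTraceThrowable: Timeout` do not match the pattern and are ignored, so each error is counted once.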

filtered by mod-source-record-manager + warning

15:27:02.687 [vertx-blocked-thread-checker] WARN ? [15001392eqId] Thread Thread[vert.x-worker-thread-2,5,main] has been blocked for 588042 ms, time limit is 60000 ms

io.vertx.core.VertxException: Thread blocked

at jdk.internal.misc.Unsafe.park(Native Method) ~[?:?]

at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234) ~[?:?]

at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1079) ~[?:?]

at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1369) ~[?:?]

at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:278) ~[?:?]

at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:86) ~[ms.jar:?]

at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76) ~[ms.jar:?]

at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) ~[ms.jar:?]

at io.kcache.KafkaCache.put(KafkaCache.java:511) ~[ms.jar:?]

at io.kcache.KafkaCache.remove(KafkaCache.java:548) ~[ms.jar:?]

at org.folio.kafka.cache.KafkaInternalCache.lambda$cleanupEvents$1(KafkaInternalCache.java:151) ~[ms.jar:?]

at org.folio.kafka.cache.KafkaInternalCache$$Lambda$917/0x00000008406d3840.accept(Unknown Source) ~[?:?]

at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]

at org.folio.kafka.cache.KafkaInternalCache.cleanupEvents(KafkaInternalCache.java:151) ~[ms.jar:?]

at org.folio.kafka.cache.util.CacheUtil.lambda$initCacheCleanupPeriodicTask$0(CacheUtil.java:24) ~[ms.jar:?]

at org.folio.kafka.cache.util.CacheUtil$$Lambda$915/0x0000000840839040.handle(Unknown Source) ~[?:?]

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:160) ~[ms.jar:?]

at io.vertx.core.impl.ContextImpl$$Lambda$898/0x0000000840822c40.handle(Unknown Source) ~[?:?]

at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:96) ~[ms.jar:?]

at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:158) ~[ms.jar:?]

at io.vertx.core.impl.ContextImpl$$Lambda$897/0x0000000840822840.run(Unknown Source) ~[?:?]

at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76) ~[ms.jar:?]

at io.vertx.core.impl.TaskQueue$$Lambda$145/0x0000000840202840.run(Unknown Source) ~[?:?]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]

at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[ms.jar:?]

at java.lang.Thread.run(Thread.java:829) ~[?:?]

15:27:24.879 [vert.x-eventloop-thread-1] INFO RestRouting [15023584eqId] invoking getAdminHealth

15:27:24.880 [vert.x-eventloop-thread-1] INFO LogUtil [15023585eqId] 127.0.0.1:46174 GET /admin/health null HTTP_1_1 200 4 0 tid=null OK

15:27:26.344 [vert.x-eventloop-thread-1] INFO RestRouting [15025049eqId] invoking getAdminHealth

15:27:26.345 [vert.x-eventloop-thread-1] INFO LogUtil [15025050eqId] 127.0.0.1:46188 GET /admin/health null HTTP_1_1 200 4 0 tid=null OK



errors filtered by mod-inventory

09:31:48 [] [] [] [] ERROR Conn
{ "message": "Cannot update record f47bb037-d48d-4710-99bc-ae0a89161a92 because it has been changed (optimistic locking): Stored _version is 2, _version of request is 1", "severity": "ERROR", "code": "23F09", "where": "PL/pgSQL function holdings_record_set_ol_version() line 8 at RAISE", "file": "pl_exec.c", "line": "3876", "routine": "exec_stmt_raise", "schema": "fs09000000_mod_inventory_storage", "table": "holdings_record" }

io.vertx.pgclient.PgException:
{ "message": "Cannot update record f47bb037-d48d-4710-99bc-ae0a89161a92 because it has been changed (optimistic locking): Stored _version is 2, _version of request is 1", "severity": "ERROR", "code": "23F09", "where": "PL/pgSQL function holdings_record_set_ol_version() line 8 at RAISE", "file": "pl_exec.c", "line": "3876", "routine": "exec_stmt_raise", "schema": "fs09000000_mod_inventory_storage", "table": "holdings_record" }

at io.vertx.pgclient.impl.codec.ErrorResponse.toException(ErrorResponse.java:31) ~[ms.jar:?]

at io.vertx.pgclient.impl.codec.QueryCommandBaseCodec.handleErrorResponse(QueryCommandBaseCodec.java:57) ~[ms.jar:?]

at io.vertx.pgclient.impl.codec.ExtendedQueryCommandCodec.handleErrorResponse(ExtendedQueryCommandCodec.java:90) ~[ms.jar:?]

at io.vertx.pgclient.impl.codec.PgDecoder.decodeError(PgDecoder.java:246) ~[ms.jar:?]

at io.vertx.pgclient.impl.codec.PgDecoder.decodeMessage(PgDecoder.java:132) [ms.jar:?]

at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:112) [ms.jar:?]

at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [ms.jar:?]

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [ms.jar:?]

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [ms.jar:?]

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [ms.jar:?]

at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [ms.jar:?]

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [ms.jar:?]

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [ms.jar:?]

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [ms.jar:?]

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [ms.jar:?]

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [ms.jar:?]

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [ms.jar:?]

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [ms.jar:?]

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [ms.jar:?]

at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [ms.jar:?]

at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [ms.jar:?]

at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [ms.jar:?]

at java.lang.Thread.run(Thread.java:829) [?:?]
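The error above is what an optimistic-locking check reports when two writers read the same version of a record and both try to update it. The sketch below illustrates only the general technique with an in-memory store; it is not the FOLIO implementation (according to the log, the real check runs in the PL/pgSQL function holdings_record_set_ol_version()). The class and method names are hypothetical, and starting new rows at `_version` 1 is an assumption.

```python
class StaleVersionError(Exception):
    """Raised when an update carries an outdated _version."""


class VersionedStore:
    """In-memory stand-in for a table whose rows carry a _version counter.

    Illustrative only: names and the initial version of 1 are assumptions.
    """

    def __init__(self):
        self._rows = {}

    def create(self, record_id, data):
        self._rows[record_id] = {**data, "_version": 1}
        return dict(self._rows[record_id])

    def get(self, record_id):
        return dict(self._rows[record_id])

    def update(self, record_id, data, version):
        stored = self._rows[record_id]
        # Reject the write if the caller's copy is older than the stored row.
        if stored["_version"] != version:
            raise StaleVersionError(
                f"Cannot update record {record_id} because it has been changed "
                f"(optimistic locking): Stored _version is {stored['_version']}, "
                f"_version of request is {version}"
            )
        self._rows[record_id] = {**data, "_version": version + 1}
        return dict(self._rows[record_id])


store = VersionedStore()
store.create("holdings-1", {"note": "initial"})

# Two writers read the same record at _version 1.
first_copy = store.get("holdings-1")
second_copy = store.get("holdings-1")

# The first update succeeds and bumps the stored _version to 2.
store.update("holdings-1", {"note": "first"}, first_copy["_version"])

# The second update still carries _version 1 and is rejected.
try:
    store.update("holdings-1", {"note": "second"}, second_copy["_version"])
except StaleVersionError as error:
    print(error)
```

In this model the rejected writer has to re-read the record and retry with the current version. Concurrent processing of events for the same holdings record would be one way to arrive at the "Stored _version is 2, _version of request is 1" state seen in the log, although the report does not establish the actual cause.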

Example of errors from the logs for the "CREATE 50,000" run

filtered by mod-source-record-manager

12:41:35.095 [vert.x-worker-thread-2] ERROR PostgresClient [5063482eqId] Timeout

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.104 [vert.x-worker-thread-2] ERROR utionProgressDaoImpl [5063491eqId] Rollback transaction. Failed to update jobExecutionProgress for jobExecution with id '229ad3be-6211-4f39-8b47-7dcb150ee763

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.105 [vert.x-worker-thread-2] ERROR tHandlingServiceImpl [5063492eqId] Failed to handle DI_COMPLETED event

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.105 [vert.x-worker-thread-2] ERROR PostgresClient [5063492eqId] Timeout

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.106 [vert.x-worker-thread-2] ERROR utionProgressDaoImpl [5063493eqId] Rollback transaction. Failed to update jobExecutionProgress for jobExecution with id '229ad3be-6211-4f39-8b47-7dcb150ee763

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.105 [vert.x-eventloop-thread-1] ERROR JournalRecordDaoImpl [5063492eqId] Error saving JournalRecord entity

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.106 [vert.x-worker-thread-2] ERROR tHandlingServiceImpl [5063493eqId] Failed to handle DI_COMPLETED event

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.108 [vert.x-worker-thread-19] ERROR PostgresClient [5063495eqId] Timeout

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.108 [vert.x-worker-thread-19] ERROR utionProgressDaoImpl [5063495eqId] Rollback transaction. Failed to update jobExecutionProgress for jobExecution with id '229ad3be-6211-4f39-8b47-7dcb150ee763

io.vertx.core.impl.NoStackTraceThrowable: Timeout

12:41:35.108 [vert.x-worker-thread-19] ERROR tHandlingServiceImpl [5063495eqId] Failed to handle DI_COMPLETED event

io.vertx.core.impl.NoStackTraceThrowable: Timeout

filtered by mod-inventory + errors

11:53:59 [] [] [] [] ERROR KafkaConsumerWrapper Consumer - id: 0 subscriptionPattern: SubscriptionDefinition(eventType=DI_SRS_MARC_BIB_INSTANCE_HRID_SET, subscriptionPattern=kcp1\.Default\.\w{1,}\.DI_SRS_MARC_BIB_INSTANCE_HRID_SET) Error while commit offset: 49446

org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.

11:53:59 [] [] [] [] INFO eHridSetKafkaHandler Event payload has been received with event type: DI_SRS_MARC_BIB_INSTANCE_HRID_SET and recordId: a36d571a-a64a-4aa7-ab5e-ac78ade3a1ab

11:54:05 [] [] [] [] INFO tbeatResponseHandler [Consumer clientId=consumer-DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4-29, groupId=DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4] Attempt to heartbeat failed since group is rebalancing

11:54:05 [] [] [] [] INFO tbeatResponseHandler [Consumer clientId=consumer-DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4-50, groupId=DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4] Attempt to heartbeat failed since group is rebalancing

11:54:05 [] [] [] [] INFO ConsumerCoordinator [Consumer clientId=consumer-DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4-50, groupId=DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4] Revoke previously assigned partitions

11:54:05 [] [] [] [] INFO tbeatResponseHandler [Consumer clientId=consumer-DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4-6, groupId=DI_SRS_MARC_BIB_RECORD_MATCHED.mod-inventory-18.0.4] Attempt to heartbeat failed since group is rebalancing