Kafka Automation: Deleting Messages













Overview

While working on the Data Import test using AWS MSK, Kafka messages we getting blocked in Kafka topic and we had to clear out messages from all DI topics. Messages can get stuck in a topic for many reasons such as consumers not having enough resources to consume all topics, the consumer goes down or AWS MSK Broker becomes unstable. Depending on the number of tenants per FOLIO instance, there can be thousands of topics per FOLIO instance. Instead of manually clearing out all messages from each of these topics, we decided to automate it to make this process faster.


Before running the scripts, try the following options first

  • restart all DI modules
  • Wait until log.retention.minutes duration specified in MSK config has passed 

Scripts

To clear out messages from the selected list of Kafka topics, there are 3 steps:

  1. Get the list of topics to clear out messages
    • Download the latest Kafka release and extract it - https://kafka.apache.org/quickstart
    • Navigate to the bin directory kafka_2.12-2.7.0/bin/ - we will be using kafka-topics.sh and kafka-configs.sh scripts out of the box
    • Get list of topics and write to file topics.txt. Every topic will be on a separate line. For example, we want all topics for tenant tenant001
      create-topics-file.sh
      #!/bin/bash -e
      
      source ./kafka-topics.sh --describe --zookeeper <zookeeper-plaintext-connection-url> --topic "imtc.Default.tenant001.*" | grep Configs | awk '{printf "%s\n", $2}' > topics.txt

       2. For all topics in the topics.txt file from step 1, alter the topic by adding custom config to set retention time to 1 second or less. This means after 1 second, all messages in all topics will be deleted.

alter-topic-config.sh
#!/bin/bash -e

awk '{ system("./kafka-configs.sh --alter --bootstrap-server <bootstrap-plaintext-connection-url> --entity-type topics --entity-name="$1" --add-config retention.ms=" 1000) }' topics.txt


Verify config is updated

./kafka-topics.sh --describe --zookeeper <zookeeper-plaintext-connection-url> --topic "imtc.Default.tenant001.DI_COMPLETED"
Topic: imtc.Default.tenant001.DI_COMPLETED PartitionCount: 1 ReplicationFactor: 2 Configs: retention.ms=1000
Topic: imtc.Default.tenant001.DI_COMPLETED Partition: 0 Leader: 1 Replicas: 1,2 Isr: 2,1


       3. Revert all topics by deleting the custom config from step 2

delete-config.sh
#!/bin/bash -e

awk '{ system("./kafka-configs.sh --bootstrap-server <bootstrap-plaintext-connection-url> --entity-type topics --entity-name="$1" --alter --delete-config retention.ms") }' topics.txt

Verify config is deleted

./kafka-topics.sh --describe --zookeeper <zookeeper-plaintext-connection-url> --topic "imtc.Default.tenant001.DI_COMPLETED"
Topic: imtc.Default.tenant001.DI_COMPLETED PartitionCount: 1 ReplicationFactor: 2 Configs:
Topic: imtc.Default.tenant001.DI_COMPLETED Partition: 0 Leader: 1 Replicas: 1,2 Isr: 2,1

Skip to end of metadata


Scripts for recreation topics with different number of partitions

Step1

Delete topics 

./delete_topics <file>

#!/bin/bash -e
val=$1
awk 'BEGIN{print ARGC,ARGV[1]} { system("./kafka-topics.sh --zookeeper z-1.kafka280.xwt0mn.c17.kafka.us-east-1.amazonaws.com:2181,z-2.kafka280.xwt0mn.c17.kafka.us-east-1.amazonaws.com:2181,z-3.kafka280.xwt0mn.c17.kafka.us-east-1.amazonaws.com:2181 --delete --topic "$1"  ") }' "$val" 


example of using


Step 2

Create topics with new parameters (number of partitions - 2)

./create_topics <file>

#!/bin/bash 
val=$1
awk '{ system("./kafka-topics.sh --create --zookeeper z-1.kafka280.xwt0mn.c17.kafka.us-east-1.amazonaws.com:2181,z-2.kafka280.xwt0mn.c17.kafka.us-east-1.amazonaws.com:2181,z-3.kafka280.xwt0mn.c17.kafka.us-east-1.amazonaws.com:2181  --topic "$1" --partitions 2 --replication-factor 1  ") }' "$val"


example