Jira: MODPUBSUB-11

Goal: 

Create a PoC that demonstrates how to use the Vert.x Kafka client inside a FOLIO module to implement PubSub functionality.

Result: 
https://github.com/folio-org/mod-pub-sub-poc

Initialization:

During initialization, the following actions are performed:

  1. A separate Kafka topic is created for each pre-configured event type, if the topic does not already exist.
    Topic names have the structure "pub-sub.tenantId.eventType" (e.g. pub-sub.diku.order_created).

...

  2. A separate consumer is created and subscribed to each Kafka topic.
    Each consumer's groupId is set to the name of its topic, so all consumers of a topic share one groupId. As a result, when several instances of a consumer are running, each event is processed by only one of them.

...

...

  3. One producer is created.
    This producer is used when an event is published by calling the POST /publish endpoint.
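
The naming rules above can be sketched in plain Java. The code examples on this page call getTopicName() and getGroupId() on a PubSubConsumerConfig, but the body of that class is not shown here, so the class below (deliberately named PubSubConsumerConfigSketch) is an assumption about what it might look like, not the PoC's actual implementation.

```java
import java.util.Objects;

// Hypothetical reconstruction: only the constructor arguments and the two getters
// are known from the code examples on this page; everything else is assumed.
final class PubSubConsumerConfigSketch {
  private static final String PREFIX = "pub-sub";

  private final String tenant;
  private final String eventType;

  PubSubConsumerConfigSketch(String tenant, String eventType) {
    this.tenant = Objects.requireNonNull(tenant, "tenant");
    this.eventType = Objects.requireNonNull(eventType, "eventType");
  }

  // Topic name follows the documented structure "pub-sub.tenantId.eventType".
  String getTopicName() {
    return String.join(".", PREFIX, tenant, eventType);
  }

  // The groupId equals the topic name, so all consumers of one topic share a group.
  String getGroupId() {
    return getTopicName();
  }
}
```

For tenant "diku" and event type "order_created", both getters return "pub-sub.diku.order_created", which matches the example topic name given in the list.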

Endpoints:

POST /publish - publishes a new event to the Kafka topic that corresponds to its eventType.

Example of request body: 

{

...


  "value" : "order 123 is created",

...


  "eventType" : "order_created"
}
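
As a rough illustration of how the eventType field could select the target topic, here is a minimal Java sketch. The class PublishRoutingSketch and its method topicFor are hypothetical names, not taken from the PoC. What the PoC does for an eventType that was not pre-configured is not stated on this page; returning an empty Optional is purely an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper, not part of the PoC repository.
final class PublishRoutingSketch {
  private final String tenant;
  private final Set<String> configuredEventTypes;

  PublishRoutingSketch(String tenant, Set<String> configuredEventTypes) {
    this.tenant = tenant;
    this.configuredEventTypes = new HashSet<>(configuredEventTypes);
  }

  // Resolves the topic for an event type using the "pub-sub.tenantId.eventType" structure.
  // Assumption: event types that were not pre-configured have no topic, so empty is returned.
  Optional<String> topicFor(String eventType) {
    if (eventType == null || !configuredEventTypes.contains(eventType)) {
      return Optional.empty();
    }
    return Optional.of("pub-sub." + tenant + "." + eventType);
  }
}
```

With tenant "diku" and "order_created" configured, the request body above would resolve to the topic pub-sub.diku.order_created.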



Required environment variables:
KAFKA_HOST - host of the Kafka server

KAFKA_PORT - port of the Kafka server
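
A minimal sketch of how the two variables might be combined into a bootstrap server address. The code examples on this page call getUrl(kafkaHost, kafkaPort) without showing its body; the version below assumes it simply joins host and port with a colon, as the topic-creation example does inline. The class name KafkaEnvSketch and the fail-fast check for missing variables are assumptions added for illustration.

```java
import java.util.Map;

// Hypothetical helper, not part of the PoC repository.
final class KafkaEnvSketch {
  // Assumed body of the getUrl helper referenced in the code examples.
  static String getUrl(String kafkaHost, String kafkaPort) {
    return kafkaHost + ":" + kafkaPort;
  }

  // Reads both required variables from the given environment map
  // and fails fast if either one is missing or blank.
  static String bootstrapServers(Map<String, String> env) {
    return getUrl(require(env, "KAFKA_HOST"), require(env, "KAFKA_PORT"));
  }

  private static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("Required environment variable " + name + " is not set");
    }
    return value.trim();
  }
}
```

In a running module this could be called as KafkaEnvSketch.bootstrapServers(System.getenv()); taking the environment as a map keeps the helper easy to exercise without touching real environment variables.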

DeploymentDescriptor.json example:

{
  "srvcId": "mod-pub-sub-poc-1.0.0-SNAPSHOT",
  "nodeId": "10.0.2.15",
  "descriptor": {
    "dockerImage": "test-kafka-my:latest",
    "dockerPull": false,
    "env": [
      {
        "name": "KAFKA_HOST",
        "value": "10.0.2.15"
      },
      {
        "name": "KAFKA_PORT",
        "value": "29092"
      },
      {
        "name": "JAVA_OPTIONS",
        "value": "-Xmx256m"
      }
    ],
    "dockerArgs": {
      "HostConfig": {
        "PortBindings": {
          "8081/tcp": [
            {
              "HostPort": "%p"
            }
          ]
        }
      }
    }
  }
}

Useful links:

https://kafka.apache.org/documentation/ - Kafka documentation

...

https://rmoff.net/2018/08/02/kafka-listeners-explained/ - This post was very useful for configuring the Kafka server so that a module running in a Docker container can connect to it.

Code examples:

Code for creating topics:

Code Block (Java): Create topics
  @Bean
  public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                                 @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                 @Value("${KAFKA_HOST}") String kafkaHost,
                                 @Value("${KAFKA_PORT}") String kafkaPort) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
    AdminClient adminClient = AdminClient.create(configs);

    int numPartitions = 1;
    short replicationFactor = 1;

    List<NewTopic> topics = eventTypes.stream()
      .map(eventType -> new NewTopic(new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor))
      .collect(Collectors.toList());

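    // createTopics is asynchronous; for a topic that already exists the returned
    // future fails with TopicExistsException, which is not inspected here.
    adminClient.createTopics(topics);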

    return adminClient;
  }
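
The example above requests creation of every topic and leaves already existing ones to the broker. One alternative, shown here only as a sketch, is to work out up front which topics are missing. The class MissingTopicsSketch and its method are hypothetical. The set of existing topic names is assumed to come from somewhere like adminClient.listTopics().names().get(), and the topic names are built inline from the documented "pub-sub.tenantId.eventType" structure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PoC repository.
final class MissingTopicsSketch {
  // Returns the topic names that still need to be created, in event-type order,
  // skipping topics that already exist and duplicates in the configured list.
  static List<String> topicsToCreate(Set<String> existingTopics, String tenant, List<String> eventTypes) {
    List<String> missing = new ArrayList<>();
    for (String eventType : eventTypes) {
      String topic = "pub-sub." + tenant + "." + eventType;
      if (!existingTopics.contains(topic) && !missing.contains(topic)) {
        missing.add(topic);
      }
    }
    return missing;
  }
}
```

Each returned name could then be turned into a NewTopic with the same numPartitions and replicationFactor as in the example above. Note that a check-then-create sequence is not atomic, so another instance may still create a topic in between.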

Code for creating consumers with Vert.x Kafka client:

Code Block (Java): Create consumers
  @Bean
  public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                            @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                            @Value("${KAFKA_HOST}") String kafkaHost,
                                                            @Value("${KAFKA_PORT}") String kafkaPort,
                                                            LoggingHandler loggingHandler, Vertx vertx) {
    return eventTypes
      .stream().map(eventType ->
      {
        PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType);
        return KafkaConsumer.<String, String>create(vertx, consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId()))
          .subscribe(pubSubConfig.getTopicName())
          .handler(loggingHandler);
      })
      .collect(Collectors.toList());
  }

  private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
    Map<String, String> props = new HashMap<>();
    // Use the consumer-side constant; it resolves to the same "bootstrap.servers" key.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }

...

Code Block (Java): Create producer
@Bean
public KafkaProducer<String, String> kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost,
                                                   @Value("${KAFKA_PORT}") String kafkaPort,
                                                   Vertx vertx) {
  Map<String, String> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort));
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  // Reuse the injected Vertx instance (as the consumers do) instead of starting a second one with Vertx.vertx().
  return KafkaProducer.createShared(vertx, "pub-sub-producer", props);
}

...