Spike: MODPUBSUB-11 Vert.x Kafka client PoC - draft

Goal: 

Create a PoC that demonstrates how to use the Vert.x Kafka client inside a FOLIO module to implement PubSub functionality.

Result: 
https://github.com/folio-org/mod-pub-sub-poc

Initialization:

During initialization, the following actions are performed:

  1.  A separate Kafka topic is created for each of the pre-configured event types, if such a topic doesn't already exist.
    The topic name has the structure "pub-sub.tenantId.eventType" (e.g. pub-sub.diku.order_created); a sketch of the helper class that builds these names follows this list.
  2. A separate consumer is created and subscribed to each Kafka topic.
    Each consumer's groupId is set to the same value as the corresponding topic name. This ensures that all consumers of the same topic share the same groupId, so if there are multiple consumer instances, each event is processed by only one of them.
  3. One producer is created.
    This producer is used when an event is published by calling the POST /publish endpoint.
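
The topic name and groupId convention above can be captured in a small helper class. Below is a minimal sketch of the PubSubConsumerConfig class that the code examples rely on; the actual implementation lives in the PoC repository and may differ in detail.

PubSubConsumerConfig (sketch)
public class PubSubConsumerConfig {

  private final String tenant;
  private final String eventType;

  public PubSubConsumerConfig(String tenant, String eventType) {
    this.tenant = tenant;
    this.eventType = eventType;
  }

  // Topic name follows the "pub-sub.tenantId.eventType" structure,
  // e.g. "pub-sub.diku.order_created"
  public String getTopicName() {
    return String.format("pub-sub.%s.%s", tenant, eventType);
  }

  // The groupId equals the topic name, so all consumers of one topic
  // share a group and each event is processed by only one instance
  public String getGroupId() {
    return getTopicName();
  }
}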

Endpoints:

POST /publish - publishes a new event to the corresponding Kafka topic, based on its eventType.

Example of request body: 

{
  "value" : "order 123 is created",
  "eventType" : "order_created"
}
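
For illustration, here is a sketch of what the POST /publish handler might look like with Vert.x Web. The tenant field and the producer bean are assumed to be available on the enclosing class, and the handler name and error handling are illustrative rather than the PoC's exact implementation.

Publish event (sketch)
  private void publishHandler(RoutingContext context) {
    JsonObject body = context.getBodyAsJson();
    String eventType = body.getString("eventType");
    String value = body.getString("value");

    // Resolve the target topic from the tenant and the event type
    String topic = new PubSubConsumerConfig(tenant, eventType).getTopicName();

    KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, value);
    producer.send(record, ar -> {
      if (ar.succeeded()) {
        context.response().setStatusCode(204).end();
      } else {
        context.response().setStatusCode(500).end(ar.cause().getMessage());
      }
    });
  }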



Required environment variables:

KAFKA_HOST - host of the Kafka server

KAFKA_PORT - port of the Kafka server

DeploymentDescriptor.json example:

DeploymentDescriptor.json
{
  "srvcId": "mod-pub-sub-poc-1.0.0-SNAPSHOT",
  "nodeId": "10.0.2.15",
  "descriptor": {
    "dockerImage": "test-kafka-my:latest",
    "dockerPull": false,
    "env": [
      {
        "name": "KAFKA_HOST",
        "value": "10.0.2.15"
      },
      {
        "name": "KAFKA_PORT",
        "value": "29092"
      },
      {
        "name": "JAVA_OPTIONS",
        "value": "-Xmx256m"
      }
    ],
    "dockerArgs": {
      "HostConfig": {
        "PortBindings": {
          "8081/tcp": [
            {
              "HostPort": "%p"
            }
          ]
        }
      }
    }
  }
}

Useful links:

https://kafka.apache.org/documentation/ - Kafka documentation

https://vertx.io/docs/vertx-kafka-client/java/ - Vert.x Kafka client documentation

https://rmoff.net/2018/08/02/kafka-listeners-explained/ - This post was very useful for configuring the Kafka server so that it can be reached by the module running in a Docker container.

Code examples:

Code for creating topics:

Create topics
  @Bean
  public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                                 @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                 @Value("${KAFKA_HOST}") String kafkaHost,
                                 @Value("${KAFKA_PORT}") String kafkaPort) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
    AdminClient adminClient = AdminClient.create(configs);

    // The PoC uses a single partition and no replication
    int numPartitions = 1;
    short replicationFactor = 1;

    // One topic per pre-configured event type, named "pub-sub.tenantId.eventType"
    List<NewTopic> topics = eventTypes.stream()
      .map(eventType -> new NewTopic(new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor))
      .collect(Collectors.toList());

    adminClient.createTopics(topics);

    return adminClient;
  }
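
Note that AdminClient.createTopics is asynchronous and the bean above does not wait for it to complete. If topic creation needs to be confirmed before the consumers subscribe, the returned CreateTopicsResult can be awaited, for example:

Await topic creation (sketch)
    try {
      // Block until all topics are created or the request fails;
      // a TopicExistsException for pre-existing topics can be ignored
      adminClient.createTopics(topics).all().get();
    } catch (ExecutionException e) {
      if (!(e.getCause() instanceof TopicExistsException)) {
        throw new RuntimeException(e.getCause());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }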

Code for creating consumers with the Vert.x Kafka client:

Create consumers
  @Bean
  public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                            @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                            @Value("${KAFKA_HOST}") String kafkaHost,
                                                            @Value("${KAFKA_PORT}") String kafkaPort,
                                                            LoggingHandler loggingHandler, Vertx vertx) {
    // One consumer per event type, subscribed to the corresponding topic;
    // the groupId equals the topic name (see PubSubConsumerConfig)
    return eventTypes.stream()
      .map(eventType -> {
        PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType);
        return KafkaConsumer.<String, String>create(vertx, consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId()))
          .subscribe(pubSubConfig.getTopicName())
          .handler(loggingHandler);
      })
      .collect(Collectors.toList());
  }

  private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
    Map<String, String> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Offsets are committed automatically every second
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }
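
The loggingHandler injected above is a Vert.x Handler for consumed records. A minimal sketch of what such a handler could look like; the concrete class in the PoC repository may differ:

LoggingHandler (sketch)
@Component
public class LoggingHandler implements Handler<KafkaConsumerRecord<String, String>> {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingHandler.class);

  @Override
  public void handle(KafkaConsumerRecord<String, String> record) {
    // The PoC just logs each consumed event; a real module would dispatch it further
    LOGGER.info("Received event from topic {}: {}", record.topic(), record.value());
  }
}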

Code for creating a Vert.x Kafka client producer:

Create producer
  @Bean
  public KafkaProducer<String, String> kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost,
                                                     @Value("${KAFKA_PORT}") String kafkaPort,
                                                     Vertx vertx) {
    Map<String, String> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort));
    // Idempotence avoids duplicate records on producer retries
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return KafkaProducer.createShared(vertx, "pub-sub-producer", props);
  }
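
The getUrl helper used in the consumer and producer beans is not shown in the snippets above; presumably it just joins host and port into a bootstrap-server address:

getUrl (sketch)
  private String getUrl(String kafkaHost, String kafkaPort) {
    return kafkaHost + ":" + kafkaPort;
  }

KafkaProducer.createShared registers the producer under the given name ("pub-sub-producer"), so any part of the application that requests a shared producer with that name reuses the same underlying Kafka producer.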