
Spike: MODPUBSUB-11 Vert.x Kafka client PoC


Goal: 

Create a PoC that demonstrates how to use the Vert.x Kafka client inside a FOLIO module to implement PubSub functionality.

Result: 
https://github.com/folio-org/mod-pub-sub-poc

Initialization:

During initialization, the following actions are performed:
1) A separate Kafka topic is created for each pre-configured event type, if such a topic does not already exist.

Topic names follow the structure "pub-sub.tenantId.eventType" (e.g. pub-sub.diku.order_created).

2) A separate consumer is created and subscribed to each Kafka topic.

Each consumer's groupId is set to the name of its topic, so all consumers of the same topic share one groupId. If several instances of a consumer are running, Kafka therefore delivers each event to only one of them.

3) One producer is created. It is used whenever an event is published by calling the POST /publish endpoint.
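
The naming rules above can be sketched as a small helper. This is a hypothetical stand-in for the PoC's PubSubConsumerConfig class: only its constructor arguments and the getTopicName and getGroupId accessors appear in the code examples on this page, so the fields and the PREFIX constant here are assumptions.

```java
// Hypothetical sketch of the naming helper; not the PoC's actual class.
final class PubSubConsumerConfigSketch {
  private static final String PREFIX = "pub-sub"; // assumed constant name

  private final String tenant;
  private final String eventType;

  PubSubConsumerConfigSketch(String tenant, String eventType) {
    this.tenant = tenant;
    this.eventType = eventType;
  }

  /** Topic name in the form pub-sub.tenantId.eventType. */
  String getTopicName() {
    return String.join(".", PREFIX, tenant, eventType);
  }

  /** The group id equals the topic name, so all consumers of a topic share one group. */
  String getGroupId() {
    return getTopicName();
  }
}
```

For tenant "diku" and event type "order_created", both accessors yield pub-sub.diku.order_created, which matches the example topic name given above.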

Endpoints:

POST /publish - publishes a new event to the corresponding Kafka topic, chosen by eventType.

Example of request body: 
{
"value" : "order 123 is created",
"eventType" : "order_created"
}
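
How the endpoint might pick a topic for such a body can be sketched as follows. This is a minimal illustration, not the PoC's handler: the PublishRouterSketch class, its resolveTopic method, and the choice to reject event types that were not pre-configured are all assumptions. In the PoC the resolved topic name would then be handed to the Vert.x Kafka producer.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical routing step for POST /publish; names and behavior are assumptions.
final class PublishRouterSketch {
  private final String tenant;
  private final Set<String> eventTypes;

  PublishRouterSketch(String tenant, List<String> configuredEventTypes) {
    this.tenant = tenant;
    this.eventTypes = new HashSet<>(configuredEventTypes);
  }

  /**
   * Returns the topic for the given eventType, or empty if the type
   * was not pre-configured (assumed policy: such requests are rejected).
   */
  Optional<String> resolveTopic(String eventType) {
    if (eventType == null || !eventTypes.contains(eventType)) {
      return Optional.empty();
    }
    return Optional.of("pub-sub." + tenant + "." + eventType);
  }
}
```

Returning an Optional keeps the decision about the HTTP response (for example, a 4xx status for an unknown event type) in the endpoint code rather than in the routing helper.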

Useful links:

https://kafka.apache.org/documentation/ - Kafka documentation

https://vertx.io/docs/vertx-kafka-client/java/ - Vert.x Kafka client documentation

https://rmoff.net/2018/08/02/kafka-listeners-explained/ - This post was very useful for configuring the Kafka server for use by a module from a Docker container.

Code examples:

Code for creating topics:

  @Bean
  public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                                 @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                 @Value("${KAFKA_HOST}") String kafkaHost,
                                 @Value("${KAFKA_PORT}") String kafkaPort) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
    AdminClient adminClient = AdminClient.create(configs);

    int numPartitions = 1;
    short replicationFactor = 1;

    List<NewTopic> topics = eventTypes.stream()
      .map(eventType -> new NewTopic(new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor))
      .collect(Collectors.toList());

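    // createTopics is asynchronous; for a topic that already exists the returned
    // future fails with TopicExistsException, which is ignored here.
    adminClient.createTopics(topics);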

    return adminClient;
  }
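
One way to make the "only if the topic does not already exist" rule explicit is to filter the topic list before requesting creation. The sketch below shows only that filtering step with plain collections. The class and method names are hypothetical, and the set of existing names is assumed to come from something like adminClient.listTopics().names().get().

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: computes which pub-sub topics still need to be created.
final class TopicPlannerSketch {
  static List<String> missingTopics(Set<String> existing, String tenant, List<String> eventTypes) {
    return eventTypes.stream()
      .map(eventType -> "pub-sub." + tenant + "." + eventType) // same naming scheme as above
      .filter(name -> !existing.contains(name))                // skip topics that already exist
      .distinct()                                              // tolerate duplicated event types
      .collect(Collectors.toList());
  }
}
```

The resulting names could then be mapped to NewTopic instances exactly as in the code above. Another instance may still create a topic between the check and the request, so the result of createTopics would need to tolerate TopicExistsException either way.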

Code for creating consumers with Vert.x Kafka client:

  @Bean
  public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                            @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                            @Value("${KAFKA_HOST}") String kafkaHost,
                                                            @Value("${KAFKA_PORT}") String kafkaPort,
                                                            LoggingHandler loggingHandler, Vertx vertx) {
    return eventTypes
      .stream().map(eventType ->
      {
        PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType);
        return KafkaConsumer.<String, String>create(vertx, consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId()))
          .subscribe(pubSubConfig.getTopicName())
          .handler(loggingHandler);
      })
      .collect(Collectors.toList());
  }

  private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
    Map<String, String> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }

Code for creating the Vert.x Kafka client producer:

  @Bean
  public KafkaProducer<String, String> kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost,
                                                     @Value("${KAFKA_PORT}") String kafkaPort,
                                                     Vertx vertx) {
    Map<String, String> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort));
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // Reuse the injected Vertx instance (the one the consumers use) rather than
    // starting a second one with Vertx.vertx().
    return KafkaProducer.createShared(vertx, "pub-sub-producer", props);
  }
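
The getUrl helper used in the consumer and producer examples is not shown on this page. A minimal sketch, assuming it simply joins host and port the same way the topic-creation code does (kafkaHost + ":" + kafkaPort); the enclosing class name is hypothetical.

```java
// Hypothetical reconstruction of the helper; the PoC's version may differ.
final class KafkaUrlSketch {
  /** Builds a bootstrap server address in host:port form. */
  static String getUrl(String kafkaHost, String kafkaPort) {
    return kafkaHost + ":" + kafkaPort;
  }
}
```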


