- MODPUBSUB-11
Goal:
Create a PoC that demonstrates how to use the Vert.x Kafka client inside a FOLIO module to implement PubSub functionality.
Result:
https://github.com/folio-org/mod-pub-sub-poc
Initialization:
During initialization, the following actions are performed:
- A separate Kafka topic is created for each pre-configured event type, if the topic doesn't already exist. Topic names have the structure "pub-sub.tenantId.eventType" (e.g. pub-sub.diku.order_created).
- A separate consumer is created and subscribed to each Kafka topic. Each consumer's groupId is set to the name of its topic, so all consumers of the same topic share one groupId. This way, if multiple instances of a consumer are running, each event is processed by only one of them.
- One producer is created. This producer is used when an event is published by calling the POST /publish endpoint.
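The naming rules above can be sketched in plain Java. The code samples on this page call a PubSubConsumerConfig class whose source is not shown here, so the PubSubNaming helper below is a hypothetical stand-in that only illustrates the "pub-sub.tenantId.eventType" convention and the "groupId equals topic name" rule. The event type order_updated is made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical helper illustrating the naming convention described above.
 * It is NOT the PubSubConsumerConfig class from the PoC repository.
 */
final class PubSubNaming {
  private static final String PREFIX = "pub-sub";

  private PubSubNaming() {
  }

  /** Builds a topic name of the form pub-sub.<tenantId>.<eventType>. */
  static String topicName(String tenantId, String eventType) {
    return String.join(".", PREFIX, tenantId, eventType);
  }

  /**
   * The consumer group id is the topic name itself, so every consumer
   * of a given topic ends up in the same group.
   */
  static String groupId(String tenantId, String eventType) {
    return topicName(tenantId, eventType);
  }

  /** Topic names for all pre-configured event types of one tenant. */
  static List<String> topicNames(String tenantId, List<String> eventTypes) {
    return eventTypes.stream()
      .map(eventType -> topicName(tenantId, eventType))
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // prints pub-sub.diku.order_created
    System.out.println(topicName("diku", "order_created"));
    // "order_updated" is an illustrative event type, not one defined by the PoC
    System.out.println(topicNames("diku", List.of("order_created", "order_updated")));
  }
}
```

Because Kafka delivers each record of a topic to only one member of a consumer group, reusing the topic name as the group id is what keeps several instances of the module from processing the same event twice.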
Endpoints:
POST /publish - publishes a new event to the corresponding Kafka topic, based on eventType.
Example of request body:
{
"value" : "order 123 is created",
"eventType" : "order_created"
}
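As a rough illustration of calling this endpoint, the sketch below builds the request with the JDK's java.net.http API (Java 11+). The class name PublishRequestSketch, the base URL http://localhost:8081 and the Content-Type header are assumptions made for the example; a real FOLIO deployment would normally route the call through Okapi and may require additional headers. The JSON is assembled by hand only to keep the example dependency-free.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side sketch for POST /publish; not part of the PoC. */
final class PublishRequestSketch {

  private PublishRequestSketch() {
  }

  /** Escapes backslashes and double quotes only; control characters are not handled. */
  static String escape(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  /** Builds the request body shown in the example above. */
  static String eventJson(String value, String eventType) {
    return "{\"value\":\"" + escape(value) + "\",\"eventType\":\"" + escape(eventType) + "\"}";
  }

  /** Builds (but does not send) the POST /publish request. */
  static HttpRequest publishRequest(String baseUrl, String value, String eventType) {
    return HttpRequest.newBuilder()
      .uri(URI.create(baseUrl + "/publish"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(eventJson(value, eventType)))
      .build();
  }

  public static void main(String[] args) {
    // The base URL is an assumption; adjust it to wherever the module is reachable.
    HttpRequest request = publishRequest("http://localhost:8081", "order 123 is created", "order_created");
    System.out.println(request.method() + " " + request.uri());
    System.out.println(eventJson("order 123 is created", "order_created"));
  }
}
```

To actually send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the module is running.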
Required environment variables:
KAFKA_HOST - host of the Kafka server
KAFKA_PORT - port of the Kafka server
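These two variables are combined into the Kafka bootstrap server address. The code samples on this page call a getUrl(kafkaHost, kafkaPort) helper whose body is not shown, so the version below is a guess at its behaviour (plain "host:port" concatenation, matching what the topic-creation code does inline). The KafkaEnv class and its validation logic are illustrative additions.

```java
import java.util.Map;

/** Hypothetical sketch of resolving the Kafka address from the environment. */
final class KafkaEnv {

  private KafkaEnv() {
  }

  /** Assumed behaviour of the getUrl helper used elsewhere on this page. */
  static String getUrl(String kafkaHost, String kafkaPort) {
    return kafkaHost + ":" + kafkaPort;
  }

  /** Reads KAFKA_HOST and KAFKA_PORT from the given map and fails fast if either is missing. */
  static String bootstrapServers(Map<String, String> env) {
    return getUrl(require(env, "KAFKA_HOST"), require(env, "KAFKA_PORT"));
  }

  private static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.isBlank()) {
      throw new IllegalStateException("Missing required environment variable " + name);
    }
    return value.trim();
  }

  public static void main(String[] args) {
    // Sample values taken from the DeploymentDescriptor example on this page.
    Map<String, String> sample = Map.of("KAFKA_HOST", "10.0.2.15", "KAFKA_PORT", "29092");
    // prints 10.0.2.15:29092
    System.out.println(bootstrapServers(sample));
  }
}
```

In a running module, System.getenv() can be passed in place of the sample map. Failing at startup on a missing variable tends to be easier to diagnose than a Kafka connection timeout later on.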
DeploymentDescriptor.json example:
{
  "srvcId": "mod-pub-sub-poc-1.0.0-SNAPSHOT",
  "nodeId": "10.0.2.15",
  "descriptor": {
    "dockerImage": "test-kafka-my:latest",
    "dockerPull": false,
    "env": [
      { "name": "KAFKA_HOST", "value": "10.0.2.15" },
      { "name": "KAFKA_PORT", "value": "29092" },
      { "name": "JAVA_OPTIONS", "value": "-Xmx256m" }
    ],
    "dockerArgs": {
      "HostConfig": {
        "PortBindings": {
          "8081/tcp": [ { "HostPort": "%p" } ]
        }
      }
    }
  }
}
Useful links:
https://kafka.apache.org/documentation/ - Kafka documentation
https://vertx.io/docs/vertx-kafka-client/java/ - Vert.x Kafka client documentation
https://rmoff.net/2018/08/02/kafka-listeners-explained/ - This post was very useful for configuring Kafka server to be used by module from Docker container.
Code examples:
Code for creating topics:
@Bean
public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                               @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                               @Value("${KAFKA_HOST}") String kafkaHost,
                               @Value("${KAFKA_PORT}") String kafkaPort) {
  Map<String, Object> configs = new HashMap<>();
  configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
  AdminClient adminClient = AdminClient.create(configs);
  int numPartitions = 1;
  short replicationFactor = 1;
  // One topic per pre-configured event type
  List<NewTopic> topics = eventTypes.stream()
    .map(eventType -> new NewTopic(new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor))
    .collect(Collectors.toList());
  // createTopics is asynchronous and its result is not inspected here,
  // so a failure for a topic that already exists is silently ignored
  adminClient.createTopics(topics);
  return adminClient;
}
Code for creating consumers with Vert.x Kafka client:
@Bean
public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                          @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                          @Value("${KAFKA_HOST}") String kafkaHost,
                                                          @Value("${KAFKA_PORT}") String kafkaPort,
                                                          LoggingHandler loggingHandler,
                                                          Vertx vertx) {
  // One consumer per event type, each subscribed to its own topic
  return eventTypes.stream()
    .map(eventType -> {
      PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType);
      return KafkaConsumer.<String, String>create(vertx, consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId()))
        .subscribe(pubSubConfig.getTopicName())
        .handler(loggingHandler);
    })
    .collect(Collectors.toList());
}

private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
  Map<String, String> props = new HashMap<>();
  // ConsumerConfig is used here instead of ProducerConfig; both constants resolve to "bootstrap.servers"
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  // Offsets are committed automatically once per second
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  return props;
}
Code for creating the Vert.x Kafka client producer:
@Bean
public KafkaProducer<String, String> kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost,
                                                   @Value("${KAFKA_PORT}") String kafkaPort,
                                                   Vertx vertx) {
  Map<String, String> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort));
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  // Reuse the injected Vertx bean (the same one the consumers use)
  // instead of starting a second instance with Vertx.vertx()
  return KafkaProducer.createShared(vertx, "pub-sub-producer", props);
}