Goal:
Create a PoC that demonstrates how to use the Vert.x Kafka client inside a FOLIO module to implement PubSub functionality.
Result:
https://github.com/folio-org/mod-pub-sub-poc
Initialization:
During initialization, the following actions are performed:
- A separate Kafka topic is created for each pre-configured event type, if such a topic doesn't already exist.
Topic names have the structure "pub-sub.tenantId.eventType" (e.g. pub-sub.diku.order_created)
...
- A separate consumer is created and assigned to each Kafka topic.
Each consumer has its groupId set to the name of the corresponding topic, so all consumers of the same topic share the same groupId. As a result, if multiple instances of a consumer are running, each event is processed by only one of them.
...
...
- One producer is created.
This producer is used when an event is published by calling the POST /publish endpoint.
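The naming rules above can be sketched in plain Java. This is a minimal illustration, not the PoC's actual code: the class `PubSubNames` and its methods are hypothetical, and the only facts taken from the text are the "pub-sub.tenantId.eventType" structure and the groupId being equal to the topic name.

```java
/** Hypothetical helper illustrating the naming scheme described above. */
final class PubSubNames {

  private static final String PREFIX = "pub-sub";

  private PubSubNames() { }

  /** Builds a topic name of the form "pub-sub.<tenantId>.<eventType>". */
  static String topicName(String tenantId, String eventType) {
    return String.join(".", PREFIX, tenantId, eventType);
  }

  /**
   * The groupId equals the topic name, so every consumer instance
   * of one topic ends up in the same consumer group.
   */
  static String groupId(String tenantId, String eventType) {
    return topicName(tenantId, eventType);
  }
}
```

For example, `PubSubNames.topicName("diku", "order_created")` yields `pub-sub.diku.order_created`, and `groupId` returns the same string.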
Endpoints:
POST /publish - publishes a new event to the corresponding Kafka topic, based on eventType.
Example of a request body:
```json
{
  ...
  "value" : "order 123 is created",
  ...
  "eventType" : "order_created"
}
```
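How the endpoint might choose a topic from the `eventType` field can be sketched as follows. The class `PublishRouter` is hypothetical, and rejecting event types that were not pre-configured is an assumption made for illustration; the PoC may handle unknown types differently.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: maps the eventType of a published event to a topic. */
final class PublishRouter {

  private final String tenantId;
  private final Set<String> configuredEventTypes;

  PublishRouter(String tenantId, String... configuredEventTypes) {
    this.tenantId = tenantId;
    this.configuredEventTypes = new HashSet<>(Arrays.asList(configuredEventTypes));
  }

  /**
   * Returns the topic an event of the given type would be sent to.
   * Throwing on unknown types is an assumption, not confirmed PoC behavior.
   */
  String resolveTopic(String eventType) {
    if (eventType == null || !configuredEventTypes.contains(eventType)) {
      throw new IllegalArgumentException("Unknown eventType: " + eventType);
    }
    return "pub-sub." + tenantId + "." + eventType;
  }
}
```

With a router created as `new PublishRouter("diku", "order_created")`, the request body above would resolve to the topic `pub-sub.diku.order_created`.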
Required environment variables:
KAFKA_HOST - host of the Kafka server
KAFKA_PORT - port of the Kafka server
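A minimal sketch of how these two variables could be combined into a Kafka bootstrap address is shown below. The class `KafkaEnv` is hypothetical, and failing fast on a missing variable is an assumption. The code examples later on this page call a `getUrl(kafkaHost, kafkaPort)` helper whose body is not shown, so this should not be read as its actual implementation.

```java
import java.util.Map;

/** Hypothetical helper that builds "host:port" from the required variables. */
final class KafkaEnv {

  private KafkaEnv() { }

  /** Reads KAFKA_HOST and KAFKA_PORT from the given environment map. */
  static String bootstrapServers(Map<String, String> env) {
    return require(env, "KAFKA_HOST") + ":" + require(env, "KAFKA_PORT");
  }

  /** Fails fast when a required variable is absent or blank (an assumption). */
  private static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("Missing required environment variable: " + name);
    }
    return value.trim();
  }
}
```

In a running module this could be called as `KafkaEnv.bootstrapServers(System.getenv())`. Taking the environment as a map keeps the helper easy to test.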
DeploymentDescriptor.json example:
```json
{
  "srvcId": "mod-pub-sub-poc-1.0.0-SNAPSHOT",
  "nodeId": "10.0.2.15",
  "descriptor": {
    "dockerImage": "test-kafka-my:latest",
    "dockerPull": false,
    "env": [
      { "name": "KAFKA_HOST", "value": "10.0.2.15" },
      { "name": "KAFKA_PORT", "value": "29092" },
      { "name": "JAVA_OPTIONS", "value": "-Xmx256m" }
    ],
    "dockerArgs": {
      "HostConfig": {
        "PortBindings": {
          "8081/tcp": [ { "HostPort": "%p" } ]
        }
      }
    }
  }
}
```
Useful links:
https://kafka.apache.org/documentation/ - Kafka documentation
...
https://rmoff.net/2018/08/02/kafka-listeners-explained/ - This post was very useful for configuring a Kafka server so that it can be reached by a module running in a Docker container.
Code examples:
Code for creating topics:
```java
@Bean
public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                               @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                               @Value("${KAFKA_HOST}") String kafkaHost,
                               @Value("${KAFKA_PORT}") String kafkaPort) {
  Map<String, Object> configs = new HashMap<>();
  configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
  AdminClient adminClient = AdminClient.create(configs);

  // The PoC uses a single partition and a replication factor of 1.
  int numPartitions = 1;
  short replicationFactor = 1;

  // One topic per configured event type.
  List<NewTopic> topics = eventTypes.stream()
    .map(eventType -> new NewTopic(
      new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor))
    .collect(Collectors.toList());

  // createTopics is asynchronous; the returned result is not awaited here.
  adminClient.createTopics(topics);
  return adminClient;
}
```
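The initialization notes say a topic is created only if it doesn't already exist, while the code above requests creation for every configured event type. One way to make that check explicit is to compute the missing topics first. The sketch below is an illustration only: `MissingTopics` is a hypothetical class, and the set of existing topic names is passed in as a parameter. In a real module it could be obtained from Kafka's `AdminClient.listTopics().names()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: selects the topics that still have to be created. */
final class MissingTopics {

  private MissingTopics() { }

  /**
   * Builds "pub-sub.<tenant>.<eventType>" for each event type and keeps
   * only the names that are not in existingTopics, preserving input order.
   */
  static List<String> missingTopics(String tenant, List<String> eventTypes, Set<String> existingTopics) {
    List<String> missing = new ArrayList<>();
    for (String eventType : eventTypes) {
      String topic = "pub-sub." + tenant + "." + eventType;
      if (!existingTopics.contains(topic)) {
        missing.add(topic);
      }
    }
    return missing;
  }
}
```

Only the returned names would then be wrapped in `NewTopic` objects and passed to `createTopics`.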
Code for creating consumers with Vert.x Kafka client:
```java
@Bean
public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                          @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                          @Value("${KAFKA_HOST}") String kafkaHost,
                                                          @Value("${KAFKA_PORT}") String kafkaPort,
                                                          LoggingHandler loggingHandler,
                                                          Vertx vertx) {
  // One consumer per event type, subscribed to that event type's topic.
  return eventTypes.stream()
    .map(eventType -> {
      PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType);
      return KafkaConsumer.<String, String>create(vertx,
          consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId()))
        .subscribe(pubSubConfig.getTopicName())
        .handler(loggingHandler);
    })
    .collect(Collectors.toList());
}

private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
  Map<String, String> props = new HashMap<>();
  // Use the ConsumerConfig constant here (the original used ProducerConfig).
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  // Offsets are committed automatically once per second.
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  return props;
}
```
...
```java
@Bean
public KafkaProducer<String, String> kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost,
                                                   @Value("${KAFKA_PORT}") String kafkaPort,
                                                   Vertx vertx) {
  Map<String, String> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort));
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  // Reuse the injected Vertx instance (as the consumers bean does) instead of
  // creating a second one with Vertx.vertx().
  return KafkaProducer.createShared(vertx, "pub-sub-producer", props);
}
```
...