Jira Legacy |
---|
server | System JiraJIRA |
---|
serverId | 01505d01-b853-3c2e-90f1-ee9b165564fc |
---|
key | MODPUBSUB-11 |
---|
|
...
Example of request body:
Code Block |
---|
|
{
"value" : "order 123 is created", |
...
"eventType" : "order_created" |
...
Required environment variables:
KAFKA_HOST - host name of the Kafka server
...
Code for creating topics:
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
title | Create topics |
---|
|
/**
 * Exposes a Kafka {@link AdminClient} bean and, while building it, submits a
 * topic-creation request with one topic per configured event type.
 *
 * <p>Topic names are obtained from {@code PubSubConsumerConfig#getTopicName()} for the
 * given tenant and event type. Every topic is requested with a single partition and a
 * replication factor of one. The value returned by {@code createTopics} is not inspected
 * here, so the outcome of the request is not observed by this method.
 *
 * @param tenant     tenant value injected from {@code topic.tenant}
 * @param eventTypes event types obtained by splitting {@code topic.types} on commas
 * @param kafkaHost  Kafka host injected from {@code KAFKA_HOST}
 * @param kafkaPort  Kafka port injected from {@code KAFKA_PORT}
 * @return the admin client that was used to submit the topic-creation request
 */
@Bean
public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                               @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                               @Value("${KAFKA_HOST}") String kafkaHost,
                               @Value("${KAFKA_PORT}") String kafkaPort) {
  String bootstrapServers = kafkaHost + ":" + kafkaPort;
  Map<String, Object> adminProps = new HashMap<>();
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  AdminClient client = AdminClient.create(adminProps);

  final int partitionCount = 1;
  final short replicas = 1;

  // Resolve each event type to its topic name, then describe the topic to create.
  List<NewTopic> newTopics = eventTypes.stream()
      .map(type -> new PubSubConsumerConfig(tenant, type))
      .map(PubSubConsumerConfig::getTopicName)
      .map(topicName -> new NewTopic(topicName, partitionCount, replicas))
      .collect(Collectors.toList());

  client.createTopics(newTopics);
  return client;
}
|
Code for creating consumers with Vert.x Kafka client:
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
title | Create consumers |
---|
|
/**
 * Builds one Vert.x {@link KafkaConsumer} per configured event type.
 *
 * <p>For each event type a {@code PubSubConsumerConfig} supplies the consumer group id
 * and the topic name. The consumer is created from the properties returned by
 * {@code consumerConfig(...)}, subscribed to that topic, and given {@code loggingHandler}
 * as its record handler.
 *
 * @param tenant         tenant value injected from {@code topic.tenant}
 * @param eventTypes     event types obtained by splitting {@code topic.types} on commas
 * @param kafkaHost      Kafka host injected from {@code KAFKA_HOST}
 * @param kafkaPort      Kafka port injected from {@code KAFKA_PORT}
 * @param loggingHandler handler attached to every created consumer
 * @param vertx          Vert.x instance the consumers are created on
 * @return the created consumers, in the same order as {@code eventTypes}
 */
@Bean
public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                          @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                          @Value("${KAFKA_HOST}") String kafkaHost,
                                                          @Value("${KAFKA_PORT}") String kafkaPort,
                                                          LoggingHandler loggingHandler, Vertx vertx) {
  return eventTypes.stream()
      .map(type -> {
        PubSubConsumerConfig settings = new PubSubConsumerConfig(tenant, type);
        Map<String, String> props =
            consumerConfig(getUrl(kafkaHost, kafkaPort), settings.getGroupId());
        KafkaConsumer<String, String> consumer =
            KafkaConsumer.<String, String>create(vertx, props);
        // Keep the fluent chain: the handler is set on whatever subscribe(...) returns.
        KafkaConsumer<String, String> subscribed = consumer.subscribe(settings.getTopicName());
        return subscribed.handler(loggingHandler);
      })
      .collect(Collectors.toList());
}
/**
 * Assembles the Kafka consumer properties used when creating a consumer.
 *
 * <p>Auto-commit is enabled with a commit interval of 1000 ms, and both keys and values
 * are deserialized as strings.
 *
 * @param bootstrapServerUrl value stored under the bootstrap servers key
 * @param groupId            consumer group id
 * @return a new mutable map holding the consumer properties
 */
private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
  Map<String, String> props = new HashMap<>();
  // Use the consumer-side constant: this map configures a consumer, so it should not
  // depend on ProducerConfig (both constants resolve to the same property key).
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  return props;
}
|
...