Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
themeConfluence
titlebookCollection.jsonCreate topics
  @Bean
  public AdminClient adminClient(@Value("${topic.tenant}") String tenant,
                                 @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                 @Value("${KAFKA_HOST}") String kafkaHost,
                                 @Value("${KAFKA_PORT}") String kafkaPort) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
    AdminClient adminClient = AdminClient.create(configs);

    int numPartitions = 1;
    short replicationFactor = 1;

    List<NewTopic> topics = eventTypes.stream()
      .map(eventType -> new NewTopic(new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor))
      .collect(Collectors.toList());

    adminClient.createTopics(topics);

    return adminClient;
  }

...

Code Block
themeConfluence
titlebookCollection.jsonCreate consumers
@Bean
  public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant,
                                                            @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes,
                                                            @Value("${KAFKA_HOST}") String kafkaHost,
                                                            @Value("${KAFKA_PORT}") String kafkaPort,
                                                            LoggingHandler loggingHandler, Vertx vertx) {
    return eventTypes
      .stream().map(eventType ->
      {
        PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType);
        return KafkaConsumer.<String, String>create(vertx, consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId()))
          .subscribe(pubSubConfig.getTopicName())
          .handler(loggingHandler);
      })
      .collect(Collectors.toList());
  }

  private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) {
    Map<String, String> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }

...

Code Block
themeConfluence
titlebookCollection.jsonProducer
@Bean
public KafkaProducer kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost,
                                   @Value("${KAFKA_PORT}") String kafkaPort) {
  Map<String, String> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort));
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  return KafkaProducer.createShared(Vertx.vertx(), "pub-sub-producer", props);
}

...