...
Code Block | ||||
---|---|---|---|---|
| ||||
@Bean public AdminClient adminClient(@Value("${topic.tenant}") String tenant, @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes, @Value("${KAFKA_HOST}") String kafkaHost, @Value("${KAFKA_PORT}") String kafkaPort) { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort); AdminClient adminClient = AdminClient.create(configs); int numPartitions = 1; short replicationFactor = 1; List<NewTopic> topics = eventTypes.stream() .map(eventType -> new NewTopic(new PubSubConsumerConfig(tenant, eventType).getTopicName(), numPartitions, replicationFactor)) .collect(Collectors.toList()); adminClient.createTopics(topics); return adminClient; } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Bean public List<KafkaConsumer<String, String>> kafkaConsumers(@Value("${topic.tenant}") String tenant, @Value("#{'${topic.types}'.split(',')}") List<String> eventTypes, @Value("${KAFKA_HOST}") String kafkaHost, @Value("${KAFKA_PORT}") String kafkaPort, LoggingHandler loggingHandler, Vertx vertx) { return eventTypes .stream().map(eventType -> { PubSubConsumerConfig pubSubConfig = new PubSubConsumerConfig(tenant, eventType); return KafkaConsumer.<String, String>create(vertx, consumerConfig(getUrl(kafkaHost, kafkaPort), pubSubConfig.getGroupId())) .subscribe(pubSubConfig.getTopicName()) .handler(loggingHandler); }) .collect(Collectors.toList()); } private Map<String, String> consumerConfig(String bootstrapServerUrl, String groupId) { Map<String, String> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); return props; } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Bean public KafkaProducer kafkaProducer(@Value("${KAFKA_HOST}") String kafkaHost, @Value("${KAFKA_PORT}") String kafkaPort) { Map<String, String> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getUrl(kafkaHost, kafkaPort)); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return KafkaProducer.createShared(Vertx.vertx(), "pub-sub-producer", props); } |
...