Я использую конфлюентный параллельный потребитель , чтобы добиться быстрой записи в разные хранилища данных. Я реализовал свой код, и все отлично работало локально с докерами. Как только я запустил несколько хостов с несколькими потребителями (с одним и тем же идентификатором группы), я заметил, что только один из узлов (процессов) действительно потребляет данные. В теме, которую я читаю, 24 раздела, а у меня 3 разных узла, я ожидал, что кафка разделит работу между ними.
Вот части моего кода:
fun buildConsumer(config: KafkaConsumerConfig): KafkaConsumer<String, JsonObject> {
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafkaBootstrapServers
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.GROUP_ID_CONFIG] = "myGroup"
// Auto commit must be false in parallel consumer
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonObjectDeSerializer::class.java.name
val consumer = KafkaConsumer<String, JsonObject>(props)
return consumer
}
private fun createReactParallelConsumer(): ReactorProcessor<String, JsonObject> {
val options = ParallelConsumerOptions.builder<String, JsonObject>()
.ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
.maxConcurrency(10)
.batchSize(1)
.consumer(buildConsumer(kafkaConsumerConfig))
.build()
return ReactorProcessor(options)
}
И мой основной код:
pConsumer = createReactParallelConsumer()
pConsumer.subscribe(UniLists.of(kafkaConsumerConfig.kafkaTopic))
pConsumer.react { context ->
batchProcessor.processBatch(context)
}
Был бы признателен за любой совет
Мы столкнулись с проблемой, которая была закрыта в версии 0.5.2.4 https://github.com/confluentinc/parallel-consumer/issues/409
Параллельный клиент хранил старые незавершенные смещения, так как наш потребитель был медленным (много разных причин), мы дошли до конца хранения (самая ранняя стратегия), поэтому каждый раз, когда мы перезапускали потребителя, он сканировал все эти несовместимые смещения (что он и делал). не усекать их - АКА ошибка). Фикс просто обновлял версию с 0.5.2.3 до 0.5.2.4.