Я новичок в Kafka и, честно говоря, понятия не имею об этом типе потребителей (насколько я понял, это похоже на то, что должно быть готово), поэтому я изо всех сил пытаюсь понять, как в основном потреблять список этих событий.
У меня есть что-то вроде этого:
@KafkaListener(topics = "#{'${kafka.listener.list-of-topics}'.split(',')}")
public void readMessage(List<ConsumerRecord<String, String>> records,
final Acknowledgment acknowledgment) {
try {
....
Я знаю, что когда я получаю событие (по крайней мере, одно), оно имеет тип «MyObject», поэтому я могу сделать это нормально, когда получаю одно сообщение.
Я считаю, что должен быть способ прочитать/прочитать этот List<ConsumerRecords<String,String>, но я не могу понять, как это сделать.
Любые идеи?
См. справочное руководство: Пакетные прослушиватели.
Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records received from the consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
...
You can also receive a list of
ConsumerRecord<?, ?>objects but it must be the only parameter (aside from optionalAcknowledgment, when using manual commits, and/orConsumer<?, ?>parameters) defined on the method:...
При использовании Spring Boot установите свойство spring.kafka.listener.type=batch.