Пытаюсь закоммитить сообщение сразу после прочтения из темы. Я перешел по этой ссылке (https://www.confluent.io/blog/apache-kafka-spring-boot-application), чтобы создать потребителя Kafka с помощью spring. Обычно это работает идеально, и потребитель получает сообщение и ждет, пока другой не войдет в очередь. Но проблема в том, что когда я обрабатываю эти сообщения, это занимает много времени (около 10 минут), очередь кафки думает, что сообщение не потребляется (зафиксировано), и потребители читают его снова и снова. Я должен сказать, что когда время моего процесса составляет менее 5 минут, он работает хорошо, но когда он длится дольше, сообщение не фиксируется.
Я искал некоторые ответы, но это мне не помогает, потому что я не использую тот же исходный код (и, конечно, другую структуру). Я пытался отправить асинхронные методы, а также асинхронно зафиксировать сообщение, но мне это не удалось. Некоторые из источников:
Spring Boot Kafka: фиксация не может быть завершена, так как группа уже перебалансирована
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
Потребитель Kafka 0.10 Java не читает сообщение из темы
https://github.com/confluentinc/confluent-kafka-dotnet/issues/470
Основной класс здесь:
@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}
Класс потребителей (где мне нужно зафиксировать свое сообщение)
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
Как я могу зафиксировать сообщение сразу после того, как я прочитал его из очереди.
Я хочу быть уверен, что когда я получу сообщение, я немедленно его зафиксирую. Прямо сейчас сообщение фиксируется, когда я заканчиваю выполнение метода сразу после (System.out.println). Так может ли кто-нибудь сказать мне, как это сделать?
----- Обновить -------
Извините за поздний ответ, но, как предложил @GirishB, я искал конфигурацию GirishB, но я не вижу, где я могу определить тему, которую я хочу читать/прослушивать, из моего файла конфигурации (applications.yml). Во всех примерах, которые я вижу, используется структура, подобная этой (http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html). Есть ли возможность прочитать тему, объявленную на другом сервере? Используя что-то похожее на это @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
=========== РЕШЕНИЕ 1 ==================================== ===
Я последовал совету @victor gallet и включил объявление параметров потребителя, чтобы разместить объект «Подтверждение» в методе потребления. Я также перешел по этой ссылке (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration .Джава), чтобы получить все методы, которые я использовал для объявления и установки всех свойств (consumerProperties, ConsumerFactory, kafkaListenerContainerFactory). Единственная проблема, которую я обнаружил, это «new SeekToCurrentErrorHandler()», потому что я получаю сообщение об ошибке и на данный момент не могу ее устранить (было бы здорово, если бы кто-нибудь объяснил мне это).
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
``````````````````````````````````````````````````````````




Вы можете использовать java.util.concurrent.BlockingQueue, чтобы отправить сообщение, когда вы потребляете и фиксируете смещение Kafka. Затем, используя другой поток, получите сообщение из blockingQueue и обработайте. Таким образом, вам не нужно ждать завершения обработки.
Вы должны изменить конфигурацию своего потребителя, установив для свойства enable.auto.commit значение false :
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Затем вам нужно изменить фабрику Spring Kafka Listener и установить для режима подтверждения значение MANUAL_IMMEDIATE. Вот пример ConcurrentKafkaListenerContainerFactory :
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Как поясняется в документации, MANUAL_IMMEDIATE означает: фиксировать смещение немедленно, когда прослушиватель вызывает метод Acknowledgment.acknowledge().
Вы можете найти все методы фиксации здесь.
Затем в коде слушателя вы можете зафиксировать смещение вручную, добавив объект Acknowledgment, например:
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
// commit immediately
acknowledgment.acknowledge();
}
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ложь);
После установки вышеуказанного свойства, если вы хотите обрабатывать в пакетном режиме, вы можете следовать следующим конфигурациям.
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// вы можете установить либо Manual, либо MANUAL_IMMEDIATE, потому что //KafkaMessageListenerContainer вызывает //ConsumerBatchAcknowledgment для любого ручного режима подтверждения
factory.getContainerProperties().setAckOnError(true);
//specifying batch error handler because i have enabled to listen records in batch
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setBatchListener(true);
factory.getContainerProperties().setSyncCommits(false);
если ваше время обработки велико и вы хотите зафиксировать только ПОСЛЕ успешного завершения процесса, рассмотрите возможность настройки этих параметров:
max.poll.interval.ms,max.poll.interval.msиmax.poll.recordsсм. kafka.apache.org/документация