Зафиксировать асинхронно сообщение сразу после прочтения темы

Пытаюсь закоммитить сообщение сразу после прочтения из темы. Я перешел по этой ссылке (https://www.confluent.io/blog/apache-kafka-spring-boot-application), чтобы создать потребителя Kafka с помощью spring. Обычно это работает идеально, и потребитель получает сообщение и ждет, пока другой не войдет в очередь. Но проблема в том, что когда я обрабатываю эти сообщения, это занимает много времени (около 10 минут), очередь кафки думает, что сообщение не потребляется (зафиксировано), и потребители читают его снова и снова. Я должен сказать, что когда время моего процесса составляет менее 5 минут, он работает хорошо, но когда он длится дольше, сообщение не фиксируется.

Я искал некоторые ответы, но это мне не помогает, потому что я не использую тот же исходный код (и, конечно, другую структуру). Я пытался отправить асинхронные методы, а также асинхронно зафиксировать сообщение, но мне это не удалось. Некоторые из источников:

Spring Boot Kafka: фиксация не может быть завершена, так как группа уже перебалансирована

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Потребитель Kafka 0.10 Java не читает сообщение из темы

https://github.com/confluentinc/confluent-kafka-dotnet/issues/470

Основной класс здесь:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }


Класс потребителей (где мне нужно зафиксировать свое сообщение)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }


Как я могу зафиксировать сообщение сразу после того, как я прочитал его из очереди.

Я хочу быть уверен, что когда я получу сообщение, я немедленно его зафиксирую. Прямо сейчас сообщение фиксируется, когда я заканчиваю выполнение метода сразу после (System.out.println). Так может ли кто-нибудь сказать мне, как это сделать?

----- Обновить -------

Извините за поздний ответ, но, как предложил @GirishB, я искал конфигурацию GirishB, но я не вижу, где я могу определить тему, которую я хочу читать/прослушивать, из моего файла конфигурации (applications.yml). Во всех примерах, которые я вижу, используется структура, подобная этой (http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html). Есть ли возможность прочитать тему, объявленную на другом сервере? Используя что-то похожее на это @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")

=========== РЕШЕНИЕ 1 ==================================== ===

Я последовал совету @victor gallet и включил объявление параметров потребителя, чтобы разместить объект «Подтверждение» в методе потребления. Я также перешел по этой ссылке (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration .Джава), чтобы получить все методы, которые я использовал для объявления и установки всех свойств (consumerProperties, ConsumerFactory, kafkaListenerContainerFactory). Единственная проблема, которую я обнаружил, это «new SeekToCurrentErrorHandler()», потому что я получаю сообщение об ошибке и на данный момент не могу ее устранить (было бы здорово, если бы кто-нибудь объяснил мне это).


@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;


   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

     @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 
        return props;
    }




    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
        acknowledgment.acknowledge();// commit immediately
        Properties  props=prop.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }

``````````````````````````````````````````````````````````

если ваше время обработки велико и вы хотите зафиксировать только ПОСЛЕ успешного завершения процесса, рассмотрите возможность настройки этих параметров: max.poll.interval.ms, max.poll.interval.ms и max.poll.records см. kafka.apache.org/документация

Paizo 29.05.2019 11:00
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
1
3 154
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Вы можете использовать java.util.concurrent.BlockingQueue, чтобы отправить сообщение, когда вы потребляете и фиксируете смещение Kafka. Затем, используя другой поток, получите сообщение из blockingQueue и обработайте. Таким образом, вам не нужно ждать завершения обработки.

Ответ принят как подходящий

Вы должны изменить конфигурацию своего потребителя, установив для свойства enable.auto.commit значение false :

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Затем вам нужно изменить фабрику Spring Kafka Listener и установить для режима подтверждения значение MANUAL_IMMEDIATE. Вот пример ConcurrentKafkaListenerContainerFactory :

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

Как поясняется в документации, MANUAL_IMMEDIATE означает: фиксировать смещение немедленно, когда прослушиватель вызывает метод Acknowledgment.acknowledge().

Вы можете найти все методы фиксации здесь.

Затем в коде слушателя вы можете зафиксировать смещение вручную, добавив объект Acknowledgment, например:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
   // commit immediately
    acknowledgment.acknowledge();
}

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ложь);

После установки вышеуказанного свойства, если вы хотите обрабатывать в пакетном режиме, вы можете следовать следующим конфигурациям.

     factory.getContainerProperties().setAckMode(AckMode.MANUAL);

// вы можете установить либо Manual, либо MANUAL_IMMEDIATE, потому что //KafkaMessageListenerContainer вызывает //ConsumerBatchAcknowledgment для любого ручного режима подтверждения

    factory.getContainerProperties().setAckOnError(true);
    //specifying batch error handler because i have enabled to listen records in batch
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
    factory.setBatchListener(true);
    factory.getContainerProperties().setSyncCommits(false);

Другие вопросы по теме