Как потреблять сообщения одно за другим из темы кафки

Я создал одну тему кафки с одним разделом.

kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1

Можно было бы запихнуть много сообщений в эту тему.

Но я бы хотел, чтобы один потребитель (для данной группы) обрабатывал эти сообщения одно за другим.

spring:
  application:
    name: file-consumer
  cloud:
    stream:
      kafka:
        binder:
          type: kafka
          brokers: localhost
          defaultBrokerPort: 29092
          defaultZkPort: 32181
          configuration:
            max.request.size: 300000
            max.message.bytes: 300000
        bindings:
          fileWriteBindingInput:
            consumer:
              autoCommitOffset: false
      bindings:
        fileWriteBindingInput:
          binder: kafka
          destination: files.write
          group: ${spring.application.name}
          contentType: 'text/plain'

И пример кода Java

@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

    // I Would like here to synchronize the processing of messages one by one
    // But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message

    acknowledgment.acknowledge();
}

Чего не хватает в моей конфигурации?

Я думал, пока сообщение не подтверждено (смещение не увеличено), никакое другое сообщение из того же раздела не потребляется.

Эта ссылка может быть вам полезна : github.com/spring-cloud/spring-cloud-stream/issues/575

Nishu Tayal 30.01.2019 16:18
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
1
1
3 165
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Если autoCommitOffset включен (по умолчанию), то подшивка уже будет подтверждать каждую запись. Так что к тому времени, когда он попадет на ваш StreamListener, запись уже будет подтверждена.

Исправление: приведенное выше утверждение о StreamListener не совсем верно. Автоматическое подтверждение выполняется, когда слушатель выходит.

Поскольку у вас есть только один раздел, вы будете получать сообщения в том же порядке, в котором они были отправлены в этот тематический раздел. Вы можете отключить autoCommitOffset, и в этом случае вы можете использовать ручное подтверждение.

Спасибо. Но на самом деле это уже то, что я сделал. Я отключил autoCommitOffset. Я думал, что сообщение больше не потребляется, пока подтверждение не выполнено.

OlivierTerrien 30.01.2019 15:55

Ох, хорошо. Я не заметил этого в вашей конфигурации.

sobychacko 30.01.2019 16:01
>So by the time, it gets to your StreamListener, the record is already acknowledged. — не совсем так; автоматическое подтверждение выполняется, когда слушатель выходит - не имеет отношения к вопросу, но я хотел это прояснить.
Gary Russell 30.01.2019 17:51

Спасибо, @GaryRussell. Обновил ответ.

sobychacko 30.01.2019 19:48
Ответ принят как подходящий

Вы можете установить для этой потребительской конфигурации max.poll.records значение 1, по умолчанию это 500.

макс.опрос.записей

The maximum number of records returned in a single call to poll().

Спасибо. Но я не вижу никакого результата. Возможно конфигурация не удачная. Я установил «max.pool.records» здесь: spring.cloud.stream.kafka.binder.bindings.fileWriteBindingIn‌​put.consumer.configu‌ration.max.poll.reco‌​rds: 1

OlivierTerrien 30.01.2019 17:59
I don't see any result что вы хотите этим сказать? это не сработало? у вас больше записей больше 1? @OlivierTerrien
Deadpool 30.01.2019 18:02

да. Я все еще потребляю много сообщений одновременно. Но, возможно, моя конфигурация не очень хороша.

OlivierTerrien 30.01.2019 18:03

удалите этот max.request.size и max.message.bytes, я попробую, а также обратитесь к ответу @gray russell.

Deadpool 30.01.2019 18:05

Я проверил самоанализом, и Fetcher все еще настроен на 500. То, что я настроил, не очень хорошо. не знаю где и как настроить max.poll.records

OlivierTerrien 30.01.2019 18:09

попробуйте это свойство в блоке configuration: @OlivierTerrien

Deadpool 30.01.2019 18:12

Я использовал ackEachRecord в дополнение к configuration.max.poll.records, и теперь я вижу, что MaxPollRecords Fetcher установлен на 1. Однако поведение не соответствует ожидаемому. Но я думаю, что я не так далеко.

OlivierTerrien 30.01.2019 18:15

тогда я понятия не имею @OlivierTerrien

Deadpool 30.01.2019 18:19

Еще раз спасибо. Я буду продолжать исследовать этот путь.

OlivierTerrien 30.01.2019 19:11

Отсутствие подтверждения сообщения не имеет ничего общего с остановкой доставки следующего сообщения.

Вы не можете передать сообщение другому потоку и подтвердить его позже; если вам нужна однопоточная обработка, вы должны выполнять всю обработку в потоке прослушивателя.

Привет Гэри. Возможно, метод ack не является хорошим решением. Даже настроив max.poll.records для потребления сообщений одно за другим, я могу в журналах использовать сообщения одновременно. Например, я отправляю 4 сообщения в одну разделенную на разделы тему и использую их одним потребителем. Я вижу в журнале, что все сообщения потребляются, даже если первое не подтверждено.

OlivierTerrien 30.01.2019 19:07

Мне нужно обрабатывать второе сообщение только после обработки первого. Вот почему я думал, что ack - это решение.

OlivierTerrien 30.01.2019 19:10

Непонятно, что вы имеете в виду. С точки зрения контейнера «обработано» означает, что метод прослушивателя завершает работу. Подтверждение просто фиксирует смещение в брокере, что означает, что вы больше не получите сообщение, если перезапустите приложение. Если вы передадите сообщение другому потоку для «обработки», вы получите следующее сообщение, независимо от потребительских свойств. Чтобы сделать то, что вы хотите, вы должны выполнить всю «обработку» перед выходом из метода прослушивателя.

Gary Russell 30.01.2019 19:25

Прости. Обрабатывать, потреблять, обрабатывать, получать, все это значит для меня одно и то же

OlivierTerrien 30.01.2019 19:27

Как я могу получить доступ к слушателю из моего метода StreamListener? Это проблема для меня. Framework вызывает этот метод для меня. И когда метод выполняется, сообщения уже есть.

OlivierTerrien 30.01.2019 19:30

Совершенно непонятно, что вы имеете в виду; @StreamListenerслушатель; он будет вызываться только с одной записью за раз, если вы не увеличите привязку ...consumer.concurrency и не добавите в тему больше разделов. Что вы подразумеваете под "уже здесь"? Как сказал @deadpool, вы можете использовать max.poll.records, чтобы уменьшить фактическое количество предварительно выбранных записей. Предварительная выборка записей обычно желательна для повышения производительности.

Gary Russell 30.01.2019 20:43

Он работает с параллелизмом = 1. Я думаю, что моя основная проблема была вызвана моим реактивным использованием. Я использовал subscribe() вместо block(). Я опубликовал свой пример здесь: github.com/oterrien/kafka-stream-for-stackoverflow. Кстати спасибо за помощь

OlivierTerrien 31.01.2019 00:46

Другие вопросы по теме