Я создал одну тему кафки с одним разделом.
kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1
Можно было бы запихнуть много сообщений в эту тему.
Но я бы хотел, чтобы один потребитель (для данной группы) обрабатывал эти сообщения одно за другим.
spring:
application:
name: file-consumer
cloud:
stream:
kafka:
binder:
type: kafka
brokers: localhost
defaultBrokerPort: 29092
defaultZkPort: 32181
configuration:
max.request.size: 300000
max.message.bytes: 300000
bindings:
fileWriteBindingInput:
consumer:
autoCommitOffset: false
bindings:
fileWriteBindingInput:
binder: kafka
destination: files.write
group: ${spring.application.name}
contentType: 'text/plain'
И пример кода Java
@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
// I Would like here to synchronize the processing of messages one by one
// But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message
acknowledgment.acknowledge();
}
Чего не хватает в моей конфигурации?
Я думал, пока сообщение не подтверждено (смещение не увеличено), никакое другое сообщение из того же раздела не потребляется.




Если autoCommitOffset включен (по умолчанию), то подшивка уже будет подтверждать каждую запись. Так что к тому времени, когда он попадет на ваш StreamListener, запись уже будет подтверждена.
Исправление: приведенное выше утверждение о StreamListener не совсем верно. Автоматическое подтверждение выполняется, когда слушатель выходит.
Поскольку у вас есть только один раздел, вы будете получать сообщения в том же порядке, в котором они были отправлены в этот тематический раздел.
Вы можете отключить autoCommitOffset, и в этом случае вы можете использовать ручное подтверждение.
Спасибо. Но на самом деле это уже то, что я сделал. Я отключил autoCommitOffset. Я думал, что сообщение больше не потребляется, пока подтверждение не выполнено.
Ох, хорошо. Я не заметил этого в вашей конфигурации.
>So by the time, it gets to your StreamListener, the record is already acknowledged. — не совсем так; автоматическое подтверждение выполняется, когда слушатель выходит - не имеет отношения к вопросу, но я хотел это прояснить.
Спасибо, @GaryRussell. Обновил ответ.
Вы можете установить для этой потребительской конфигурации max.poll.records значение 1, по умолчанию это 500.
макс.опрос.записей
The maximum number of records returned in a single call to poll().
Спасибо. Но я не вижу никакого результата. Возможно конфигурация не удачная. Я установил «max.pool.records» здесь: spring.cloud.stream.kafka.binder.bindings.fileWriteBindingInput.consumer.configuration.max.poll.records: 1
I don't see any result что вы хотите этим сказать? это не сработало? у вас больше записей больше 1? @OlivierTerrien
да. Я все еще потребляю много сообщений одновременно. Но, возможно, моя конфигурация не очень хороша.
удалите этот max.request.size и max.message.bytes, я попробую, а также обратитесь к ответу @gray russell.
Я проверил самоанализом, и Fetcher все еще настроен на 500. То, что я настроил, не очень хорошо. не знаю где и как настроить max.poll.records
попробуйте это свойство в блоке configuration: @OlivierTerrien
Я использовал ackEachRecord в дополнение к configuration.max.poll.records, и теперь я вижу, что MaxPollRecords Fetcher установлен на 1. Однако поведение не соответствует ожидаемому. Но я думаю, что я не так далеко.
тогда я понятия не имею @OlivierTerrien
Еще раз спасибо. Я буду продолжать исследовать этот путь.
Отсутствие подтверждения сообщения не имеет ничего общего с остановкой доставки следующего сообщения.
Вы не можете передать сообщение другому потоку и подтвердить его позже; если вам нужна однопоточная обработка, вы должны выполнять всю обработку в потоке прослушивателя.
Привет Гэри. Возможно, метод ack не является хорошим решением. Даже настроив max.poll.records для потребления сообщений одно за другим, я могу в журналах использовать сообщения одновременно. Например, я отправляю 4 сообщения в одну разделенную на разделы тему и использую их одним потребителем. Я вижу в журнале, что все сообщения потребляются, даже если первое не подтверждено.
Мне нужно обрабатывать второе сообщение только после обработки первого. Вот почему я думал, что ack - это решение.
Непонятно, что вы имеете в виду. С точки зрения контейнера «обработано» означает, что метод прослушивателя завершает работу. Подтверждение просто фиксирует смещение в брокере, что означает, что вы больше не получите сообщение, если перезапустите приложение. Если вы передадите сообщение другому потоку для «обработки», вы получите следующее сообщение, независимо от потребительских свойств. Чтобы сделать то, что вы хотите, вы должны выполнить всю «обработку» перед выходом из метода прослушивателя.
Прости. Обрабатывать, потреблять, обрабатывать, получать, все это значит для меня одно и то же
Как я могу получить доступ к слушателю из моего метода StreamListener? Это проблема для меня. Framework вызывает этот метод для меня. И когда метод выполняется, сообщения уже есть.
Совершенно непонятно, что вы имеете в виду; @StreamListenerслушатель; он будет вызываться только с одной записью за раз, если вы не увеличите привязку ...consumer.concurrency и не добавите в тему больше разделов. Что вы подразумеваете под "уже здесь"? Как сказал @deadpool, вы можете использовать max.poll.records, чтобы уменьшить фактическое количество предварительно выбранных записей. Предварительная выборка записей обычно желательна для повышения производительности.
Он работает с параллелизмом = 1. Я думаю, что моя основная проблема была вызвана моим реактивным использованием. Я использовал subscribe() вместо block(). Я опубликовал свой пример здесь: github.com/oterrien/kafka-stream-for-stackoverflow. Кстати спасибо за помощь
Эта ссылка может быть вам полезна : github.com/spring-cloud/spring-cloud-stream/issues/575