Kafka Streams с обработкой .guarantee настроен на EXACTLY_ONCE

Я работаю над средой разработки с 3 (dockerized) брокерами kafka в моей системе. У брокеров значение transaction.state.log.replication.factor равно 3.

В конфигурации потокового приложения я установил processing.guarantee как EXACTLY_ONCE, а в конфигурации потребительского приложения я установил изоляцию .level как "read_committed".

Я проверил другие конфигурации на https://docs.confluent.io/current/streams/developer-guide/config-streams.html#processing-guarantee и настроил свою среду в соответствии с руководством.

После минуты создания сообщения из потокового приложения, которое читает хранилище состояний и создает 100 сообщений с помощью функции context.forward (..), приложение-потребитель прекращает чтение, как если бы в назначенных разделах не было зафиксированных сообщений.

Через некоторое время приложение стрима вылетает со следующей ошибкой:

"Aborting producer batches due to fatal error org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."

Похоже, что производитель потока не может получить подтверждение, и транзакция истекает.

Изменить 1: Когда я останавливаю приложение потока, потребитель получает зафиксированные сообщения.

вы совершаете транзакцию где-нибудь? Покажи несколько примеров кода

freakman 27.09.2018 13:45

Сложно сказать. Я бы порекомендовал проверить логи брокера и стримов.

Matthias J. Sax 30.09.2018 03:26

Вы нашли решение?

Rolintocour 21.03.2019 10:50

@Rolintocour см. Мой ответ :)

FiliRnd 28.03.2019 10:42
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
4
1 338
1

Ответы 1

Расширение версии сервера и клиента Kafka, похоже, решает проблему

на какую версию сервера Kafka вы перешли?

klor 10.05.2019 16:06

Другие вопросы по теме