Я работаю над средой разработки с 3 (dockerized) брокерами kafka в моей системе. У брокеров значение transaction.state.log.replication.factor равно 3.
В конфигурации потокового приложения я установил processing.guarantee как EXACTLY_ONCE, а в конфигурации потребительского приложения я установил изоляцию .level как "read_committed".
Я проверил другие конфигурации на https://docs.confluent.io/current/streams/developer-guide/config-streams.html#processing-guarantee и настроил свою среду в соответствии с руководством.
После минуты создания сообщения из потокового приложения, которое читает хранилище состояний и создает 100 сообщений с помощью функции context.forward (..), приложение-потребитель прекращает чтение, как если бы в назначенных разделах не было зафиксированных сообщений.
Через некоторое время приложение стрима вылетает со следующей ошибкой:
"Aborting producer batches due to fatal error org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."
Похоже, что производитель потока не может получить подтверждение, и транзакция истекает.
Изменить 1: Когда я останавливаю приложение потока, потребитель получает зафиксированные сообщения.
Сложно сказать. Я бы порекомендовал проверить логи брокера и стримов.
Вы нашли решение?
@Rolintocour см. Мой ответ :)
Расширение версии сервера и клиента Kafka, похоже, решает проблему
на какую версию сервера Kafka вы перешли?
вы совершаете транзакцию где-нибудь? Покажи несколько примеров кода