Выполняется ли смещение потребителя, даже если не удается опубликовать выходную тему в Kafka Streams?

Если у меня есть потоковое приложение Kafka, которое не может опубликовать сообщение в теме (поскольку тема не существует), фиксирует ли оно смещение потребителя и продолжает, или оно будет повторять одно и то же сообщение, пока не сможет разрешить выходную тему? Приложение просто печатает ошибку и работает нормально, насколько я могу судить.

Пример ошибки при попытке написать в тему:

Error while fetching metadata with correlation id 80 : {super.cool.test.topic=UNKNOWN_TOPIC_OR_PARTITION}

На мой взгляд, это будет просто крутить одно и то же сообщение, пока проблема не будет решена, чтобы не потерять данные? Я не мог найти четкого ответа на то, что такое поведение по умолчанию. Мы не выключали автоматическую фиксацию или что-то в этом роде, большинство настроек установлены по умолчанию.

Я спрашиваю, потому что мы не хотим оказаться в ситуации, когда проверка работоспособности в порядке (приложение работает, пока печатаются ошибки в журнал), и мы просто выбрасываем тонны сообщений Kafka.

Если вы хотите убедиться, что смещение не фиксируется для сообщений об ошибках, отключите автоматическую фиксацию и после обработки каждого сообщения вручную фиксируйте его с помощью метода синхронизации. Это обеспечит лучшую согласованность.

waterbyte 31.05.2019 09:14

Да, это справедливо. В этом случае мне больше любопытно, каково поведение по умолчанию. Я могу поэкспериментировать, как только у меня появится такая возможность, но решил, что я не могу быть первым, кто задается вопросом, каково поведение коммита по умолчанию, когда поток не может обработать сообщение или опубликовать сообщение в исходящей теме.

Wobbley 31.05.2019 10:29

Теоретически в вашем случае данные будут потеряны. Как только приложение Kafka извлечет данные и будет включена автоматическая фиксация, оно зафиксирует смещение. Приложение Kafka не знает, что вы делаете с данными.

waterbyte 31.05.2019 12:23
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
3
515
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Kafka Streams не будет фиксировать смещения для этого случая, поскольку он предоставляет гарантии обработки по крайней мере один раз (на самом деле даже невозможно перенастроить Kafka Streams по-другому — возможны только более сильные гарантии ровно один раз). Кроме того, Kafka Streams всегда отключает автоматическую фиксацию для потребителя (и не позволяет вам ее включить), поскольку Kafka Streams сама управляет фиксацией смещения.

Если вы работаете с настройкой по умолчанию, производитель должен фактически выдать исключение, и соответствующий поток должен умереть — вы можете получить обратный вызов, если поток умрет, зарегистрировав KafkaStreams#uncaughtExceptionHandler().

Вы также можете наблюдать KafkaStreams#state() (или зарегистрировать обратный звонок KafkaStreams#setStateListener()). Состояние перейдет в DEAD, если все потоки мертвы (обратите внимание, в старой версии была ошибка, из-за которой состояние все еще было RUNNING для этого случая: https://issues.apache.org/jira/browse/KAFKA-5372)

Следовательно, приложение не должно находиться в работоспособном состоянии, и Kafka Streams не будет повторять входное сообщение, а прекратит обработку, и вам потребуется перезапустить клиент. При перезапуске он будет перечитывать ошибочное входное сообщение и повторять попытку записи в выходной раздел.

Если вы хотите, чтобы Kafka Streams повторил попытку, вам нужно увеличить конфигурацию производителя reties, чтобы избежать того, что производитель выдает исключение и повторяет внутренние попытки записи. Это может в конечном итоге «заблокировать» дальнейшую обработку, если буфер записи производителя заполнится.

Другие вопросы по теме