Коннектор flink kafka 0.11.0

Я пытаюсь подключиться к Flink kafka connector 0.11, но он продолжает выдавать мне эту ошибку при выполнении задания.

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Как я понял из документации kafka, тайм-аут транзакции должен быть больше, чем интервал контрольной точки, но меньше, чем у брокера transaction.max.timeout.ms.

Мой кластер настроен, как показано ниже:

  • Флинк версии 1.4.2
  • Приложение с флинк-коннектор-кафка-0.11_2.11
  • Интервал контрольной точки: 5000 мс
  • Наблюдаемое время сквозной контрольной точки: 2 с

Конфигурация производителя Kafka:

transactional.id : tx-kafka-topic1
transaction.timeout.ms : 30000
acks: all
enable.idempotence : true
retries: 3
max.in.flight.requests.per.connection : 1

Брокер Kafka (kafka_2.11-1.0.0-cp1.jar) с конфигурацией сервера:

transaction.max.timeout.ms=120000
transaction.state.log.replication.factor=3

Мне кажется, что интервалы не перекрываются друг с другом, но задание все равно завершилось ошибкой с указанной выше ошибкой. Благодарю, если кто-то может указать мне правильное направление.

Каковы сроки событий? Может быть, проблема в первой контрольной точке, которая происходит после инициализации конвейера, и просто инициализация занимает больше 30 секунд? Было бы полезно, если бы вы поделились полными журналами приложений Flink.

Piotr Nowojski 24.05.2018 13:48

Обзор моего конвейера как источника -> плоская карта -> RichMap (с thread.sleep (100 мс) задерживает каждую запись) -> Приемник. Причина 100 мс - замедлить конвейер для тестирования обработки исключений. Вызывает ли время события таймаут, связанный со временем транзакции / контрольной точкой? Благодарю.

coffee_latte1020 26.05.2018 17:58
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
2
494
0

Другие вопросы по теме