Почему мой процессор NiFi PublishKafka работает только с предыдущими версиями?

Я использую Kafka 2, и по какой-то причине единственными процессорами NiFi, которые будут правильно публиковать мои сообщения в Kafka, являются PublishKafka (0_9) и PublishKafka_0_10. Более поздние версии не проталкивают мои сообщения, что странно, потому что, опять же, я использую Kafka 2.1.1.

Для получения дополнительной информации: когда я пытаюсь запустить свой FlowFile через более поздние процессоры PublishKafka, я получаю исключение тайм-аута, которое многократно повторяется.

    2019-03-11 16:05:34,200 ERROR [Timer-Driven Process Thread-7] o.a.n.p.kafka.pubsub.PublishKafka_2_0 PublishKafka_2_0[id=6d7f1896-0169- 
    1000-ca27-cf7f86f22694] PublishKafka_2_0[id=6d7f1896-0169-1000-ca27- 
    cf7f86f22694] failed to process session due to 
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
    initializing transactional state in 5000ms.; Processor Administratively 
    Yielded for 1 sec: org.apache.kafka.common.errors.TimeoutException: 
    Timeout expired while initializing transactional state in 5000ms.
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
    initializing transactional state in 5000ms.
    2019-03-11 16:05:34,201 WARN [Timer-Driven Process Thread-7] 
    o.a.n.controller.tasks.ConnectableTask Administratively Yielding 
    PublishKafka_2_0[id=6d7f1896-0169-1000-ca27-cf7f86f22694] due to uncaught 
    Exception: org.apache.kafka.common.errors.TimeoutException: Timeout 
    expired while initializing transactional state in 5000ms.

Мои настройки процессора следующие: Почему мой процессор NiFi PublishKafka работает только с предыдущими версиями?

Все остальные конфигурации являются настройками по умолчанию. Любые идеи о том, почему это происходит?

Как правило, версия процессора должна совпадать с версией брокера, поэтому я ожидаю, что PublishKafka_2_0 будет работать с вашим брокером 2.1.1. если это не так, вам, вероятно, придется показать конфигурацию вашего процессора kafka, а также ошибки и трассировки стека из nifi-app.log.

Bryan Bende 11.03.2019 18:43

@BryanBende Я поместил их в исходный пост. Я пробовал использовать этот процессор и без транзакций, но результат тот же.

Kelly P 11.03.2019 19:43

Спасибо за дополнительную информацию, еще один вопрос, в $KAFKA_HOME/config/server.properties что у вас есть для "слушателей"? это "listeners=PLAINTEXT=app03.dev.onyxgs.com:9092"?

Bryan Bende 11.03.2019 20:33

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092. Мы также пробовали app03.dev.onyxgs.com:9092 с тем же результатом в любом случае.

Kelly P 12.03.2019 14:45

Хм, у тебя тоже есть KAFKA_LISTENERS? Я думаю, что есть два разных свойства, также KAFKA_ADVERTISED_LISTENERS должен использовать имя хоста, к которому вы хотите, чтобы клиенты подключались, поэтому я не думаю, что localhost будет работать там.

Bryan Bende 12.03.2019 14:58

Итак, теперь я изменил свойство listeners на localhost, как показано выше, и рекламируемые слушатели на обычное внешнее имя - все равно не повезло. Все запускается, но Нифи все еще пытается и не может подключиться к брокеру Kafka 1 и не может его найти. Я устанавливаю незащищенный контейнер Nifi, чтобы проверить, не проблема ли это в сети, но кроме этого у меня ничего нет.

Kelly P 12.03.2019 16:17
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
6
1 384
0

Другие вопросы по теме