Spring Cloud DataFlow

Я создаю поток, в котором источник (производитель) создает около 12 миллионов записей примерно за 8 минут, трансформатор (потребитель) начинает их нормально использовать, но в какой-то момент примерно через 4 минуты в журнале приложения , и он перестает получать что-либо после этой точки:

2018-07-11 21:59:18,811 24043857 [kafka-coordinator-heartbeat-thread | cdSomeApp] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=cdSomeApp] Marking the coordinator 10.16.17.59:9092 (id: 2147483644 rack: null) dead
2018-07-11 21:59:18,815 24043861 [cdSomeApp.cd-source.container-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=cdSomeApp] Discovered group coordinator 10.16.17.59:9092 (id: 2147483644 rack: null)
2018-07-11 21:59:18,815 24043861 [cdSomeApp.cd-source.container-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=cdSomeApp] Marking the coordinator 10.16.17.59:9092 (id: 2147483644 rack: null) dead
2018-07-11 21:59:18,930 24043976 [cdSomeApp.cd-source.container-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=cdSomeApp] Discovered group coordinator 10.16.17.59:9092 (id: 2147483644 rack: null)
2018-07-11 21:59:18,933 24043979 [cdSomeApp.cd-source.container-0-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=cdSomeApp] Offset commit failed on partition cdSomeApp.cd-source-0 at offset 140802810: The coordinator is not aware of this member.
2018-07-11 21:59:18,937 24043983 [cdSomeApp.cd-source.container-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:787)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:735)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:814)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:794)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:507)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)

Из того, что я вижу, значения по умолчанию для конфигурации kafka должны работать нормально, но если кто-то знает лучше, посоветуйте?

Благодарность!

Приложение использует Spring boot 2.0.2.RELEASE.

ka2 12.07.2018 22:41

Также spring-cloud-starter-stream-kafka и spring-boot-starter-cloud-connector версии 2.0.0.RELEASE. Установка kafka находится по адресу kafka_2.11-1.1.0

ka2 12.07.2018 22:52

Я побежал снова, изменив конфигурацию kafka 1.0 по умолчанию с конфигурации по умолчанию, те же результаты, вот значения изменены:

ka2 13.07.2018 15:28

request.timeout.ms: 18300000 session.timeout.ms: 7200000 максимальное количество записей опроса: 500 heartbeat.interval.ms: 1800000

ka2 13.07.2018 19:05
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
4
275
1

Ответы 1

Отчет не содержит информации о версии. Было бы хорошо, если бы сообщение редактировалось с использованием Spring Cloud Stream (версия для начинающих приложений - какой URL-адрес bit.ly использовался?), Spring Boot, SCDF и используемых версий брокера Kafka.

С учетом всего вышесказанного у нас был аналогичный отчет в выпуске Spring Cloud Stream Chelsea против Kafka 0.9. Вот такие детали и результат.

Если вы используете эту комбинацию версий, вам придется перейти на Ditmars (1.3.x) или последнюю версию Elmhurst (2.0.x). У нас также есть последняя версия bit.ly против этих версий в Сайт проекта для начинающих приложений.

насколько я знаю, я использую последнюю версию всего

ka2 12.07.2018 23:58

Перебалансировка означает, что преобразователю требуется более 5 минут для обработки записей, полученных в результате опроса. Попробуйте уменьшить max.poll.records и / или увеличить max.poll.interval.ms.

Gary Russell 18.07.2018 15:52

Другие вопросы по теме