Processor.init () вызывается несколько раз для одной задачи в Kafka Stream

Я использую процессор, который использует знаки препинания WALL_CLOCK_TIME, и я заметил, что после фазы ребалансировки метод init() вызывается более одного раза для одной и той же задачи.

Я записываю эту строку в init():

log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);

И в логах я вижу, что он был вызван дважды:

07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7

Кроме того, я записываю, что происходит в методе close(), и вижу, что там отменяется Cancellable ...

07:53:15 INFO - Closing cancellable org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

... и судя по его хэш-коду (11a53ebd), существующий процессор использовался повторно, но был создан и новый. В результате моя периодическая задача была запланирована дважды, а не один раз.

Я думал, что на задачу будет только один процессор. Есть идеи, что могло вызвать такое поведение и как я могу предотвратить это?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
0
614
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Во время перебалансировки все Processor закрываются, а затем снова инициализируются после перебалансировки. Это необходимо, чтобы убедиться, что данные не потеряны.

Однако хеш, который вы видите, относится к зарегистрированной пунктуации, а не к объекту Processor. Таким образом, если вы укажете cancel как знаки препинания в close и schedule как знаки препинания в init(), то старое расписание будет заменено новым расписанием.

Но из того, что я наблюдал, init запускается дважды подряд после перебалансировки. Итак, порядок был: init -> close -> init -> init. Эта двойная инициализация меня озадачила. Это нормально? Если нет, я думаю, что могу сделать MCVE, чтобы показать это, потому что в проекте, над которым я работал, это происходило на постоянной основе, и я должен был остерегаться такого поведения.

Dth 16.03.2018 20:17

Я понимаю. Затем вы столкнулись с незначительной ошибкой, которая была исправлена ​​в следующем выпуске 1.1: github.com/apache/kafka/commit/…

Matthias J. Sax 17.03.2018 18:55

Ага, похоже так. Спасибо! Я пытался воспроизвести это, но в минимальном приложении этих проблем больше не было.

Dth 17.03.2018 19:30

@ MatthiasJ.Sax: Это все еще в силе? Я вижу, что при перезапуске потоков Kafka (с использованием низкоуровневого API) init () вызывается несколько раз. Не могли бы вы подсказать, когда эта проблема была решена?

CuriousMind 01.02.2021 13:13

Небольшое исправление, приведенное выше, есть в выпусках 2.0.0 (и более новых), как вы можете видеть из тегов фиксации.

Matthias J. Sax 02.02.2021 00:22

Другие вопросы по теме