Я использую процессор, который использует знаки препинания WALL_CLOCK_TIME, и я заметил, что после фазы ребалансировки метод init() вызывается более одного раза для одной и той же задачи.
Я записываю эту строку в init():
log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);
И в логах я вижу, что он был вызван дважды:
07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7
Кроме того, я записываю, что происходит в методе close(), и вижу, что там отменяется Cancellable ...
07:53:15 INFO - Closing cancellable org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
... и судя по его хэш-коду (11a53ebd), существующий процессор использовался повторно, но был создан и новый. В результате моя периодическая задача была запланирована дважды, а не один раз.
Я думал, что на задачу будет только один процессор. Есть идеи, что могло вызвать такое поведение и как я могу предотвратить это?




Во время перебалансировки все Processor закрываются, а затем снова инициализируются после перебалансировки. Это необходимо, чтобы убедиться, что данные не потеряны.
Однако хеш, который вы видите, относится к зарегистрированной пунктуации, а не к объекту Processor. Таким образом, если вы укажете cancel как знаки препинания в close и schedule как знаки препинания в init(), то старое расписание будет заменено новым расписанием.
Я понимаю. Затем вы столкнулись с незначительной ошибкой, которая была исправлена в следующем выпуске 1.1: github.com/apache/kafka/commit/…
Ага, похоже так. Спасибо! Я пытался воспроизвести это, но в минимальном приложении этих проблем больше не было.
@ MatthiasJ.Sax: Это все еще в силе? Я вижу, что при перезапуске потоков Kafka (с использованием низкоуровневого API) init () вызывается несколько раз. Не могли бы вы подсказать, когда эта проблема была решена?
Небольшое исправление, приведенное выше, есть в выпусках 2.0.0 (и более новых), как вы можете видеть из тегов фиксации.
Но из того, что я наблюдал,
initзапускается дважды подряд после перебалансировки. Итак, порядок был: init -> close -> init -> init. Эта двойная инициализация меня озадачила. Это нормально? Если нет, я думаю, что могу сделать MCVE, чтобы показать это, потому что в проекте, над которым я работал, это происходило на постоянной основе, и я должен был остерегаться такого поведения.