У меня есть конвейер, который считывает данные из базы данных в ограниченной коллекции. Каждому элементу коллекции присвоена метка времени ProcessContext.outputWithTimestamp
. Данные читаются с помощью разделяемого DoFn, где ProcessContext.updateWatermark
вызывается в конце ProcessElement
. В общей сложности DoFn работает примерно со 100 разбиениями, так что это не одно целое.
Позже в конвейере определяется следующее фиксированное окно:
Window.<Map.Entry<Key, Long>>into(
FixedWindows.of(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.ZERO)
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(10))))
.discardingFiredPanes()
После окна коллекция объединяется по ключу: Sum.longsPerKey()
Проблема в том, что элементы коллекции никогда не проходят через комбайнер, пока коллекция не будет полностью прочитана. Это ожидаемое поведение потока данных в пакетном режиме? Я предполагаю, что Dataflow вообще не вычисляет / не перемещает водяной знак, это близко к истине?
Мой вопрос очень похож на Первые результаты преобразования GroupByKey, но в моем случае коллекция читается с помощью Splittable DoFn, где ProcessContext.updateWatermark
вызывается в конце каждого элемента.
Да, это ожидаемое поведение конвейера пакетного режима, независимо от использования Splittable DoFn.
Обычно все элементы проходят через каждый шаг за раз (вместе). Возможно, что результаты для окна будут обработаны раньше других, но это больше связано с емкостью и распределенным выполнением.
В конце концов, GroupByKey или, в вашем случае, Sum By Key, принудительно выполняет операцию перемешивания, которая требует, чтобы все данные были готовы перед фактическим выполнением преобразования SBK.
Я бы сказал, что вы правы, водяной знак не отслеживается для этого сценария.
В пакетном конвейере вы можете думать о водяном знаке для одновременного перехода от минимального к максимальному. Таким образом, все окна запускаются одновременно. Как упоминалось в ch_mike, в пакетном режиме каждый этап выполняется полностью до того, как будет запущен его последующий этап. Но их элементы должны проходить через комбайнер (при условии, что вы имеете в виду оптимизацию комбайнера в «картографе»).
Спасибо! Я также нашел разъяснение в beam.apache.org/documentation/runners/capability-matrix/…. Он говорит: «В пакетном режиме текущий прогресс водяного знака перескакивает с начала времени на конец времени после того, как ввод был полностью использован».