Триггер водяного знака в Onyx не срабатывает

У меня есть поток сегментов Onyx, представляющих собой сообщения с отметкой времени (поступающие в хронологическом порядке). Дескать, они выглядят так:

{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:id 2 :timestamp "2018-09-04 21:32:03" :msg "Lorem ipsum"}
{:id 3 :timestamp "2018-09-05 03:01:52" :msg "Dolor sit amet"}
{:id 4 :timestamp "2018-09-05 09:28:16" :msg "Consetetur sadipscing"}
{:id 5 :timestamp "2018-09-05 12:45:33" :msg "Elitr sed diam"}
{:id 6 :timestamp "2018-09-06 08:14:29" :msg "Nonumy eirmod"}
...

Для каждого временного окна (одного дня) в данных я хочу выполнить вычисление для набора всех его сегментов. То есть в этом примере я хотел бы работать с сегментами с идентификаторами 1 и 2 (для 4 сентября), затем с идентификаторами 3, 4 и 5 (для 5 сентября) и так далее.

Onyx предлагает окна и триггеры, и они должен делают то, что я хочу, из коробки. Если я использую окно :window/type :fixed и суммирую по :window/range [1 :day] относительно :window/window-key :timestamp, я буду объединять все сегменты за каждый день.

Чтобы мои вычисления запускались только тогда, когда наступили все сегменты дня, Onyx предлагает поведение триггера :onyx.triggers/watermark. По документация он должен выстрелить

if the value of :window/window-key in the segment exceeds the upper-bound in the extent of an active window

Однако спусковой крючок не срабатывает., хотя я вижу, что более поздние сегменты уже поступают, и несколько окон должны быть заполнены. Для проверки работоспособности я попробовал простой триггер :onyx.triggers/segment, который работал, как ожидалось.


Моя неудачная попытка создать минимальный пример:

Я модифицировал фиксированные окнаигрушечная работа для проверки срабатывания водяного знака и это сработало там.

Однако я выяснил, что в этой игрушечной работе причина срабатывания триггера водяного знака может быть:

Did it close the input channel? Maybe the job just completed which can trigger the watermark too.


Еще один аспект, который взаимодействует с запуском водяных знаков, - это распределенная работа над задачами с помощью сверстники.

Комментарии к Выпуск # 839 (:trigger/emit не работает с :onyx.triggers/watermark) в репо Onyx указали мне на выпуск # 840 (Водяной знак не работает с темой Kafka, имеющей> 1 раздел), где я нашел этот Подсказка (выделено мной):

The problem is that all of your data is ending up on one partition, and the watermarks always takes the minimum watermark over all of the input peers (and if using the native kafka watermarks, the minimum watermark for a given peer).

As you call g/send with small amounts of data, and auto partition assignment, all of your data is ending up on one partition, meaning that the other partition's peer continues emitting a watermark of 0.


Я выяснил что:

It’s impossible to use it with the current watermark trigger, which relies on the input source. You could try to pull the previous watermark implementation [...]

Однако в моем графике задач сегменты, который я хочу агрегировать в окнах, - это только создан в какой-то промежуточной задаче, они как таковые не исходят из задачи ввода. Входные сегменты предоставляют только информацию о том, как создавать / извлекать содержимое сегментов для этой промежуточной задачи.

Опять же, эта конструкция отлично работает в вышеупомянутом игрушечная работа. Причина в том, что входной канал в какой-то момент закрывается, что завершает работу, что, в свою очередь, запускает водяной знак. Так что мой игрушечный пример на самом деле не является хорошей моделью, потому что это не открытый поток.

Если задание действительно получает рассматриваемые сегменты из фактического источника ввода, но без временных меток, Onyx, кажется, предоставляет место для указания assign-watermark-fn, который является необязательным атрибутом задачи Вход. Эта функция устанавливает водяной знак при каждом появлении нового сегмента. В моем случае это не помогает, поскольку сегменты не происходят из задачи ввода.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
0
106
1

Ответы 1

Теперь я сам придумал обходной путь. документация в основном дает подсказку, как это можно сделать:

This is a shortcut function for a punctuation trigger that fires when any piece of data has a time-based window key that is above another extent, effectively declaring that no more data for earlier windows will be arriving.

Итак, я изменил задачу, которая генерирует сегменты, так, чтобы для каждого сегмента также генерировался другой сегмент, похожий на "дозорный":

[{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:timestamp "2018-09-03 13:15:42" :over :out}]

Обратите внимание, что :timestamp предшествует диапазону окна (здесь 1 день). Таким образом, он будет отправлен в окно предыдущий. Поскольку мои данные поступают в хронологическом порядке, триггер :punctuation может определить по присутствию «сигнального» сегмента (с ключевым словом: over), что окно можно закрыть. Не забудьте выселить (т.е. :trigger/post-evictor [:all]) и выбросить сегмент "дозорного" из последнего окна. Добавление :onyx/max-peers 1 в карту задач гарантирует, что дозорный всегда рано или поздно появится, особенно при использовании группировки.

Обратите внимание, что в этот обходной путь входят два предположения:

  1. Данные поступают в хронологический
  2. Есть нет окон без сегментов

Другие вопросы по теме