У меня есть конвейер DLT, в котором я хочу вычислить скользящее среднее значение столбца за последние 24 часа, которое обновляется каждый час.
Для этого я использую приведенный ниже код:
@dlt.table()
def gold():
df = dlt.read_stream("silver_table")
# Define window for 24 hours with 1-hour slide
window_spec_24h = window("fetch_ts", "24 hours", "1 hour")
df.withWatermark("fetch_ts", "10 minutes")\
.groupBy(df.Id, window_spec_24h)\
.agg(avg("foo").alias("average_foo_24h"))
return df
Моя проблема в том, что мне всегда не хватает последнего окна в моем результате df. Например, если мой входной df имеет следующие значения fetch_ts:
2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000
выходные данные df имеют следующие окна:
{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02- 23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}
это означает, что моя последняя строка с "2024-02-23T18:54:00.000" fetch_ts исключается из расчета.
При следующем триггере появляется ранее отсутствующее окно, но на этот раз не появляется последнее окно последнего пакета. Это продолжается вот так.
Есть идеи, почему это происходит? Или это так задумано и я что-то упускаю? Есть ли способ добавить последнее окно {"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"}, чтобы я мог включить последнюю строку в свои вычисления?
Спасибо и с уважением,
(Я попытался удалить водяной знак, и тогда я также могу получить последнее окно, но удаление водяного знака не является вариантом, поскольку я хочу соединить вычисленный df с исходным, чтобы включить другие столбцы. Объединения потоков не допускаются без водяной знак.)





На самом деле DLT использует структурированную потоковую передачу, поэтому я объясню семантику создания окон.
В потоковом агрегировании записи могут поступать с опозданием и не по порядку. Чтобы справиться с этим, SS будет буферизовать записи во временных окнах. В конце каждого пакета режим вывода потока определяет, какие записи будут отправлены в нисходящий поток. Однако режим вывода скрыт от вас в DLT и не настраивается напрямую.
По умолчанию используется режим вывода «Добавить», который генерирует агрегаты, которые не будут меняться при последующих триггерах. Какие агрегаты не изменятся? Агрегаты, которые не изменятся, — это те, которые больше не будут получать записи; окна, которые больше не будут получать записи, — это те окна, у которых временная метка окончания меньше водяного знака. Поскольку водяной знак 15:00 говорит о том, что поток больше не будет получать записи до 15:00, движок закроет окна, которые заканчиваются до 15:00.
Итак, в вашем примере последняя запись 2024-02-23T18:54:00.000 фактически обрабатывается! Эта запись приводит к тому, что водяной знак перемещается на 10 минут раньше, 2024-02-23T18:44:00.00, что действительно больше, чем конец последнего окна (оканчивающегося на 2024-02-23T18:00:00.000Z), которое вы видите.
Затем, когда вы передаете новые записи, водяной знак продвигается дальше, что приводит к созданию буферизованных записей. Это ответ на:
При следующем триггере появляется ранее отсутствующее окно, но на этот раз не появляется последнее окно последнего пакета.
Если вы действительно хотите видеть окна в момент их создания (а не после того, как водяной знак пересекает их конец), лучшим выбором будет MV в DLT Serverless.
Вам действительно не обязательно выполнять эти упражнения, поскольку в DLT вы намеренно защищены от режима вывода. Но поскольку об этом спросили в [spark-structured-streaming], я беру на себя смелость их добавить :)
Чтобы упростить описание этих упражнений, давайте просто будем использовать секунды в качестве временных меток. Мы сделаем 10-секундное переворачивающееся окно (не скользящее) с водяным знаком в 5 секунд.
Получаем записи 5, 6 и 9. Сколько агрегатов выбрасывается вниз по течению?
Никакие записи не выдаются. Движок получает 9 и вычитает 5 (задержка водяного знака), что означает, что водяной знак теперь равен 4. Концы окон не меньше 4, поэтому ничего не излучается.
Получаем 11. Какие записи выбрасываются?
И снова нет. Когда получено 11, водяной знак обновляется до 11–5, что равно 6. Никакие окна не могут быть меньше этого значения.
Какую минимальную временную метку вам нужно получить, чтобы было создано ровно одно окно?
Мы хотим, чтобы окно
[0, 10]было создано, поэтому нам нужно убедиться, что водяной знак больше или равен 10. Мы прибавляем задержку водяного знака к 10, что дает нам 10 + 5 = 15. Таким образом, получается запись с временной меткой 15. приведет к появлению этого одного окна.
Наконец, давайте разовьем интересный комментарий, который вы сделали: «Это продолжается вот так». Бывает ли когда-нибудь ситуация, в которой при использовании ненулевой задержки водяного знака d и оконной потоковой агрегации это не будет «продолжаться так?» То есть, сможете ли вы когда-нибудь заставить структурированную потоковую передачу выдавать все окна из промежуточного состояния?
Нет, это невозможно. Чтобы доказать это, предположим, что нет. Предположим, что максимальное время события, которое мы видели до сих пор, равно
m. Назовите окно, которому принадлежитmwin(m), и конец этого окнаend(win(m)).end(win(m))должно быть больше или равноmпо определению окна. Таким образом, чтобы это окно было создано, мы должны получить запись с временной меткой, большей или равнойend(win(m)) + d, гдеd— задержка водяного знака. Напомним, что мы ограничилиdненулевым значением. Таким образом, у нас естьend(win(m)) >= mиd > 0, поэтомуend(win(m)) + d > m. Однако эта величина больше, чемm, которое, как мы сказали, было самым большим временем события, которое мы видели, — противоречие.
Большое спасибо за ответ и дополнительные материалы, Нил, очень признателен :)