Потоковая передача Databricks DLT со скользящим окном, в котором отсутствует интервал последнего окна

У меня есть конвейер DLT, в котором я хочу вычислить скользящее среднее значение столбца за последние 24 часа, которое обновляется каждый час.

Для этого я использую приведенный ниже код:

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")
    
    # Define window for 24 hours with 1-hour slide
    window_spec_24h = window("fetch_ts", "24 hours", "1 hour")


    df.withWatermark("fetch_ts", "10 minutes")\
      .groupBy(df.Id, window_spec_24h)\
      .agg(avg("foo").alias("average_foo_24h"))


    return df

Моя проблема в том, что мне всегда не хватает последнего окна в моем результате df. Например, если мой входной df имеет следующие значения fetch_ts:

2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000

выходные данные df имеют следующие окна:

{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02- 23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}

это означает, что моя последняя строка с "2024-02-23T18:54:00.000" fetch_ts исключается из расчета.

При следующем триггере появляется ранее отсутствующее окно, но на этот раз не появляется последнее окно последнего пакета. Это продолжается вот так.

Есть идеи, почему это происходит? Или это так задумано и я что-то упускаю? Есть ли способ добавить последнее окно {"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"}, чтобы я мог включить последнюю строку в свои вычисления?

Спасибо и с уважением,

(Я попытался удалить водяной знак, и тогда я также могу получить последнее окно, но удаление водяного знака не является вариантом, поскольку я хочу соединить вычисленный df с исходным, чтобы включить другие столбцы. Объединения потоков не допускаются без водяной знак.)

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
206
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

На самом деле DLT использует структурированную потоковую передачу, поэтому я объясню семантику создания окон.

В потоковом агрегировании записи могут поступать с опозданием и не по порядку. Чтобы справиться с этим, SS будет буферизовать записи во временных окнах. В конце каждого пакета режим вывода потока определяет, какие записи будут отправлены в нисходящий поток. Однако режим вывода скрыт от вас в DLT и не настраивается напрямую.

По умолчанию используется режим вывода «Добавить», который генерирует агрегаты, которые не будут меняться при последующих триггерах. Какие агрегаты не изменятся? Агрегаты, которые не изменятся, — это те, которые больше не будут получать записи; окна, которые больше не будут получать записи, — это те окна, у которых временная метка окончания меньше водяного знака. Поскольку водяной знак 15:00 говорит о том, что поток больше не будет получать записи до 15:00, движок закроет окна, которые заканчиваются до 15:00.

Итак, в вашем примере последняя запись 2024-02-23T18:54:00.000 фактически обрабатывается! Эта запись приводит к тому, что водяной знак перемещается на 10 минут раньше, 2024-02-23T18:44:00.00, что действительно больше, чем конец последнего окна (оканчивающегося на 2024-02-23T18:00:00.000Z), которое вы видите.

Затем, когда вы передаете новые записи, водяной знак продвигается дальше, что приводит к созданию буферизованных записей. Это ответ на:

При следующем триггере появляется ранее отсутствующее окно, но на этот раз не появляется последнее окно последнего пакета.

Если вы действительно хотите видеть окна в момент их создания (а не после того, как водяной знак пересекает их конец), лучшим выбором будет MV в DLT Serverless.

Упражнения

Вам действительно не обязательно выполнять эти упражнения, поскольку в DLT вы намеренно защищены от режима вывода. Но поскольку об этом спросили в [spark-structured-streaming], я беру на себя смелость их добавить :)

Чтобы упростить описание этих упражнений, давайте просто будем использовать секунды в качестве временных меток. Мы сделаем 10-секундное переворачивающееся окно (не скользящее) с водяным знаком в 5 секунд.

Получаем записи 5, 6 и 9. Сколько агрегатов выбрасывается вниз по течению?

Никакие записи не выдаются. Движок получает 9 и вычитает 5 (задержка водяного знака), что означает, что водяной знак теперь равен 4. Концы окон не меньше 4, поэтому ничего не излучается.

Получаем 11. Какие записи выбрасываются?

И снова нет. Когда получено 11, водяной знак обновляется до 11–5, что равно 6. Никакие окна не могут быть меньше этого значения.

Какую минимальную временную метку вам нужно получить, чтобы было создано ровно одно окно?

Мы хотим, чтобы окно [0, 10] было создано, поэтому нам нужно убедиться, что водяной знак больше или равен 10. Мы прибавляем задержку водяного знака к 10, что дает нам 10 + 5 = 15. Таким образом, получается запись с временной меткой 15. приведет к появлению этого одного окна.

Наконец, давайте разовьем интересный комментарий, который вы сделали: «Это продолжается вот так». Бывает ли когда-нибудь ситуация, в которой при использовании ненулевой задержки водяного знака d и оконной потоковой агрегации это не будет «продолжаться так?» То есть, сможете ли вы когда-нибудь заставить структурированную потоковую передачу выдавать все окна из промежуточного состояния?

Нет, это невозможно. Чтобы доказать это, предположим, что нет. Предположим, что максимальное время события, которое мы видели до сих пор, равно m. Назовите окно, которому принадлежит mwin(m), и конец этого окна end(win(m)). end(win(m)) должно быть больше или равно m по определению окна. Таким образом, чтобы это окно было создано, мы должны получить запись с временной меткой, большей или равной end(win(m)) + d, где d — задержка водяного знака. Напомним, что мы ограничили d ненулевым значением. Таким образом, у нас есть end(win(m)) >= m и d > 0, поэтому end(win(m)) + d > m. Однако эта величина больше, чем m, которое, как мы сказали, было самым большим временем события, которое мы видели, — противоречие.

Большое спасибо за ответ и дополнительные материалы, Нил, очень признателен :)

atlanticblue 06.03.2024 12:21

Другие вопросы по теме