KafkaIO withLogAppendTime против withProcessingTime

В документации Beam рекомендуется использовать withLogAppendTime вместо withProcessingTime. Почему это так?

Зависит от того, выполняете ли вы историческую обработку или нет ... То есть вы получаете текущее время, в которое вы видите запись, или вы получаете фактическое время, когда запись вставлена ​​в журнал

OneCricketeer 12.12.2018 01:24
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
1
132
2

Ответы 2

Как сказал cricket_007, это зависит от вашего варианта использования.

Одна из ключевых концепций Beam - обработка времени события. То есть вы можете определить свою логику обработки данных не с точки зрения того, когда служба (конвейер Beam) получает данные, а с точки зрения того, когда действительно произошло событие (например, когда пользователь действительно щелкнул объявление). Это помогает в случаях потоковой передачи, когда ваши потоки данных могут содержать запаздывающие или неупорядоченные события. Beam позволяет справиться с этими случаями.

Например. если в вашем конвейере есть шаг, который выполняет что-то вроде "совокупные события, произошедшие с 13:00 до 14:00 23 октября 2018 г.", что произойдет, если событие, которое на самом деле произошло в 13:30, прибудет с опозданием (скажем, в 15:30) из-за некоторых сетевых задержек или чего-то еще? При подходе, основанном на времени обработки, это позднее событие, вероятно, будет учтено в следующем окне (например, «с 14:00 до 15:00»). Но есть большая вероятность, что ваша бизнес-логика предпочтет пересчитать исходную агрегацию «с 13:00 до 14:00» вместо использования позднего события в другом агрегировании. Обработка таких бизнес-кейсов является основной причиной обработки времени события.

Однако вам может быть неинтересно обрабатывать это в своей бизнес-логике, например если вы не выполняете никаких окон / агрегаций (например, базовый ETL), или если у вас вообще нет запоздалых данных (например, когда вы читаете из существующего файла), или ваша бизнес-логика просто не заботится о это, или события редки и доставка достаточно надежна, или у вас может не быть надежной временной метки, доступной вам в данных события и т. д. и т. д., поэтому вы можете вместо этого использовать время обработки. Все зависит от того, как ваша бизнес-логика требует обработки данных.

Временные метки событий назначаются близко к источнику событий в Beam (обычно в IO), поэтому в случае Kafka у вас есть эти параметры, чтобы выбрать, откуда приходит временная метка события: https://beam.apache.org/releases/javadoc/2.8.0/org/apache/beam/sdk/io/kafka/TimestampPolicy.html. Другие источники могут использовать другие способы присвоения меток времени событиям (например, PubsubIO может считывать метку времени, указанную в атрибутах сообщения).

Рекомендую посмотреть презентации здесь, они углубляются в эту тему: https://beam.apache.org/documentation/resources/

Мне не нужно здесь убедительно сравнивать время события и время обработки, поскольку я знаю и принимаю решение на основе наших ограничений. Вопрос не в этом. Либо временная метка, которую я использую, я не обрабатываю по времени события, и в некоторых системах это является обязательным требованием.

user_1357 16.01.2019 23:05

У меня не было намерения убедить вас использовать то же самое, но я хотел объяснить различия и доводы, стоящие за ними. Из вашего исходного сообщения не было ясно, что вы уже знаете об этом или что у вас могут быть требования для использования определенного подхода.

Anton 16.01.2019 23:53

Тем не менее, я не считаю, что есть веская причина рекомендовать время события, кроме ограничений использования времени обработки, подобных тем, что я упоминал выше. Любой подход идеально подходит для их случая использования.

Anton 16.01.2019 23:53

Опять же, мой вопрос заключается в том, какое из двух периодов обработки, доступных Kafka, использовать, на который здесь нет ответа.

user_1357 17.01.2019 03:49

Тогда я не уверен, что понимаю вопрос. Что есть в Kafka - я не знаю, похоже, это не связано с Beam. В Beam доступно несколько режимов, как вы упомянули, есть withLogAppendTime и withProcessingTime. Первый - это случай времени события, когда время события устанавливается равным времени добавления журнала Kafka для сообщения. В последнем случае каждой записи назначается время чтения.

Anton 17.01.2019 05:13

Однако вы, вероятно, можете расширить TimestampPolicyFactory для реализации своей собственной политики: beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/i‌ o /…

Anton 17.01.2019 05:14

Также есть withCreateTime, подробнее об этих опциях смотрите здесь: beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/i‌ o /…

Anton 17.01.2019 05:15

Пара причин предпочесть обработку времени события:

Вы можете повторить обработку позже - например, чтобы исправить ошибку, внести изменения или протестировать другой подход. Возможность использовать один и тот же код как в реальных, так и в исторических потоках упрощает задачу.

Последовательное, детерминированное поведение - если вы прогоните одни и те же данные через один и тот же код, вы получите те же результаты. Это не относится к времени обработки. Опять же, это упрощает некоторые вещи (например, тестирование).

Другие вопросы по теме