Наша команда пытается создать систему профилактического обслуживания, задачей которой является просмотр набора событий и прогнозирование того, отражают ли эти события набор известных аномалий или нет.
Мы находимся на этапе проектирования, и текущий дизайн системы выглядит следующим образом:
Чтобы классифицировать набор событий как аномалию, события должны произойти в одном и том же временном окне. например скажем, есть три источника данных, которые передают соответствующие события в темы Kafka, но по какой-то причине данные не синхронизируются. Таким образом, один из механизмов вывода извлекает последние записи из каждой темы kafka, но соответствующие события в извлеченных данных не принадлежат одному и тому же временному окну (скажем, 1 часу). Это приведет к неверным прогнозам из-за несинхронизированных данных.
Нам нужно выяснить, как мы можем убедиться, что данные из всех трех источников передаются по порядку, чтобы, когда механизм логического вывода запрашивает записи (скажем, последние 100 записей) из нескольких тем какфа, соответствующие записи в каждой теме принадлежали то же временное окно?
Некоторые предложения -
Обработка задержки на стороне производителя -
Убедитесь, что все три производителя всегда синхронно отправляют данные в темы Kafka, используя batch.size
и linger.ms
.
например. если для linger.ms установлено значение 1000, все сообщения будут отправляться в Kafka в течение 1 секунды.
Обработка задержки на стороне потребителя - Принимая во внимание любой механизм потоковой передачи на стороне потребителя (будь то Kafka-stream, spark-stream, Flink), предоставляет функциональные возможности Windows для объединения/объединения потоковых данных на основе ключей с учетом функции окна с задержкой.
Проверьте это - окна Flink для справки, как выбрать правильный тип окна связь
Чтобы справиться с этим сценарием, источники данных должны предоставить потребителю некоторый механизм, позволяющий понять, что все соответствующие данные получены. Самое простое решение — опубликовать пакет из источника данных с идентификатором пакета (Guid) в той или иной форме. Затем потребители могут подождать, пока не появится идентификатор следующей партии, отмечающий конец предыдущей партии. Этот подход предполагает, что источники не будут пропускать пакет, иначе они будут постоянно смещены. Не существует алгоритма для обнаружения этого, но у вас могут быть некоторые поля в данных, которые показывают неоднородность и позволяют вам повторно выравнивать данные.
Более слабая версия этого подхода состоит в том, чтобы либо просто подождать x секунд и предположить, что все источники преуспевают в течение этого большого количества времени, либо посмотреть на некоторую форму меток времени (логические или настенные часы), чтобы определить, что источник перешел к следующему временному окну. неявно показывая завершение последнего окна.
Я бы предложил KSQL, который представляет собой потоковый механизм SQL, который позволяет обрабатывать данные в реальном времени с помощью Apache Kafka. Он также обеспечивает хорошую функциональность для оконного агрегирования и т. д.
Есть 3 способа определить Окна в KSQL:
hopping windows, tumbling windows, and session windows. Hopping and tumbling windows are time windows, because they're defined by fixed durations they you specify. Session windows are dynamically sized based on incoming data and defined by periods of activity separated by gaps of inactivity.
В вашем контексте вы можете использовать KSQL для запроса и объединения интересующих тем с помощью Оконные соединения. Например,
SELECT t1.id, ...
FROM topic_1 t1
INNER JOIN topic_2 t2
WITHIN 1 HOURS
ON t1.id = t2.id;
Следующие рекомендации должны максимизировать успех синхронизации событий для проблемы обнаружения аномалий с использованием данных временных рядов.
С помощью этих примитивов мы должны иметь возможность выравнивать события временных рядов, учитывая отклонения во времени из-за сетевых задержек.
На стороне механизма логического вывода расширьте окна на уровне каждого производителя, чтобы синхронизировать события между производителями.
Спасибо за предложения. Ваше решение вполне разумно, но так как у меня ограниченное знание всей системы, я искал какие-то практические решения (инструменты, доступные для реализации указанной задачи) в дополнение к концептуальному решению.
Для синхронизации времени по сети используйте NTP. Это можно сделать при запуске узла или перезагрузке устройства. Сообщения Heartbeat можно публиковать в теме Kafka. Вам просто нужны ProducerId, TimeStamp, ArrivalTimeStamp. Наличие сообщения указывает на сердцебиение. См.: gerardnico.com/dit/kafka/timestamp для обсуждения извлечения временных меток.
Предикторы задержки сообщений можно создавать с использованием того же стека машинного обучения, который вы используете для механизма логического вывода. Поскольку могут быть потеряны сообщения, вам необходимо рассмотреть модель оставшихся сообщений, такую как пропорциональная опасность Кокса, чтобы обеспечить точность.
Вы задали довольно интересный вопрос. Возможно, эта статья приведет вас к какому-то решению.