Синхронизация данных из нескольких источников данных

Наша команда пытается создать систему профилактического обслуживания, задачей которой является просмотр набора событий и прогнозирование того, отражают ли эти события набор известных аномалий или нет.

Мы находимся на этапе проектирования, и текущий дизайн системы выглядит следующим образом:

  • События могут происходить на нескольких источниках системы IoT (например, на облачной платформе, пограничных устройствах или любых промежуточных платформах).
  • События отправляются источниками данных в систему очередей сообщений (в настоящее время мы выбрали Apache Kafka).
  • У каждого источника данных есть своя очередь (тема Kafka).
  • Из очередей данные потребляются несколькими механизмами логического вывода (которые на самом деле являются нейронными сетями).
  • В зависимости от набора функций механизм логического вывода подпишется на несколько тем Kafka и поток данных из этих тем для непрерывного вывода вывода.
  • Общая архитектура следует принципу единой ответственности, что означает, что каждый компонент будет отделен друг от друга и будет работать внутри отдельного контейнера Docker.

Проблема:

Чтобы классифицировать набор событий как аномалию, события должны произойти в одном и том же временном окне. например скажем, есть три источника данных, которые передают соответствующие события в темы Kafka, но по какой-то причине данные не синхронизируются. Таким образом, один из механизмов вывода извлекает последние записи из каждой темы kafka, но соответствующие события в извлеченных данных не принадлежат одному и тому же временному окну (скажем, 1 часу). Это приведет к неверным прогнозам из-за несинхронизированных данных.

Вопрос

Нам нужно выяснить, как мы можем убедиться, что данные из всех трех источников передаются по порядку, чтобы, когда механизм логического вывода запрашивает записи (скажем, последние 100 записей) из нескольких тем какфа, соответствующие записи в каждой теме принадлежали то же временное окно?

Вы задали довольно интересный вопрос. Возможно, эта статья приведет вас к какому-то решению.

dmkvl 01.06.2019 10:25
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
8
1
1 216
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Некоторые предложения -

  1. Обработка задержки на стороне производителя - Убедитесь, что все три производителя всегда синхронно отправляют данные в темы Kafka, используя batch.size и linger.ms. например. если для linger.ms установлено значение 1000, все сообщения будут отправляться в Kafka в течение 1 секунды.

  2. Обработка задержки на стороне потребителя - Принимая во внимание любой механизм потоковой передачи на стороне потребителя (будь то Kafka-stream, spark-stream, Flink), предоставляет функциональные возможности Windows для объединения/объединения потоковых данных на основе ключей с учетом функции окна с задержкой.

Проверьте это - окна Flink для справки, как выбрать правильный тип окна связь

Чтобы справиться с этим сценарием, источники данных должны предоставить потребителю некоторый механизм, позволяющий понять, что все соответствующие данные получены. Самое простое решение — опубликовать пакет из источника данных с идентификатором пакета (Guid) в той или иной форме. Затем потребители могут подождать, пока не появится идентификатор следующей партии, отмечающий конец предыдущей партии. Этот подход предполагает, что источники не будут пропускать пакет, иначе они будут постоянно смещены. Не существует алгоритма для обнаружения этого, но у вас могут быть некоторые поля в данных, которые показывают неоднородность и позволяют вам повторно выравнивать данные.

Более слабая версия этого подхода состоит в том, чтобы либо просто подождать x секунд и предположить, что все источники преуспевают в течение этого большого количества времени, либо посмотреть на некоторую форму меток времени (логические или настенные часы), чтобы определить, что источник перешел к следующему временному окну. неявно показывая завершение последнего окна.

Ответ принят как подходящий

Я бы предложил KSQL, который представляет собой потоковый механизм SQL, который позволяет обрабатывать данные в реальном времени с помощью Apache Kafka. Он также обеспечивает хорошую функциональность для оконного агрегирования и т. д.

Есть 3 способа определить Окна в KSQL:

hopping windows, tumbling windows, and session windows. Hopping and tumbling windows are time windows, because they're defined by fixed durations they you specify. Session windows are dynamically sized based on incoming data and defined by periods of activity separated by gaps of inactivity.

В вашем контексте вы можете использовать KSQL для запроса и объединения интересующих тем с помощью Оконные соединения. Например,

SELECT t1.id, ...
  FROM topic_1 t1
  INNER JOIN topic_2 t2
    WITHIN 1 HOURS
    ON t1.id = t2.id;

Следующие рекомендации должны максимизировать успех синхронизации событий для проблемы обнаружения аномалий с использованием данных временных рядов.

  1. Используйте синхронизатор сетевого времени на всех узлах производителя/потребителя.
  2. Используйте контрольное сообщение от производителей каждые x единиц времени с фиксированным временем начала. Например: сообщения отправляются каждые две минуты в начале минуты.
  3. Создайте предикторы для задержки сообщения производителя. используйте сообщения сердцебиения, чтобы вычислить это.

С помощью этих примитивов мы должны иметь возможность выравнивать события временных рядов, учитывая отклонения во времени из-за сетевых задержек.

На стороне механизма логического вывода расширьте окна на уровне каждого производителя, чтобы синхронизировать события между производителями.

Спасибо за предложения. Ваше решение вполне разумно, но так как у меня ограниченное знание всей системы, я искал какие-то практические решения (инструменты, доступные для реализации указанной задачи) в дополнение к концептуальному решению.

sgarizvi 06.06.2019 09:58

Для синхронизации времени по сети используйте NTP. Это можно сделать при запуске узла или перезагрузке устройства. Сообщения Heartbeat можно публиковать в теме Kafka. Вам просто нужны ProducerId, TimeStamp, ArrivalTimeStamp. Наличие сообщения указывает на сердцебиение. См.: gerardnico.com/dit/kafka/timestamp для обсуждения извлечения временных меток.

vvg 08.06.2019 05:57

Предикторы задержки сообщений можно создавать с использованием того же стека машинного обучения, который вы используете для механизма логического вывода. Поскольку могут быть потеряны сообщения, вам необходимо рассмотреть модель оставшихся сообщений, такую ​​​​как пропорциональная опасность Кокса, чтобы обеспечить точность.

vvg 08.06.2019 06:05

Другие вопросы по теме