Предварительная обработка сообщений (тема - тема) - API Kafka Connect против потоков против потребителя Kafka?

Нам необходимо выполнить некоторую предварительную обработку каждого сообщения (расшифровать / повторно зашифровать с разными ключами) из одной темы в другую.

Я искал возможность использовать Kafka Connect, поскольку он предоставляет много хороших вещей из коробки (управление конфигурациями, хранение смещения, обработка ошибок и т. д.).

Но также кажется, что я бы прекратил реализацию SourceConnector и SinkConnector, чтобы просто перемещать данные между двумя темами, и ни один из этих интерфейсов не предназначен для работы с Topic A -> (Connector) -> Topic B. Это правильный подход? Должен ли я просто использовать только SinkConnector, а мой SourceTask.put() выполнять всю логику для записи в Kafka?

Другими вариантами являются KafkaConsumer/Producer и / или Streams, но тогда им потребуются собственные экземпляры для запуска логики, а не обработка ошибок повторной попытки смещения.

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
364
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

provides a lot of good things out of the box (config management, offset storage, error handling, etc.)

Управление конфигурацией не должно быть сложнее, чем повторное развертывание приложения, но это зависит от любого контроля версий или конвейеров CI / CD, которые у вас могут быть, а могут и не быть.

Kafka Producer / Consumer и Streams предлагают управление смещением, вам просто нужно настроить его, чтобы он делал что угодно, кроме значений по умолчанию.

Обработка ошибок довольно хорошо документирована, не стоит просто запускать и забывать, если вы заботитесь об обнаружении ошибок. Само соединение перестанет потреблять и производить при возникновении критических ошибок, не повторяя и не пропуская сообщения.

Neither of those interfaces are meant to do Topic A -> (Connector) -> Topic B"

Вы видели Confluent Replicator (лицензионный продукт)? Вот является Kafka Соединяет две темы.

Еще, вы видели MirrorMaker? Это пара производитель-потребитель, которая обычно используется для репликации данных между отдельными кластерами, но может использоваться с одними и теми же настройками источника и назначения. Вам просто нужно убедиться, что вы не создаете петлю обратной связи. Вам нужно будет применить к этому «настраиваемую логику» (и изменить название темы), с тем, что такое называется классом обработчика, который помещается в ваш путь к классам Kafka

bin/kafka-mirror-maker.sh

...

--message.handler <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>

Документация Confluent MirrorMaker
Документация Kafka MirrorMaker


Вам ничего не мешает реализовать Connect API, и, возможно, им будет проще управлять, чем приложением Kafka Streams без внешнего диспетчера кластеров. Кроме того, поскольку Connect - это библиотека Java, теоретически вы можете использовать библиотеку Streams внутри нее.

@ cricket-007 Спасибо, мы тоже подумывали о создании зеркал, я не знал, что у нас может быть message.hanlder для обработки нестандартных изображений.

maverick 24.04.2018 17:16

Добро пожаловать - Также github.com/gwenshap/kafka-examples/tree/master/…

OneCricketeer 24.04.2018 18:30

Другие вопросы по теме