Нам необходимо выполнить некоторую предварительную обработку каждого сообщения (расшифровать / повторно зашифровать с разными ключами) из одной темы в другую.
Я искал возможность использовать Kafka Connect, поскольку он предоставляет много хороших вещей из коробки (управление конфигурациями, хранение смещения, обработка ошибок и т. д.).
Но также кажется, что я бы прекратил реализацию SourceConnector и SinkConnector, чтобы просто перемещать данные между двумя темами, и ни один из этих интерфейсов не предназначен для работы с Topic A -> (Connector) -> Topic B. Это правильный подход? Должен ли я просто использовать только SinkConnector, а мой SourceTask.put() выполнять всю логику для записи в Kafka?
Другими вариантами являются KafkaConsumer/Producer и / или Streams, но тогда им потребуются собственные экземпляры для запуска логики, а не обработка ошибок повторной попытки смещения.

provides a lot of good things out of the box (config management, offset storage, error handling, etc.)
Управление конфигурацией не должно быть сложнее, чем повторное развертывание приложения, но это зависит от любого контроля версий или конвейеров CI / CD, которые у вас могут быть, а могут и не быть.
Kafka Producer / Consumer и Streams предлагают управление смещением, вам просто нужно настроить его, чтобы он делал что угодно, кроме значений по умолчанию.
Обработка ошибок довольно хорошо документирована, не стоит просто запускать и забывать, если вы заботитесь об обнаружении ошибок. Само соединение перестанет потреблять и производить при возникновении критических ошибок, не повторяя и не пропуская сообщения.
Neither of those interfaces are meant to do
Topic A -> (Connector) -> Topic B"
Вы видели Confluent Replicator (лицензионный продукт)? Вот является Kafka Соединяет две темы.
Еще, вы видели MirrorMaker? Это пара производитель-потребитель, которая обычно используется для репликации данных между отдельными кластерами, но может использоваться с одними и теми же настройками источника и назначения. Вам просто нужно убедиться, что вы не создаете петлю обратной связи. Вам нужно будет применить к этому «настраиваемую логику» (и изменить название темы), с тем, что такое называется классом обработчика, который помещается в ваш путь к классам Kafka
bin/kafka-mirror-maker.sh
...
--message.handler <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
--message.handler.args <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
Документация Confluent MirrorMaker
Документация Kafka MirrorMaker
Вам ничего не мешает реализовать Connect API, и, возможно, им будет проще управлять, чем приложением Kafka Streams без внешнего диспетчера кластеров. Кроме того, поскольку Connect - это библиотека Java, теоретически вы можете использовать библиотеку Streams внутри нее.
Добро пожаловать - Также github.com/gwenshap/kafka-examples/tree/master/…
@ cricket-007 Спасибо, мы тоже подумывали о создании зеркал, я не знал, что у нас может быть
message.hanlderдля обработки нестандартных изображений.