Kafka Connect Custom Transformation: хотите преобразовать данные, добавляя настраиваемые поля только один раз (не во все столбцы)

Я использую коннектор источника kafka connect для потоковой передачи записи таблицы в темы Kafka. Я хочу преобразовать данные во время чтения, как показано ниже:

Если моя таблица выглядит следующим образом:

{"автор": "Филип К. Дик", "персонаж": "Палмер Элдрич"}

Я хочу преобразовать его в:

{"id":"123", "type":"test" , "timestamp":"1234567",{"author": "Филип К. Дик", "персонаж": "Палмер Элдрич"}}

в примерах, которые я видел в Kafka Connect Transformations, он преобразует все столбцы/записи в данной таблице. Но я хочу добавить/добавить несколько статических полей только один раз. Может кто-нибудь помочь мне с некоторым рабочим примером или ссылкой, где искать.

я пытался использовать

"преобразует": "insertStaticField1,insertStaticField2"

но добавляет статическое поле ко всем столбцам таблицы

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
117
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Непонятно, что вы имеете в виду под "только один раз". Каждая запись – это уникальное событие. Каждое событие передается в преобразование. Преобразования всегда будут применяться ко всем записям; они не предназначены для выделения конкретных записей.

Преобразование InsertField не создает вложенных полей, а только добавляет поля к самому верхнему объекту (здесь они не называются столбцами).

Чтобы «обернуть» запись в новый объект, например, data в {"id":"123", "type":"test" , "timestamp":"1234567", "data": {"author": "Philip K. Dick", "character": "Palmer Eldritch"}}, вам понадобится HoistField, но это относится только к отдельным полям, а не к нескольким.

Поэтому, если вам нужна логика такого типа, используйте Kafka Streams или ksqlDB, например, если вы не хотите создавать собственное преобразование.

Привет, спасибо за ваш ответ. Под одним разом я подразумеваю, что когда я читаю данные из таблицы, они могут иметь несколько записей, как показано ниже: Персонаж автора 1 Филип К. Дик Палмер Элдрич 2 Джон xyz, и я хочу преобразовать его в: {"id": "123", "type ":"тест" , "отметка времени":"1234567",{"автор": "Филип К. Дик", "персонаж": "Палмер Элдрич"},{"автор": "Джон", "персонаж": " хиз"}}

Priyanka 08.11.2022 00:05

Как я уже сказал, каждое событие уникально. Преобразования одного сообщения, как их называют, не объединяют «несколько записей». Кроме того, в вашем примере недействительный JSON

OneCricketeer 08.11.2022 14:33

Я использую Avro, а не json, и я не могу публиковать фактические данные, поэтому я просто создал образец, имитирующий формат данных. Хотя я понял твою точку зрения. Спасибо за вашу помощь.

Priyanka 09.11.2022 02:40

У меня есть один вопрос относительно пользовательского преобразования. Если я хочу вставить новое поле с динамическим значением (например, UUID), как я могу этого добиться? Я новичок в Kafka connect и не знаю, как работает пользовательское преобразование.

Priyanka 09.11.2022 04:39

Это классы Java docs.confluent.io/platform/current/connect/transforms/…

OneCricketeer 09.11.2022 16:46

Другие вопросы по теме