Я использую коннектор источника kafka connect для потоковой передачи записи таблицы в темы Kafka. Я хочу преобразовать данные во время чтения, как показано ниже:
Если моя таблица выглядит следующим образом:
{"автор": "Филип К. Дик", "персонаж": "Палмер Элдрич"}
Я хочу преобразовать его в:
{"id":"123", "type":"test" , "timestamp":"1234567",{"author": "Филип К. Дик", "персонаж": "Палмер Элдрич"}}
в примерах, которые я видел в Kafka Connect Transformations, он преобразует все столбцы/записи в данной таблице. Но я хочу добавить/добавить несколько статических полей только один раз. Может кто-нибудь помочь мне с некоторым рабочим примером или ссылкой, где искать.
я пытался использовать
"преобразует": "insertStaticField1,insertStaticField2"
но добавляет статическое поле ко всем столбцам таблицы
Непонятно, что вы имеете в виду под "только один раз". Каждая запись – это уникальное событие. Каждое событие передается в преобразование. Преобразования всегда будут применяться ко всем записям; они не предназначены для выделения конкретных записей.
Преобразование InsertField
не создает вложенных полей, а только добавляет поля к самому верхнему объекту (здесь они не называются столбцами).
Чтобы «обернуть» запись в новый объект, например, data
в {"id":"123", "type":"test" , "timestamp":"1234567", "data": {"author": "Philip K. Dick", "character": "Palmer Eldritch"}}
, вам понадобится HoistField
, но это относится только к отдельным полям, а не к нескольким.
Поэтому, если вам нужна логика такого типа, используйте Kafka Streams или ksqlDB, например, если вы не хотите создавать собственное преобразование.
Как я уже сказал, каждое событие уникально. Преобразования одного сообщения, как их называют, не объединяют «несколько записей». Кроме того, в вашем примере недействительный JSON
Я использую Avro, а не json, и я не могу публиковать фактические данные, поэтому я просто создал образец, имитирующий формат данных. Хотя я понял твою точку зрения. Спасибо за вашу помощь.
У меня есть один вопрос относительно пользовательского преобразования. Если я хочу вставить новое поле с динамическим значением (например, UUID), как я могу этого добиться? Я новичок в Kafka connect и не знаю, как работает пользовательское преобразование.
Это классы Java docs.confluent.io/platform/current/connect/transforms/…
Привет, спасибо за ваш ответ. Под одним разом я подразумеваю, что когда я читаю данные из таблицы, они могут иметь несколько записей, как показано ниже: Персонаж автора 1 Филип К. Дик Палмер Элдрич 2 Джон xyz, и я хочу преобразовать его в: {"id": "123", "type ":"тест" , "отметка времени":"1234567",{"автор": "Филип К. Дик", "персонаж": "Палмер Элдрич"},{"автор": "Джон", "персонаж": " хиз"}}