Коннектор исходного кода Debezium, добавьте поле в объект after

Я использую исходный соединитель Debezium PostgreSQL для CDC в Kafka. Ниже приведена желаемая схема сообщения, которую я хочу отправить в Кафку. Самое примечательное — поле after.source.

Поле after.source — это поле со статическим, никогда не меняющимся полем, которого нет в исходной базе данных. Я не могу редактировать или создавать представления в исходной базе данных из-за соглашения о поддержке поставщика.

{
    "before": null,
    "after": {
      "rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
      "cust": 75862,
      "loc": 916719,
      "meter": "A90OC5385040",
      "cosum": "2.06",
      "cosdt": 1673330400000000,
      "costy": "I",
      "source": "C"
    },
    "source": {
      "version": "2.4.2.Final",
      "connector": "postgresql",
      "name": "mmv2_pgami",
      "ts_ms": 1711944632077,
      "snapshot": "false",
      "db": "Pgami_db",
      "sequence": "[null,\"23516504\"]",
      "schema": "public",
      "table": "mreads",
      "txId": 574,
      "lsn": 23516504,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1711944632426,
    "transaction": null
  }

Могу ли я добавить поле source, встроенное в объект after из соединителя источника Debezium (как показано в примере выше)?

Ниже показано, что я пробовал, но в корень добавляется новое поле с именем «after.source», что неверно.

{
    "name": "postgres_connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        // Other configuration properties...

        // Add the following configuration for the AddFields transformation
        "transforms": "addSourceField",
        "transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSourceField.static.field": "after.source",
        "transforms.addSourceField.static.value": "C"
    }
}

Последний раз, когда я проверял, создание вложенных полей невозможно с помощью любого встроенного преобразования, поскольку они не знают структуру данных, а точки по-прежнему действительны в полях json.

OneCricketeer 01.04.2024 13:11
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
1
342
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

@OneCricketeer прав, поэтому, позаимствовав его ответ и добавив к нему, по сути, я в конечном итоге использовал ExtractNewRecordState, чтобы свести правило только к объекту after. После этого я использую преобразование переименования:

{
    "name": "postgres_connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        // Other configuration properties...

        // Add the following configuration for the AddFields transformation
        "transforms": "unwrap,addSourceField",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.addSourceField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSourceField.static.field":"source",
        "transforms.addSourceField.static.value":"C"
    }
}

Вот как я добавляю новый столбец в целевую таблицу.

{
   "name":"postgres-sink-connector-customers",
   "config":{
      "connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector",
      "topics":"dbserver1.inventory.customers",
      "connection.url":"jdbc:postgresql://...",
      "connection.username":"username",
      "connection.password":"password",
      ...
      "transforms":"unwrap,addSourceField",
      "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones":"true",
      "transforms.unwrap.delete.handling.mode":"none",
      "transforms.addSourceField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.addSourceField.static.field":"source",
      "transforms.addSourceField.static.value":"C"
   }
}

Другие вопросы по теме