Я использую исходный соединитель Debezium PostgreSQL для CDC в Kafka. Ниже приведена желаемая схема сообщения, которую я хочу отправить в Кафку. Самое примечательное — поле after.source.
Поле after.source — это поле со статическим, никогда не меняющимся полем, которого нет в исходной базе данных. Я не могу редактировать или создавать представления в исходной базе данных из-за соглашения о поддержке поставщика.
{
"before": null,
"after": {
"rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
"cust": 75862,
"loc": 916719,
"meter": "A90OC5385040",
"cosum": "2.06",
"cosdt": 1673330400000000,
"costy": "I",
"source": "C"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "mmv2_pgami",
"ts_ms": 1711944632077,
"snapshot": "false",
"db": "Pgami_db",
"sequence": "[null,\"23516504\"]",
"schema": "public",
"table": "mreads",
"txId": 574,
"lsn": 23516504,
"xmin": null
},
"op": "c",
"ts_ms": 1711944632426,
"transaction": null
}
Могу ли я добавить поле source, встроенное в объект after из соединителя источника Debezium (как показано в примере выше)?
Ниже показано, что я пробовал, но в корень добавляется новое поле с именем «after.source», что неверно.
{
"name": "postgres_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// Other configuration properties...
// Add the following configuration for the AddFields transformation
"transforms": "addSourceField",
"transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSourceField.static.field": "after.source",
"transforms.addSourceField.static.value": "C"
}
}

@OneCricketeer прав, поэтому, позаимствовав его ответ и добавив к нему, по сути, я в конечном итоге использовал ExtractNewRecordState, чтобы свести правило только к объекту after. После этого я использую преобразование переименования:
{
"name": "postgres_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// Other configuration properties...
// Add the following configuration for the AddFields transformation
"transforms": "unwrap,addSourceField",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.addSourceField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSourceField.static.field":"source",
"transforms.addSourceField.static.value":"C"
}
}
Вот как я добавляю новый столбец в целевую таблицу.
{
"name":"postgres-sink-connector-customers",
"config":{
"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector",
"topics":"dbserver1.inventory.customers",
"connection.url":"jdbc:postgresql://...",
"connection.username":"username",
"connection.password":"password",
...
"transforms":"unwrap,addSourceField",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"true",
"transforms.unwrap.delete.handling.mode":"none",
"transforms.addSourceField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSourceField.static.field":"source",
"transforms.addSourceField.static.value":"C"
}
}
Последний раз, когда я проверял, создание вложенных полей невозможно с помощью любого встроенного преобразования, поскольку они не знают структуру данных, а точки по-прежнему действительны в полях json.