При реализации Outbox Pattern и CDC с Postgres, Kafka и Debezium Connect в док-контейнерах я столкнулся со странной ошибкой. Всякий раз, когда я пытаюсь использовать преобразования EventRouter в своем соединителе, я получаю следующую ошибку, когда таблица исходящих сообщений получает новую запись, и сообщение должно быть отправлено в Kafka:
2022-11-25 21:01:27 2022-11-25 20:01:27,304 INFO Postgres|coordinator|streaming First LSN 'LSN{0/184D1E8}' received [io.debezium.connector.postgresql.connection.WalPositionLocator]
2022-11-25 21:01:27 2022-11-25 20:01:27,307 INFO Postgres|coordinator|streaming WAL resume position 'LSN{0/184D1E8}' discovered [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
2022-11-25 21:01:27 2022-11-25 20:01:27,310 INFO Postgres|coordinator|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2022-11-25 21:01:27 2022-11-25 20:01:27,351 INFO Postgres|coordinator|streaming Requested thread factory for connector PostgresConnector, id = coordinator named = keep-alive [io.debezium.util.Threads]
2022-11-25 21:01:27 2022-11-25 20:01:27,351 INFO Postgres|coordinator|streaming Creating thread debezium-postgresconnector-coordinator-keep-alive [io.debezium.util.Threads]
2022-11-25 21:01:27 2022-11-25 20:01:27,351 INFO Postgres|coordinator|streaming Processing messages [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
2022-11-25 21:01:27 2022-11-25 20:01:27,352 INFO Postgres|coordinator|streaming Message with LSN 'LSN{0/184D1E8}' arrived, switching off the filtering [io.debezium.connector.postgresql.connection.WalPositionLocator]
2022-11-25 21:02:03 2022-11-25 20:02:03,508 INFO || 2 records sent during previous 00:03:51.693, last recorded offset of {server=coordinator} partition is {transaction_id=null, lsn_proc=25679872, lsn_commit=25679448, lsn=25679872, txId=742, ts_usec=1669406522764811} [io.debezium.connector.common.BaseSourceTask]
2022-11-25 21:02:03 2022-11-25 20:02:03,528 ERROR || WorkerSourceTask{id=apimessage-outbox-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
2022-11-25 21:02:03 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:386)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
2022-11-25 21:02:03 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022-11-25 21:02:03 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-11-25 21:02:03 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-11-25 21:02:03 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-11-25 21:02:03 at java.base/java.lang.Thread.run(Thread.java:829)
2022-11-25 21:02:03 Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to find payload field payload in event
2022-11-25 21:02:03 at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:132)
2022-11-25 21:02:03 at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
2022-11-25 21:02:03 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
2022-11-25 21:02:03 ... 12 more
2022-11-25 21:02:03 2022-11-25 20:02:03,533 INFO || Stopping down connector [io.debezium.connector.common.BaseSourceTask]
2022-11-25 21:02:03 2022-11-25 20:02:03,538 INFO Postgres|coordinator|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2022-11-25 21:02:03 2022-11-25 20:02:03,538 INFO Postgres|coordinator|streaming Finished streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
2022-11-25 21:02:03 2022-11-25 20:02:03,539 INFO Postgres|coordinator|streaming Connected metrics set to 'false' [io.debezium.pipeline.ChangeEventSourceCoordinator]
2022-11-25 21:02:03 2022-11-25 20:02:03,543 INFO || Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2022-11-25 21:02:03 2022-11-25 20:02:03,544 INFO || [Producer clientId=connector-producer-apimessage-outbox-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO || App info kafka.producer for connector-producer-apimessage-outbox-connector-0 unregistered [org.apache.kafka.common.utils.AppInfoParser]
Мой призыв к созданию коннектора:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "apimessage-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "coordinator",
"slot.name": "coordinator",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "admin",
"database.dbname": "coordinator",
"database.server.name": "localhost",
"tombstones.on.delete": "false",
"table.whitelist": "coordinator.outbox_event",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"}
}'
Эта ошибка возникает не только тогда, когда я удаляю эти строки из сценария создания соединителя, но тогда я не могу настроить отправленное сообщение:
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
Не уверен, что вы все еще сталкиваетесь с этой проблемой. Я столкнулся с этим сегодня, и через некоторое время мне удалось понять, что пошло не так.
После некоторой отладки оказалось, что коннектор Kafka не отфильтровывал какие-либо таблицы, поэтому исходящие сообщения пытались публиковать события для таблиц, которые не имели структуру исходящих сообщений (без столбца payload
).
Это было связано с тем, что в новых версиях kafka-connect свойство from table.whitelist
было обновлено на table.include.list
.
Это изменение привело к тому, что исходящие сообщения с вышеуказанной конфигурацией пытались опубликовать любые изменения, включенные в журнал корзины, а не только изменения, включенные в вашу таблицу coordinator.outbox_event
.
Ага! Похоже, это было! Спасибо за помощь @Fran Sanchez!
Не имея возможности исправить это каким-либо образом при использовании контейнеров версий 2.0 или 2.1: debezium/connect, debezium/postgres, debezium/kafka, я, наконец, заработал, вернувшись к версии 1.9. Я бы не назвал это решением, поэтому пишу в комментариях, а не в ответах.