Ошибка при попытке использовать io.debezium.transforms.outbox.EventRouter в контейнере Debezium Connect CDC

При реализации Outbox Pattern и CDC с Postgres, Kafka и Debezium Connect в док-контейнерах я столкнулся со странной ошибкой. Всякий раз, когда я пытаюсь использовать преобразования EventRouter в своем соединителе, я получаю следующую ошибку, когда таблица исходящих сообщений получает новую запись, и сообщение должно быть отправлено в Kafka:

2022-11-25 21:01:27 2022-11-25 20:01:27,304 INFO   Postgres|coordinator|streaming  First LSN 'LSN{0/184D1E8}' received   [io.debezium.connector.postgresql.connection.WalPositionLocator]
2022-11-25 21:01:27 2022-11-25 20:01:27,307 INFO   Postgres|coordinator|streaming  WAL resume position 'LSN{0/184D1E8}' discovered   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
2022-11-25 21:01:27 2022-11-25 20:01:27,310 INFO   Postgres|coordinator|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
2022-11-25 21:01:27 2022-11-25 20:01:27,351 INFO   Postgres|coordinator|streaming  Requested thread factory for connector PostgresConnector, id = coordinator named = keep-alive   [io.debezium.util.Threads]
2022-11-25 21:01:27 2022-11-25 20:01:27,351 INFO   Postgres|coordinator|streaming  Creating thread debezium-postgresconnector-coordinator-keep-alive   [io.debezium.util.Threads]
2022-11-25 21:01:27 2022-11-25 20:01:27,351 INFO   Postgres|coordinator|streaming  Processing messages   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
2022-11-25 21:01:27 2022-11-25 20:01:27,352 INFO   Postgres|coordinator|streaming  Message with LSN 'LSN{0/184D1E8}' arrived, switching off the filtering   [io.debezium.connector.postgresql.connection.WalPositionLocator]
2022-11-25 21:02:03 2022-11-25 20:02:03,508 INFO   ||  2 records sent during previous 00:03:51.693, last recorded offset of {server=coordinator} partition is {transaction_id=null, lsn_proc=25679872, lsn_commit=25679448, lsn=25679872, txId=742, ts_usec=1669406522764811}   [io.debezium.connector.common.BaseSourceTask]
2022-11-25 21:02:03 2022-11-25 20:02:03,528 ERROR  ||  WorkerSourceTask{id=apimessage-outbox-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
2022-11-25 21:02:03 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:386)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
2022-11-25 21:02:03     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022-11-25 21:02:03     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-11-25 21:02:03     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-11-25 21:02:03     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-11-25 21:02:03     at java.base/java.lang.Thread.run(Thread.java:829)
2022-11-25 21:02:03 Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to find payload field payload in event
2022-11-25 21:02:03     at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:132)
2022-11-25 21:02:03     at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
2022-11-25 21:02:03     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
2022-11-25 21:02:03     ... 12 more
2022-11-25 21:02:03 2022-11-25 20:02:03,533 INFO   ||  Stopping down connector   [io.debezium.connector.common.BaseSourceTask]
2022-11-25 21:02:03 2022-11-25 20:02:03,538 INFO   Postgres|coordinator|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
2022-11-25 21:02:03 2022-11-25 20:02:03,538 INFO   Postgres|coordinator|streaming  Finished streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
2022-11-25 21:02:03 2022-11-25 20:02:03,539 INFO   Postgres|coordinator|streaming  Connected metrics set to 'false'   [io.debezium.pipeline.ChangeEventSourceCoordinator]
2022-11-25 21:02:03 2022-11-25 20:02:03,543 INFO   ||  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
2022-11-25 21:02:03 2022-11-25 20:02:03,544 INFO   ||  [Producer clientId=connector-producer-apimessage-outbox-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO   ||  Metrics scheduler closed   [org.apache.kafka.common.metrics.Metrics]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO   ||  Closing reporter org.apache.kafka.common.metrics.JmxReporter   [org.apache.kafka.common.metrics.Metrics]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO   ||  Metrics reporters closed   [org.apache.kafka.common.metrics.Metrics]
2022-11-25 21:02:03 2022-11-25 20:02:03,548 INFO   ||  App info kafka.producer for connector-producer-apimessage-outbox-connector-0 unregistered   [org.apache.kafka.common.utils.AppInfoParser]

Мой призыв к созданию коннектора:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "apimessage-outbox-connector",
"config": {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "topic.prefix": "coordinator",
  "slot.name": "coordinator",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "admin",
  "database.dbname": "coordinator",
  "database.server.name": "localhost",
  "tombstones.on.delete": "false",
  "table.whitelist": "coordinator.outbox_event",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"}
}'

Эта ошибка возникает не только тогда, когда я удаляю эти строки из сценария создания соединителя, но тогда я не могу настроить отправленное сообщение:

"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"

Не имея возможности исправить это каким-либо образом при использовании контейнеров версий 2.0 или 2.1: debezium/connect, debezium/postgres, debezium/kafka, я, наконец, заработал, вернувшись к версии 1.9. Я бы не назвал это решением, поэтому пишу в комментариях, а не в ответах.

Jacek Grzegorczyk 30.11.2022 11:42
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
1
323
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Не уверен, что вы все еще сталкиваетесь с этой проблемой. Я столкнулся с этим сегодня, и через некоторое время мне удалось понять, что пошло не так.

После некоторой отладки оказалось, что коннектор Kafka не отфильтровывал какие-либо таблицы, поэтому исходящие сообщения пытались публиковать события для таблиц, которые не имели структуру исходящих сообщений (без столбца payload).

Это было связано с тем, что в новых версиях kafka-connect свойство from table.whitelist было обновлено на table.include.list.

Это изменение привело к тому, что исходящие сообщения с вышеуказанной конфигурацией пытались опубликовать любые изменения, включенные в журнал корзины, а не только изменения, включенные в вашу таблицу coordinator.outbox_event.

Источник: https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html#advanced-parameters

Ага! Похоже, это было! Спасибо за помощь @Fran Sanchez!

Jacek Grzegorczyk 20.03.2023 09:36

Другие вопросы по теме