Недавно я обновил версию дебезиума с 1.9.6.Final до 2.6.1.Final.
Я изменил database.server.name -> topic.prefix
в рамках миграции и создал публикации для плагина pgoutput
(ранее использовался wal2json
). Окончательная конфигурация:
connector.class=io.debezium.connector.postgresql.PostgresConnector,
database.user=some_user,
database.dbname=locs,
offset.storage=com.quext.cdc.storage.CustomOffsetBackingStore,
slot.name=quext_core,
slot.drop.on.stop=true,
tasks.max=1,
schema.include.list=locs,debezium_cdc,
heartbeat.interval.ms=1800000,
plugin.name=pgoutput,
database.port=5432,
slot.max.retries=10,
slot.retry.delay.ms=60000,
topic.prefix=search-engine-server-locs,
heartbeat.action.query=INSERT INTO debezium_cdc.heartbeat (id, time) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET time = NOW();,
database.hostname=***,
database.password=***,
name=quext-core-connector,
table.include.list=debezium_cdc.heartbeat,locs.space,
engineName=core
Мы используем Debezium в качестве библиотеки Java и размещаем ее в кластере kube. У нас также есть несколько баз данных внутри экземпляра RDS, поэтому для каждой БД у нас есть отдельный экземпляр Debezium с уникальными slot.name
, name
и topic.prefix
. Однако публикации имеют одно и то же имя в каждой БД (не уверен, имеет ли это значение). В качестве смещенного хранилища мы используем таблицу Postgresql.
Properties props = new Properties();
// ... set properties
engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
this.handleEvent(record);
}).build();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
Когда я запускаю дебезиум в БД без слота репликации, он создает слот, записывает сообщения ниже и зависает
{time: 2024-04-17 18:15:37.849, level: DEBUG, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:309,trace: SearchEngineCDC,message: No replication slot 'quext_core' is present for plugin 'pgoutput' and database 'locs'}
{time: 2024-04-17 18:15:37.850, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:337,trace: SearchEngineCDC,message: Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]}
{time: 2024-04-17 18:15:37.926, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.PostgresConnectorTask:158,trace: SearchEngineCDC,message: Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='search-engine-server-locs'db='locs', lsn=LSN{1ABC/8070CE88}, txId=2180314030, timestamp=2024-04-17T17:57:53.171784Z, snapshot=TRUE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount = {}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]}
{time: 2024-04-17 18:15:38.026, level: WARN, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.PostgresTaskContext:101,trace: SearchEngineCDC,message: Connector has enabled automated replication slot removal upon restart (slot.drop.on.stop = true). This setting is not recommended for production environments, as a new replication slot will be created after a connector restart, resulting in missed data change events.}
{time: 2024-04-17 18:15:38.033, level: DEBUG, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:507,trace: SearchEngineCDC,message: Creating new replication slot 'quext_core' for plugin 'PGOUTPUT'}
{time: 2024-04-17 18:15:38.132, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:150,trace: SearchEngineCDC,message: Initializing PgOutput logical decoder publication}
{time: 2024-04-17 18:15:38.138, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:529,trace: SearchEngineCDC,message: Creating replication slot with command CREATE_REPLICATION_SLOT "quext_core" LOGICAL pgoutput}
Когда я останавливаю сервер Debezium, слот репликации postgresql по какой-то причине остается активным, что приводит к следующей проблеме: когда я перезапускаю сервер Debezium, он выдает исключение о том, что репликация уже существует.
Если я убью pid, который занимает слот репликации, с помощью pg_terminate_backend(pid)
, я окажусь в первом случае, когда Debezium зависает при создании слота репликации.
Пример pg_replication_slots
select
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
*
from
pg_replication_slots
order by
slot_name;
{
"replicationslotlag": "617 MB",
"confirmedlag": null,
"slot_name": "quext_core",
"plugin": "pgoutput",
"slot_type": "logical",
"datoid": 38711948,
"database": "locs",
"temporary": false,
"active": true,
"active_pid": 17073,
"xmin": null,
"catalog_xmin": "2178725846",
"restart_lsn": "1ABC/648AED10",
"confirmed_flush_lsn": null
}
Данные таблицы смещения
[
{
"id" : "fed74584-10b2-4c6c-85ef-9b4b87f208d7",
"engine_name" : "search_engine",
"offset_key" : "[\"quext-search-engine-connector\",{\"server\":\"search-engine-server-search_engine\"}]",
"offset_payload" : "{\"last_snapshot_record\":false,\"lsn\":29396158091376,\"txId\":2180193567,\"ts_usec\":1713374167445940,\"snapshot\":true}"
},
{
"id" : "ee9a8ca0-be9c-4413-9c1d-869c0da0ae00",
"engine_name" : "core",
"offset_key" : "[\"quext-core-connector\",{\"server\":\"search-engine-server-locs\"}]",
"offset_payload" : "{\"last_snapshot_record\":false,\"lsn\":29397573471232,\"txId\":2180419522,\"ts_usec\":1713378721496195,\"snapshot\":true}"
}
]
Некоторая информация о конфигурации
Postgresql — RDS (PostgreSQL 12.17 на aarch64-unknown-linux-gnu, скомпилирован gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-6), 64-разрядная версия)
Версия дебезиума - 2.6.1.Финальная
Плагин - pgoutput
Экземпляр Postgres настроен для репликации и назначены роли.
Публикации создаются для каждой базы данных с использованием CREATE PUBLICATION dbz_publication FOR ALL TABLES;
Я пытался
Перезапустите сервер Debezium (не уверен, что это корректное завершение работы с точки зрения клиента Debezium)
Уничтожьте слот репликации active_pid с помощью pg_terminate_backend(pid) и запустите Debezium.
Очистить таблицу смещений.
Клянусь, в начале дня он работал, но когда я включил несколько экземпляров Debezium для других БД (у нас есть один экземпляр с несколькими БД внутри, и для каждой БД мы запускаем экземпляр Debezium), он перестал работать. А теперь не работает ни с одним экземпляром Дебезиума.
Может быть, проблема в снимке? Но размер БД составляет 12 МБ.
УПД1: Я откатил версию Debezium и теперь она логируется
{time: 2024-04-18 13:10:22.995, level: WARN, PID: 1, thread: pool-5-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:265,trace: SearchEngineCDC,message: Cannot obtain valid replication slot 'quext_accounting' for plugin 'wal2json' and database 'acct_1' [during attempt 43 out of 900, concurrent tx probably blocks taking snapshot.}
Сегодня мы разобрались, в чем проблема: она была на стороне БД. Процесс создания слота репликации был заблокирован длительной транзакцией: слот показывался как созданный с активным идентификатором, но дальнейший процесс не выполнялся. После того, как мы выяснили это с помощью pg_stat_activity
и устранили блокировку процесса, создание слота репликации завершилось, и теперь оно работает как положено.
Для тех, кто может столкнуться с этой проблемой в будущем, попробуйте создать слот репликации вручную, используя pg_create_logical_replication_slot(...)
. Если не работает даже без Дебезиума - скорее всего что-то не так на стороне БД. Нашим вторым шагом было проверить, заблокирован ли процесс слота репликации.
select
pid,
usename,
pg_blocking_pids(pid) as blocked_by,
query as blocked_query
from
pg_stat_activity
where
cardinality(pg_blocking_pids(pid)) > 0
-- and pid = '...' you can filter by active_pid from pg_replication_slots view
После расследования мы обнаружили одну транзакцию, которая выполняется почти день и блокирует другие транзакции, которые блокируют другие транзакции,... создание слота репликации.
К сожалению, насколько мне известно, невозможно установить тайм-аут для активных и выполняемых транзакций. Но можно установить таймаут выполнения одного оператора и таймаут транзакции простоя (наша транзакция не простаивала). Также вы можете отслеживать длительные транзакции, используя следующий запрос
select
clock_timestamp() - xact_start,
*
from
pg_stat_activity
where
clock_timestamp() - xact_start > interval '1 minute' -- specify interval