Исходящие шаблоны — как мы можем предотвратить создание повторяющихся сообщений процессом ретрансляции сообщений?

Обычный способ реализации шаблон исходящих сообщений состоит в том, чтобы хранить полезную нагрузку сообщения в таблице исходящих сообщений и иметь отдельный процесс (Ретрансляция сообщений) для запроса ожидающих сообщений и публикации их в брокере сообщений, в моем случае Kafka.

Состояние таблицы исходящих сообщений может быть таким, как показано ниже.

 OUTBOX TABLE
 ---------------------------------
|ID | STATE     | TOPIC | PAYLOAD |
 ---------------------------------
| 1 | PROCESSED | user            |
| 2 | PENDING   | user            |
| 3 | PENDING   | billing         |
----------------------------------

My Message Relay — это приложение Spring Boot/Cloud Stream, которое периодически (@Scheduled) ищет ожидающие записи, публикует их в Kafka и обновляет запись до состояния PROCESSED.

Первая проблема: если я запущу несколько экземпляров Message Relay, все они будут запрашивать таблицу «Исходящие», и, возможно, в какой-то момент разные экземпляры получат одни и те же реестры PENDING для публикации в Kafka, создавая дублирующиеся сообщения. Как я могу предотвратить это?

Другая ситуация: предполагается только одна ретрансляция сообщений. Он получает одну запись PENDING, публикует ее в теме, но вылетает перед обновлением записи до PROCESSED. Когда он запустится снова, он найдет ту же запись PENDING и опубликует ее снова. Есть ли способ избежать этого дублирования или единственный способ - разработать идемпотентную систему.

В какой базе данных вы храните исходящие?

Simon Martinelli 11.06.2019 13:47

БД это MariaDB

codependent 11.06.2019 13:48

Почему вы не используете Kafka-Connect для контроля отправленных событий? debezium.io/блог/2019/02/19/…

rpereira15 16.06.2019 18:56

как часто должен запускаться планировщик в подобных случаях, он должен работать почти в режиме реального времени?

Vikash 21.09.2020 07:58
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
5
4
1 701
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Чтобы предотвратить первую проблему, вы должны использовать блокировку базы данных.

SELECT * FROM outbox WHERE id = 1 FOR UPDATE

Это предотвратит доступ других процессов к той же строке.

Вторую проблему вы не можете решить, потому что у вас нет распределенной транзакции с Kafka.

Таким образом, один из способов может заключаться в том, чтобы установить запись в состояние, подобное ОБРАБОТКЕ, прежде чем отправлять ее в Кафку, и в случае сбоя приложения вы должны проверить, есть ли записи в состоянии ОБРАБОТКА, и выполнить некоторую очистку, чтобы узнать, были ли они уже отправлены в Кафку. .

Но лучшим решением было бы иметь идемпотентную систему, которая может обрабатывать дубликаты.

Потребитель может вести журнал сообщений и проверять, пришло ли то же самое сообщение ранее по messageId (в случаях, когда оно не может быть идемпотентным).

Vikash 21.09.2020 07:57

Вы можете использовать debezium (https://debezium.io/), чтобы прочитать бинарный журнал SQL-сервера и записать события в Kafka. Это решит оба ваших вопроса.

Для первой проблемы вы можете использовать Библиотека ShedLock. Это гарантирует, что в любое время только один экземпляр вашей службы будет выполнять запланированную задачу.

Для 2-й проблемы да, вам придется разработать идемпотентного потребителя. Вы можете сделать это, передав идентификатор сообщения потребителю и поддерживая таблицу, чтобы проверить, обработано ли уже это сообщение с идентификатором сообщения, просто игнорируйте его.

Другие вопросы по теме