Обычный способ реализации шаблон исходящих сообщений состоит в том, чтобы хранить полезную нагрузку сообщения в таблице исходящих сообщений и иметь отдельный процесс (Ретрансляция сообщений) для запроса ожидающих сообщений и публикации их в брокере сообщений, в моем случае Kafka.
Состояние таблицы исходящих сообщений может быть таким, как показано ниже.
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
My Message Relay — это приложение Spring Boot/Cloud Stream, которое периодически (@Scheduled) ищет ожидающие записи, публикует их в Kafka и обновляет запись до состояния PROCESSED.
Первая проблема: если я запущу несколько экземпляров Message Relay, все они будут запрашивать таблицу «Исходящие», и, возможно, в какой-то момент разные экземпляры получат одни и те же реестры PENDING для публикации в Kafka, создавая дублирующиеся сообщения. Как я могу предотвратить это?
Другая ситуация: предполагается только одна ретрансляция сообщений. Он получает одну запись PENDING, публикует ее в теме, но вылетает перед обновлением записи до PROCESSED. Когда он запустится снова, он найдет ту же запись PENDING и опубликует ее снова. Есть ли способ избежать этого дублирования или единственный способ - разработать идемпотентную систему.
БД это MariaDB
Почему вы не используете Kafka-Connect для контроля отправленных событий? debezium.io/блог/2019/02/19/…
как часто должен запускаться планировщик в подобных случаях, он должен работать почти в режиме реального времени?

Чтобы предотвратить первую проблему, вы должны использовать блокировку базы данных.
SELECT * FROM outbox WHERE id = 1 FOR UPDATE
Это предотвратит доступ других процессов к той же строке.
Вторую проблему вы не можете решить, потому что у вас нет распределенной транзакции с Kafka.
Таким образом, один из способов может заключаться в том, чтобы установить запись в состояние, подобное ОБРАБОТКЕ, прежде чем отправлять ее в Кафку, и в случае сбоя приложения вы должны проверить, есть ли записи в состоянии ОБРАБОТКА, и выполнить некоторую очистку, чтобы узнать, были ли они уже отправлены в Кафку. .
Но лучшим решением было бы иметь идемпотентную систему, которая может обрабатывать дубликаты.
Потребитель может вести журнал сообщений и проверять, пришло ли то же самое сообщение ранее по messageId (в случаях, когда оно не может быть идемпотентным).
Вы можете использовать debezium (https://debezium.io/), чтобы прочитать бинарный журнал SQL-сервера и записать события в Kafka. Это решит оба ваших вопроса.
Для первой проблемы вы можете использовать Библиотека ShedLock. Это гарантирует, что в любое время только один экземпляр вашей службы будет выполнять запланированную задачу.
Для 2-й проблемы да, вам придется разработать идемпотентного потребителя. Вы можете сделать это, передав идентификатор сообщения потребителю и поддерживая таблицу, чтобы проверить, обработано ли уже это сообщение с идентификатором сообщения, просто игнорируйте его.
В какой базе данных вы храните исходящие?