Spring Cloud Stream со связывателем RabbitMQ, как применить @Transactional?

У меня есть приложение Весенний поток облаков, которое получает события от RabbitMQ, используя Связующий кролик. Мое приложение можно резюмировать следующим образом:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
       // Transform events and store them in MongoDB using 
       // spring-boot-data-mongodb-reactive
       ...
}

Проблема в том, что не похоже, что @Transactional работает с Spring Cloud Stream (или, по крайней мере, это мое впечатление), поскольку, если есть исключение при записи в MongoDB, событие, похоже, уже было подтверждено: ed в RabbitMQ, а операция не повторил.

Учитывая, что я хочу достичь в основном той же функциональности, что и при использовании @Transactional вокруг функции с весна-amqp:

  1. Должен ли я вручную подтверждать сообщения RabbitMQ при использовании Spring? Cloud Stream с Rabbit Binder?
  2. Если да, то как я могу этого добиться?
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
459
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Здесь есть несколько проблем.

  1. Для подтверждения сообщений транзакции не требуются
  2. Методы @StreamListener на основе Reactor вызываются ровно один раз, просто чтобы настроить Flux, чтобы @Transactional для этого метода не имел смысла - сообщения затем проходят через поток, поэтому все, что касается отдельных сообщений, должно выполняться в контексте потока.
  3. Spring Транзакции привязаны к потоку - Reactor не блокирует; сообщение будет подтверждено при первой передаче обслуживания.

Да, вам нужно будет использовать ручные подтверждения; предположительно по результату работы магазина mongodb. Вам, вероятно, потребуется использовать Flux<Message<Event>>, чтобы иметь доступ к заголовкам тегов канала и доставки.

Другие вопросы по теме