У меня есть приложение Весенний поток облаков, которое получает события от RabbitMQ, используя Связующий кролик. Мое приложение можно резюмировать следующим образом:
@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
// Transform events and store them in MongoDB using
// spring-boot-data-mongodb-reactive
...
}
Проблема в том, что не похоже, что @Transactional работает с Spring Cloud Stream (или, по крайней мере, это мое впечатление), поскольку, если есть исключение при записи в MongoDB, событие, похоже, уже было подтверждено: ed в RabbitMQ, а операция не повторил.
Учитывая, что я хочу достичь в основном той же функциональности, что и при использовании @Transactional вокруг функции с весна-amqp:




Здесь есть несколько проблем.
@StreamListener на основе Reactor вызываются ровно один раз, просто чтобы настроить Flux, чтобы @Transactional для этого метода не имел смысла - сообщения затем проходят через поток, поэтому все, что касается отдельных сообщений, должно выполняться в контексте потока.Да, вам нужно будет использовать ручные подтверждения; предположительно по результату работы магазина mongodb. Вам, вероятно, потребуется использовать Flux<Message<Event>>, чтобы иметь доступ к заголовкам тегов канала и доставки.