Я использую Reactive Spring Cloud Stream, и у меня проблемы с созданием StreamListener без вывода. Следующий код работает до тех пор, пока не получены искаженные сообщения. Когда получено искаженное сообщение, поток закрывается.
@StreamListener
public void handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
}
Если я правильно понимаю, предпочтительнее позволить фреймворку подписаться на поток вместо того, чтобы подписываться на него вручную. Это не проблема, когда у слушателя есть вывод, потому что я могу просто вернуть поток вот так:
@StreamListener
@Output(MessagingConfig.OUTPUT)
public Flux<String> handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
return payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave));
}
Кажется, что фреймворк обрабатывает плохие сообщения таким образом, что не закрывает поток при его возврате. Есть ли способ позволить фреймворку обрабатывать поток, когда слушатель не указывает вывод?




Рассмотрите возможность перехода на использование модели программирования Spring Cloud Function (SCF), которая у нас есть недавно принятый. В принципе, пока у вас последняя кодовая база (2.1.0.RC4 - последняя, а RELEASE - через несколько дней), все в порядке. Вот пример вашего кода с использованием модели программирования SCF:
@SpringBootApplication
@EnableBinding(Sink.class)
public class SampleReactiveConsumer {
public static void main(String[] args) {
SpringApplication.run(SampleReactiveConsumer.class,
"--spring.cloud.stream.function.definition=consume");
}
@Bean
public Consumer<Flux<String>> consume(){
return payloads -> payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
}
}
Вы также можете удалить реактивный модуль из пути к классам, так как мы также рассматриваем возможность отказа от всего этого вместе
На данный момент функциональная модель поддерживает только стандартные Source, Processor и Sink, и для этого есть причина - в первую очередь, чтобы продвигать архитектуру в стиле микросервисов, что не то же самое, что создание приложений обмена сообщениями общего назначения с помощью подшивок. Другими словами, отдельный прослушиватель сообщений на микросервис, который адекватно покрывается тремя упомянутыми выше интерфейсами.
Если вы действительно хотите избежать SCF, упомянутого в ответе Олега, вы можете попробовать ниже, хакерский подход.
const val IN = "input"
const val OUT = "dummy-output"
interface Channels {
@Input(IN)
fun input(): MessageChannel
@Output(OUT)
fun output(): MessageChannel
}
@EnableBinding(Channels::class)
class MsgList {
@StreamListener
@Output(OUT)
fun receive(@Input(IN) messages: Flux<String>): Flux<Void> {
return messages
.doOnNext { if (it == "err") throw IllegalStateException("err") }
.doOnNext { println(it) }
.flatMap { Mono.empty<Void>() }
}
}
Привязка вывода будет создана, но сообщения не пройдут. В случае RabbitMQ это означает - фиктивный обмен появится, но очередь не будет создана.
Также ошибки будут обработаны так, как вы ожидали. В приведенном выше примере вы можете отправить 3 сообщения: «ok», «err», «ok2», и вы увидите на экране «ok», затем exception, затем «ok2». Сообщение «ok2» и любое последующее действительное сообщение будут обработаны должным образом.
Вы правы, мне это кажется хакерским. В итоге я использовал нереактивный StreamListener. К сожалению, Spring Could Stream Reactive устарел, поскольку многие «микросервисы» не имеют одного ввода / вывода, который предполагается с помощью функции Spring Cloud.
Что, если я хочу использовать свой собственный интерфейс для объявления каналов ввода / вывода вместо использования предоставленных интерфейсов Source / Sink / Processor? Как я могу сопоставить входы / выходы в моем пользовательском интерфейсе с функцией? Есть ли альтернативный синтаксис для "spring.cloud.stream.function.definition", где я могу указать канал?