По сути, я делаю процессор очереди в Spring Boot и хочу использовать Reactor для async. Я сделал функцию, которая должна повторяться вечно, поскольку она извлекает из очереди, а затем отмечает элемент как обработанный.
вот версия блокировки, которая работает. Подписка возвращает Mono
while(true) {
manager.Subscribe().block()
}
Я не уверен, как превратить это в поток. Я просмотрел интервал, сгенерировал, создал и т.д., и я не могу заставить что-либо работать без вызова block ()
Вот пример того, что я пробовал
Flux.generate(() -> manager,
(state, sink) -> {
state.Subscribe().block();
sink.next("done");
return state;
}));
Поскольку я новичок в Reactor, я не смог найти ничего о простом цикле и синхронной обработке Monos без блокировки.
Вот что делает метод Subscribe с помощью AWS Java SDK v2:
public Mono Subscribe() {
return Mono.fromFuture(_client.receiveMessage(ReceiveMessageRequest.builder()
.waitTimeSeconds(10)
.queueUrl(_queueUrl)
.build()))
.filter(x -> x.messages() != null)
.flatMap(x -> Mono.when(x.messages()
.stream()
.map(y -> {
_log.warn(y.body());
return Mono.fromFuture(_client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(_queueUrl)
.receiptHandle(y.receiptHandle())
.build()));
}).collect(Collectors.toList())));
}
По сути, я просто опрашиваю очередь SQS, удаляю сообщения и хочу сделать это снова. Для меня это всего лишь проба.
Спасибо!
Спасибо за ответ. Он управляется Spring Boot DI. Я не уверен, что это имеет значение, кроме того факта, что он возвращает Mono, я хочу убедиться, что он завершен, прежде чем я запустил его снова. Я добавил некоторые детали подписки




Вам нужны две вещи: способ подписки в цикле и способ гарантировать, что метод Subscribe() эффективно вызывается на каждой итерации (поскольку Future необходимо воссоздавать).
repeat() - это встроенный оператор, который повторно подпишется на свой источник после его завершения. Если источник ошибается, цикл повтора останавливается. Самый простой вариант продолжает делать это Long.MAX_VALUE раза.
Единственная проблема в том, что в вашем случае Mono из Subscribe()должен воссоздается на каждой итерации.
Для этого вы можете обернуть вызов Subscribe() в defer: он будет повторно вызывать метод каждый раз, когда происходит новая подписка, включая каждую повторную попытку:
Flux<Stuff> repeated = Mono
.defer(manager::Subscribe)
.repeat();
Отлично работает, спасибо! Не знаю, почему я не заметил повтора в документации по API.
что это за объект-менеджер? Откуда это взялось? Не могли бы вы показать весь класс?