Reactor: функция создания Monos to Flux

По сути, я делаю процессор очереди в Spring Boot и хочу использовать Reactor для async. Я сделал функцию, которая должна повторяться вечно, поскольку она извлекает из очереди, а затем отмечает элемент как обработанный.

вот версия блокировки, которая работает. Подписка возвращает Mono

while(true) {
    manager.Subscribe().block()
}

Я не уверен, как превратить это в поток. Я просмотрел интервал, сгенерировал, создал и т.д., и я не могу заставить что-либо работать без вызова block ()

Вот пример того, что я пробовал

Flux.generate(() -> manager,
    (state, sink) -> {
        state.Subscribe().block();
        sink.next("done");
        return state;
    }));

Поскольку я новичок в Reactor, я не смог найти ничего о простом цикле и синхронной обработке Monos без блокировки.

Вот что делает метод Subscribe с помощью AWS Java SDK v2:

public Mono Subscribe() {
    return Mono.fromFuture(_client.receiveMessage(ReceiveMessageRequest.builder()
            .waitTimeSeconds(10)
            .queueUrl(_queueUrl)
            .build()))
            .filter(x -> x.messages() != null)
            .flatMap(x -> Mono.when(x.messages()
                    .stream()
                    .map(y -> {
                        _log.warn(y.body());
                        return Mono.fromFuture(_client.deleteMessage(DeleteMessageRequest.builder()
                                .queueUrl(_queueUrl)
                                .receiptHandle(y.receiptHandle())
                                .build()));
                    }).collect(Collectors.toList())));
}

По сути, я просто опрашиваю очередь SQS, удаляю сообщения и хочу сделать это снова. Для меня это всего лишь проба.

Спасибо!

что это за объект-менеджер? Откуда это взялось? Не могли бы вы показать весь класс?

Brian Clozel 21.04.2018 19:08

Спасибо за ответ. Он управляется Spring Boot DI. Я не уверен, что это имеет значение, кроме того факта, что он возвращает Mono, я хочу убедиться, что он завершен, прежде чем я запустил его снова. Я добавил некоторые детали подписки

adamhathcock 23.04.2018 09:50
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
2
520
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам нужны две вещи: способ подписки в цикле и способ гарантировать, что метод Subscribe() эффективно вызывается на каждой итерации (поскольку Future необходимо воссоздавать).

repeat() - это встроенный оператор, который повторно подпишется на свой источник после его завершения. Если источник ошибается, цикл повтора останавливается. Самый простой вариант продолжает делать это Long.MAX_VALUE раза.

Единственная проблема в том, что в вашем случае Mono из Subscribe()должен воссоздается на каждой итерации.

Для этого вы можете обернуть вызов Subscribe() в defer: он будет повторно вызывать метод каждый раз, когда происходит новая подписка, включая каждую повторную попытку:

Flux<Stuff> repeated = Mono
    .defer(manager::Subscribe)
    .repeat();

Отлично работает, спасибо! Не знаю, почему я не заметил повтора в документации по API.

adamhathcock 24.04.2018 11:52

Другие вопросы по теме