Как связать подписчиков реактора

У меня есть существующая цепочка интерфейсов, которую я хочу запустить как реактор вместо управления своими потоками и очередями.

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...

// Works well to lookup users in parallel. 
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

Как я могу связать этот результат, чтобы он вызывал UsersHandler с пакетами User ?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
0
599
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Подписка на что-то запускает цепочку, так что вы, как правило, не можете «цеплять» подписчиков, они являются последними в цепочке.

Подумайте, если так, вы настраиваете свой реактивный конвейер, и когда вы subscribe, вы запускаете конвейер, и цепочка выдает результат.

На веб-сервере subscriber обычно является вызывающим клиентом, и когда клиент subscribes, он запускает цепочку событий на сервере, которая будет публиковать данные.

Flux похож на список от 1 до n Monos. Каждый объект в Mono/Flux имеет, так сказать, несколько «состояний». Это Success, Error, Cancel, Next, Completed и другие.

Когда Mono/Flux внутренне переходит в состояние Success, он выдает в нем значение. Mono обычно идет Success, когда что-то разрешилось в моно.

когда вы объявляете Flux.just("userA", "userB", "userC"), вы в основном просите поток разрешить ввод, который вы ему вводите. Размещение строки — это то, что будет разрешено мгновенно, поэтому поток перейдет в состояние Success и начнет испускать строки, как только что-то Subscribes. Так что все, что вам нужно сделать, это объявить цепочку, которую вы хотите, чтобы произошло после кого-то Subscribes.

Это можно сделать несколькими способами, когда вы хотите что-то сделать и изменить значение, например, вы хотите с string на user мы обычно используем map.

Если мы просто хотим что-то сделать с каждым объектом и ничего не возвращать, мы можем использовать doOnNext.

Flux.just("userA", "userB", "userC")
            .parallel(2)
            .runOn(Schedulers.parallel())
            .map(userString -> {
                return lookupService.lookup(userString);
            })
            .doOnNext(user -> {
                // if you want to do something on each user
                // will return void so if you want to log something
                // or handle each user
            }).subscribe();

Подписка должна быть последней в цепочке.

Другие вопросы по теме