У меня есть существующая цепочка интерфейсов, которую я хочу запустить как реактор вместо управления своими потоками и очередями.
public interface UserLookupService {
public User lookup(String id);
}
public interface UsersHandler {
public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...
// Works well to lookup users in parallel.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(str -> {
userSvc.lookup(str);
});
Как я могу связать этот результат, чтобы он вызывал UsersHandler с пакетами User ?




Подписка на что-то запускает цепочку, так что вы, как правило, не можете «цеплять» подписчиков, они являются последними в цепочке.
Подумайте, если так, вы настраиваете свой реактивный конвейер, и когда вы subscribe, вы запускаете конвейер, и цепочка выдает результат.
На веб-сервере subscriber обычно является вызывающим клиентом, и когда клиент subscribes, он запускает цепочку событий на сервере, которая будет публиковать данные.
Flux похож на список от 1 до n Monos. Каждый объект в Mono/Flux имеет, так сказать, несколько «состояний». Это Success, Error, Cancel, Next, Completed и другие.
Когда Mono/Flux внутренне переходит в состояние Success, он выдает в нем значение. Mono обычно идет Success, когда что-то разрешилось в моно.
когда вы объявляете Flux.just("userA", "userB", "userC"), вы в основном просите поток разрешить ввод, который вы ему вводите. Размещение строки — это то, что будет разрешено мгновенно, поэтому поток перейдет в состояние Success и начнет испускать строки, как только что-то Subscribes. Так что все, что вам нужно сделать, это объявить цепочку, которую вы хотите, чтобы произошло после кого-то Subscribes.
Это можно сделать несколькими способами, когда вы хотите что-то сделать и изменить значение, например, вы хотите с string на user мы обычно используем map.
Если мы просто хотим что-то сделать с каждым объектом и ничего не возвращать, мы можем использовать doOnNext.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.map(userString -> {
return lookupService.lookup(userString);
})
.doOnNext(user -> {
// if you want to do something on each user
// will return void so if you want to log something
// or handle each user
}).subscribe();
Подписка должна быть последней в цепочке.