Я хочу поделиться (т. е. разделить) своим потоком, но share(), похоже, не приводит к совместному использованию моей подписки. Почему?
У меня есть Flux, выданный дорогостоящим вызовом базы данных. Я хочу разделить этот поток и обработать значения, которые он производит, разными способами (но не использовать оператор groupBy()). Позже я хочу снова объединить разные пути, чтобы мне нужно было подписаться только один раз (например, через контроллер REST):
static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
return
Flux.generate(
() -> {
System.out.println("subscribed"); // should only happen once
return 0;
},
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.map(String::valueOf)
.log()
.delayElements(Duration.ofSeconds(1))
.take(2)
.share() // share the flux so the DB is only queried once
;
}
static Flux<String> pathA() {
// complicated calculations
return expensiveDatabaseCall().doOnNext(it -> System.out.println("a: " + it));
}
static Flux<String> pathB() {
// different, equally complicated calculations
return expensiveDatabaseCall().doOnNext(it -> System.out.println("b: " + it));
}
static Flux<String> controller() { // pretend this happens in a REST controller
return Flux.merge(pathA(), pathB());
}
@Test
void test() {
StepVerifier.create(controller()).expectNextCount(4).verifyComplete();
}
Поскольку я использую оператор share(), я ожидал увидеть только одну подписку, но на самом деле я вижу две "subscribed". Почему?
Разве оператор share() не должен подписываться на восходящий поток и сам обрабатывать все нисходящие подписки, вместо того, чтобы передавать их обратно восходящему источнику?
По крайней мере, я так понял документы; вот что они говорят:
Возвращает новый Flux, который осуществляет многоадресную рассылку (разделяет) исходный Flux. [...]


Объяснение очень простое: любое поведение кэширования/совместного использования привязано к экземпляру потока. Это означает, что когда вы это делаете:
public Flux<Integer> sharedCountdown() {
return Flux.just(3, 2, 1, 0).share();
}
var instanceA = sharedCountdown();
var instanceB = sharedCountdown();
Вы создаете два отдельных потока, каждый из которых будет иметь свою собственную подписку и кеш. Это точно так же, как и любой другой простой объект Java.
Если вы хотите совместного использования, вам придется использовать один и тот же экземпляр в последующей обработке.
В вашем случае вам придется изменить логику. Ваши постпроцессоры не должны сами вызывать/создавать дорогостоящий флюс, они должны получать на вход собранный экземпляр. Кроме того, если вы хотите, чтобы все ваши нижестоящие процессоры получали все восходящие сигналы, вам следует избегать share() и вместо этого использовать Publish() .autoconnect(numberOfPostProcessors):
static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
return
Flux.generate(
() -> {
System.out.println("subscribed"); // should only happen once
return 0;
},
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.map(String::valueOf)
.log()
.delayElements(Duration.ofSeconds(1))
.take(2)
;
}
static Flux<String> pathA(Flux<String> upstreamFlux) {
// complicated calculations
return upstreamFlux.doOnNext(it -> System.out.println("a: " + it));
}
static Flux<String> pathB(Flux<String> upstreamFlux) {
// different, equally complicated calculations
return upstreamFlux.doOnNext(it -> System.out.println("b: " + it));
}
static Flux<String> controller() { // pretend this happens in a REST controller
var sharedExpensiveUpstream = expensiveDatabaseCall().publish().autoconnect(2);
return Flux.merge(pathA(sharedExpensiveUpstream), pathB(sharedExpensiveUpstream));
}