Почему Flux.share() не делится своей подпиской?

Я хочу поделиться (т. е. разделить) своим потоком, но share(), похоже, не приводит к совместному использованию моей подписки. Почему?

У меня есть Flux, выданный дорогостоящим вызовом базы данных. Я хочу разделить этот поток и обработать значения, которые он производит, разными способами (но не использовать оператор groupBy()). Позже я хочу снова объединить разные пути, чтобы мне нужно было подписаться только один раз (например, через контроллер REST):

    static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
        return 
            Flux.generate( 
                () -> {
                    System.out.println("subscribed"); // should only happen once
                    return 0;
                },
                (state, sink) -> {
                    sink.next(state);
                    return state + 1;
                }
            )
            .map(String::valueOf)
            .log()
            .delayElements(Duration.ofSeconds(1))
            .take(2)
            .share() // share the flux so the DB is only queried once
        ;
    }

    static Flux<String> pathA() {
        // complicated calculations
        return expensiveDatabaseCall().doOnNext(it -> System.out.println("a: " + it));
    }

    static Flux<String> pathB() {
        // different, equally complicated calculations
        return expensiveDatabaseCall().doOnNext(it -> System.out.println("b: " + it));
    }

    static Flux<String> controller() { // pretend this happens in a REST controller
        return Flux.merge(pathA(), pathB());
    }

    @Test
    void test() {
        StepVerifier.create(controller()).expectNextCount(4).verifyComplete();
    }

Поскольку я использую оператор share(), я ожидал увидеть только одну подписку, но на самом деле я вижу две "subscribed". Почему?

Разве оператор share() не должен подписываться на восходящий поток и сам обрабатывать все нисходящие подписки, вместо того, чтобы передавать их обратно восходящему источнику?

По крайней мере, я так понял документы; вот что они говорят:

Возвращает новый Flux, который осуществляет многоадресную рассылку (разделяет) исходный Flux. [...]

Управление состоянием компонента контейнера с помощью RxAngular
Управление состоянием компонента контейнера с помощью RxAngular
Чтобы сделать ваш компонент как можно более тонким, я предлагаю абстрагировать состояние компонента в виде фасада . Основная идея здесь заключается в...
В чем разница между Promise и Observable?
В чем разница между Promise и Observable?
Разберитесь в этом вопросе, и вы значительно повысите уровень своей компетенции.
1
0
75
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Объяснение очень простое: любое поведение кэширования/совместного использования привязано к экземпляру потока. Это означает, что когда вы это делаете:

public Flux<Integer> sharedCountdown() {
    return Flux.just(3, 2, 1, 0).share();
}

var instanceA = sharedCountdown();
var instanceB = sharedCountdown();

Вы создаете два отдельных потока, каждый из которых будет иметь свою собственную подписку и кеш. Это точно так же, как и любой другой простой объект Java.

Если вы хотите совместного использования, вам придется использовать один и тот же экземпляр в последующей обработке.

В вашем случае вам придется изменить логику. Ваши постпроцессоры не должны сами вызывать/создавать дорогостоящий флюс, они должны получать на вход собранный экземпляр. Кроме того, если вы хотите, чтобы все ваши нижестоящие процессоры получали все восходящие сигналы, вам следует избегать share() и вместо этого использовать Publish() .autoconnect(numberOfPostProcessors):

    static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
        return 
            Flux.generate( 
                () -> {
                   System.out.println("subscribed"); // should only happen once
                   return 0;
                },
                (state, sink) -> {
                   sink.next(state);
                   return state + 1;
                }
            )
            .map(String::valueOf)
            .log()
            .delayElements(Duration.ofSeconds(1))
            .take(2)
        ;
    }

    static Flux<String> pathA(Flux<String> upstreamFlux) {
        // complicated calculations
        return upstreamFlux.doOnNext(it -> System.out.println("a: " + it));
    }

    static Flux<String> pathB(Flux<String> upstreamFlux) {
        // different, equally complicated calculations
        return upstreamFlux.doOnNext(it -> System.out.println("b: " + it));
    }

    static Flux<String> controller() { // pretend this happens in a REST controller
        var sharedExpensiveUpstream = expensiveDatabaseCall().publish().autoconnect(2);
        return Flux.merge(pathA(sharedExpensiveUpstream), pathB(sharedExpensiveUpstream));
    }

Другие вопросы по теме