RxJava2 - Emit onError на параллельном рельсе получит исключение UndeliverableException

Хотя одна из направляющих выдает onError(), isCancelled() в другой направляющей по-прежнему возвращает false, что приводит к UndeliverableException. Как проверить, что нисходящий поток отменяется в параллельных рельсах?

Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        System.out.println("Flowable.create-emitter.isCancelled:" + emitter.isCancelled());
        for (int i = 1; i < 10; i++) {
            emitter.onNext(i);
        }
        emitter.onComplete();
    }

}, BackpressureStrategy.BUFFER).parallel(6).runOn(Schedulers.io())
        .flatMap(new Function<Integer, Publisher<String>>() {
            @Override
            public Publisher<String> apply(Integer t) throws Exception {
                // TODO Auto-generated method stub
                return Flowable.create(new FlowableOnSubscribe<String>() {

                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                            System.out.println("flatMap-before onError-isCancelled:" + emitter.isCancelled());
                            try {
                                if (true) { // assume trigger the error
                                    throw new Exception("Test");
                                }
                                if (!emitter.isCancelled()) {
                                    emitter.onNext(String.valueOf((t + 1)));
                                    emitter.onComplete();
                                }
                            } catch (Exception ex) {
                                if (!emitter.isCancelled()) {
                                    emitter.onError(ex);
                                }
                            }
                            System.out.println("flatMap-after onError-isCancelled:" + emitter.isCancelled());
                    }
                }, BackpressureStrategy.BUFFER);
            }

        }).sequential().subscribeOn(scheduler).observeOn(Schedulers.single())
        .subscribeWith(new ResourceSubscriber<String>() {

            public void onComplete() {
                System.out.println("onComplete");

            }

            public void onError(Throwable arg0) {
                System.out.println("onError:" + arg0.toString());

            }

            public void onNext(String arg0) {
                System.out.println("onNext:" + arg0);
            }

        });
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
488
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

В тот момент, когда вы вызовете onError(), вся цепочка последующих наблюдателей завершится. Вы не можете делать onNext(), onError() или onComplete() после этого, потому что не на что доставлять. Это называется «реактивный контракт».

Спасибо за ответ. Я знаю правило: я испускаю onNext() и onComplete() после того, как onError() предназначен для некоторого тестирования. Я отредактировал код в соответствии с правилом, но проблема все еще существует

KpSt 25.04.2018 09:10
Ответ принят как подходящий

Я нашел решение. Мне нужно добавить глобального потребителя ошибок, чтобы решить проблему. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

Другие вопросы по теме