Хотя одна из направляющих выдает onError(), isCancelled() в другой направляющей по-прежнему возвращает false, что приводит к UndeliverableException. Как проверить, что нисходящий поток отменяется в параллельных рельсах?
Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
System.out.println("Flowable.create-emitter.isCancelled:" + emitter.isCancelled());
for (int i = 1; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).parallel(6).runOn(Schedulers.io())
.flatMap(new Function<Integer, Publisher<String>>() {
@Override
public Publisher<String> apply(Integer t) throws Exception {
// TODO Auto-generated method stub
return Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
System.out.println("flatMap-before onError-isCancelled:" + emitter.isCancelled());
try {
if (true) { // assume trigger the error
throw new Exception("Test");
}
if (!emitter.isCancelled()) {
emitter.onNext(String.valueOf((t + 1)));
emitter.onComplete();
}
} catch (Exception ex) {
if (!emitter.isCancelled()) {
emitter.onError(ex);
}
}
System.out.println("flatMap-after onError-isCancelled:" + emitter.isCancelled());
}
}, BackpressureStrategy.BUFFER);
}
}).sequential().subscribeOn(scheduler).observeOn(Schedulers.single())
.subscribeWith(new ResourceSubscriber<String>() {
public void onComplete() {
System.out.println("onComplete");
}
public void onError(Throwable arg0) {
System.out.println("onError:" + arg0.toString());
}
public void onNext(String arg0) {
System.out.println("onNext:" + arg0);
}
});




В тот момент, когда вы вызовете onError(), вся цепочка последующих наблюдателей завершится. Вы не можете делать onNext(), onError() или onComplete() после этого, потому что не на что доставлять. Это называется «реактивный контракт».
Я нашел решение. Мне нужно добавить глобального потребителя ошибок, чтобы решить проблему. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
Спасибо за ответ. Я знаю правило: я испускаю
onNext()иonComplete()после того, какonError()предназначен для некоторого тестирования. Я отредактировал код в соответствии с правилом, но проблема все еще существует