В приведенном ниже коде, когда вызывается dispose()
, поток эмиттера прерывается (метод InterruptedException
выбрасывается из спящего режима).
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
Из сеанса отладки я вижу, что прерывание происходит от FutureTask
, которое отменяется во время удаления. Там поток, который вызывает dispose()
, сверяется с потоком исполнителя, и если он не совпадает, эмиттер прерывается. Тема отличается, так как я использовал вычисление Scheduler
.
Есть ли способ заставить dispose не прерывать такой эмиттер или это всегда должно обрабатываться? Проблема, которую я вижу с этим подходом, заключается в том, что у меня будет прерываемая операция (здесь имитируется сон), которую я хотел бы завершить в обычном режиме перед вызовом onComplete()
.
См. Отличия версии 2.0 - Обработка ошибок.
One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.
Таким образом, вы можете либо обернуть все внутри try/catch и правильно выдать ошибку:
Observable<Integer> obs = Observable.create(emitter -> {
try {
// ...
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}
});
или настроить глобальный потребитель ошибок, чтобы игнорировать его:
RxJavaPlugins.setErrorHandler(e -> {
// ..
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
// ...
Log.warning("Undeliverable exception received, not sure what to do", e);
});