Как обрабатывать dispose в RxJava без InterruptedException

В приведенном ниже коде, когда вызывается dispose(), поток эмиттера прерывается (метод InterruptedException выбрасывается из спящего режима).

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();

Из сеанса отладки я вижу, что прерывание происходит от FutureTask, которое отменяется во время удаления. Там поток, который вызывает dispose(), сверяется с потоком исполнителя, и если он не совпадает, эмиттер прерывается. Тема отличается, так как я использовал вычисление Scheduler.

Есть ли способ заставить dispose не прерывать такой эмиттер или это всегда должно обрабатываться? Проблема, которую я вижу с этим подходом, заключается в том, что у меня будет прерываемая операция (здесь имитируется сон), которую я хотел бы завершить в обычном режиме перед вызовом onComplete().

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
0
1 833
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

См. Отличия версии 2.0 - Обработка ошибок.

One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.

Таким образом, вы можете либо обернуть все внутри try/catch и правильно выдать ошибку:

Observable<Integer> obs = Observable.create(emitter -> {
   try {
      // ...
   } catch (InterruptedException ex) {
      // check if the interrupt is due to cancellation
      // if so, no need to signal the InterruptedException
      if (!disposable.isDisposed()) {
         observer.onError(ex);
      }
   }
});

или настроить глобальный потребитель ошибок, чтобы игнорировать его:

RxJavaPlugins.setErrorHandler(e -> {
    // ..
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    // ...
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

Другие вопросы по теме