PublishSubject в RxJava / RxAndroid вызывает сбой при ошибке и пропускает следующий

У меня такой код:

Процесс X:

 getLocationObservable() // ---> async operation that fetches the location. 
    //  Once location is found(or failed to find) it sends it to this filter :
            .filter(location -> {
                --- Operation A ---
                after finishing the operation A, I either return 'true' and continue 
                 to the next observable which is a Retrofit server call, or simply 
                  return 'false' and quit.
            })
            .flatMap(location -> getRetrofitServerCallObservable( location )
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    new Observer<MyCustomResponse>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            _disposable = d;
                        }
                        @Override
                        public void onNext(MyCustomResponse response) {

                        }
                        @Override
                        public void onError(Throwable e) {

                        }
                        @Override
                        public void onComplete() {

                        }
                    });

В классе Location это так:

 private PublishSubject<Location> locationPublishSubject = PublishSubject.create();

    public Observable<Location> getLocationObservable() {
        return locationPublishSubject;
    }

    and then...
    locationPublishSubject.onNext( foundLocation );

Проблемы с PublishSubject:

  1. Если я позвоню locationPublishSubject.onError( new <some mock exception> ) -> это вылетает с io.reactivex.exceptions.UndeliverableException
  2. Если я вызываю locationPublishSubject.onComplete после onNext -> onNext не случаться. Он сразу переходит к onComplete.

1.) Пожалуйста, предоставьте полное исключение. 2.) PublishSubject не сохраняет никаких элементов onNext. Если у вас нет наблюдателя в тот момент, когда вы звоните onNext, этот элемент теряется. Перед вызовом PublishSubject.hasObservers проверьте, возвращает ли onNext значение false.

akarnokd 24.06.2018 10:27

1) У меня нет особых исключений .. в некоторых случаях это обычный объект Exception. Я просто хочу иметь возможность генерировать даже пустое исключение, а не передавать через onNextnull, завернутый в какой-то настраиваемый класс. 2) hasObservers возвращает true до того, как я вызываю onNext. И все же, даже если я вызываю onComplete сразу после onNext, обратный вызов onNext никогда не вызывается. Он сразу переходит к onComplete. По сути, это означает, что я должен вызвать только onNext, а затем вручную вызвать dispose() на одноразовом. Пожалуйста, поправьте меня, если я ошибаюсь ..

BVtp 24.06.2018 10:39

1) вы хотите продолжить после ошибки? Рассмотрите вариант упаковки. 2) У вас есть фильтр в вашем потоке, который может отфильтровать ваш элемент.

akarnokd 24.06.2018 11:39

1) В идеале я бы предпочел отловить ошибку в обратном вызове onError, а затем выполнить некоторый код, который я создал для этого случая. Вместо того, чтобы анализировать, было ли это успешным во время filter. Причина с onError мне не нужно вручную вызывать dispose(). 2) из-за фильтра не могу использовать onComplete? вот почему он пропускает обратный вызов onNext, если я вызываю onComplete сразу после onNext? так после каждого вызова одноразового я должен звонить dispose() вручную?

BVtp 24.06.2018 11:45

Предоставленный вами код и описываемое поведение не совпадают. Приведите отдельный пример, воспроизводящий вашу проблему.

akarnokd 24.06.2018 12:13

Извините, но это именно тот код, который я использую (я удалил только код внутри "filter" и другие обратные вызовы, потому что они не имеют отношения к делу, проблемы возникают раньше них)

BVtp 24.06.2018 12:25

Разместите doOnNext в разных местах, чтобы увидеть, куда пропадают onNext. Код запускается несколько раз? Если вы onComplete предмет, остальной код не будет выполняться в следующий раз, и вы получите onComplete через цепочку.

akarnokd 24.06.2018 13:58
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
7
581
0

Другие вопросы по теме