У меня такой код:
Процесс X:
getLocationObservable() // ---> async operation that fetches the location.
// Once location is found(or failed to find) it sends it to this filter :
.filter(location -> {
--- Operation A ---
after finishing the operation A, I either return 'true' and continue
to the next observable which is a Retrofit server call, or simply
return 'false' and quit.
})
.flatMap(location -> getRetrofitServerCallObservable( location )
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Observer<MyCustomResponse>() {
@Override
public void onSubscribe(Disposable d) {
_disposable = d;
}
@Override
public void onNext(MyCustomResponse response) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
В классе Location это так:
private PublishSubject<Location> locationPublishSubject = PublishSubject.create();
public Observable<Location> getLocationObservable() {
return locationPublishSubject;
}
and then...
locationPublishSubject.onNext( foundLocation );
Проблемы с PublishSubject:
locationPublishSubject.onError( new <some mock exception> ) -> это
вылетает с io.reactivex.exceptions.UndeliverableExceptionlocationPublishSubject.onComplete после onNext -> onNext не
случаться. Он сразу переходит к onComplete.1) У меня нет особых исключений .. в некоторых случаях это обычный объект Exception. Я просто хочу иметь возможность генерировать даже пустое исключение, а не передавать через onNextnull, завернутый в какой-то настраиваемый класс. 2) hasObservers возвращает true до того, как я вызываю onNext. И все же, даже если я вызываю onComplete сразу после onNext, обратный вызов onNext никогда не вызывается. Он сразу переходит к onComplete. По сути, это означает, что я должен вызвать только onNext, а затем вручную вызвать dispose() на одноразовом. Пожалуйста, поправьте меня, если я ошибаюсь ..
1) вы хотите продолжить после ошибки? Рассмотрите вариант упаковки. 2) У вас есть фильтр в вашем потоке, который может отфильтровать ваш элемент.
1) В идеале я бы предпочел отловить ошибку в обратном вызове onError, а затем выполнить некоторый код, который я создал для этого случая. Вместо того, чтобы анализировать, было ли это успешным во время filter. Причина с onError мне не нужно вручную вызывать dispose(). 2) из-за фильтра не могу использовать onComplete? вот почему он пропускает обратный вызов onNext, если я вызываю onComplete сразу после onNext? так после каждого вызова одноразового я должен звонить dispose() вручную?
Предоставленный вами код и описываемое поведение не совпадают. Приведите отдельный пример, воспроизводящий вашу проблему.
Извините, но это именно тот код, который я использую (я удалил только код внутри "filter" и другие обратные вызовы, потому что они не имеют отношения к делу, проблемы возникают раньше них)
Разместите doOnNext в разных местах, чтобы увидеть, куда пропадают onNext. Код запускается несколько раз? Если вы onComplete предмет, остальной код не будет выполняться в следующий раз, и вы получите onComplete через цепочку.




1.) Пожалуйста, предоставьте полное исключение. 2.)
PublishSubjectне сохраняет никаких элементовonNext. Если у вас нет наблюдателя в тот момент, когда вы звонитеonNext, этот элемент теряется. Перед вызовомPublishSubject.hasObserversпроверьте, возвращает лиonNextзначение false.