Как реализовать Observable.concatEagerDelayError или эквивалент в RxJava2/RxKotlin2?
Есть :
Но нет :
Что я имею :
fun getAll(): Observable<List<User>> = Observable.concatArrayDelayError(
// from db
userDAO
.selectAll()
.subscribeOn(ioScheduler),
// from api
userAPI
.getAll()
.doOnNext { lstUser -> Completable.concatArray(
userDAO.deleteAll().subscribeOn(ioScheduler),
userDAO.save(lstUser).subscribeOn(ioScheduler)
) }
.subscribeOn(ioScheduler)
)
Я хочу такого же поведения, но с нетерпением жду selectAll() и getAll(), потому что нет причин ждать от db запуска сетевого вызова.





Используйте concatMapEagerDelayError:
Observable.fromIterable(sources)
.concatMapEagerDelayError(v -> v, true);
Observable.fromArray(source1, source2, source3)
.concatMapEagerDelayError(v -> v, true);
Редактировать:
fun getAll(): Observable<List<User>> = Observable.fromArray(
// from db
userDAO
.selectAll()
.subscribeOn(ioScheduler),
// from api
userAPI
.getAll()
// --- this makes no sense by the way -------------------
.doOnNext { lstUser -> Completable.concatArray(
userDAO.deleteAll().subscribeOn(ioScheduler),
userDAO.save(lstUser).subscribeOn(ioScheduler)
)}
// ------------------------------------------------------
.subscribeOn(ioScheduler)
)
.concatMapEagerDelayError({ v -> v }, true)
Я обновил свой ответ. Пожалуйста, в следующий раз предоставьте всю необходимую информацию заранее.
Спасибо и извините. Можете ли вы объяснить мне, почему обновление базы данных не имеет смысла?
Вы создаете и выбрасываете Completable в doOnNext. Сомневаюсь, что это работает, а если и работает, то вы делаете что-то гораздо хуже в userDAO.
Вызов для удаления и сохранения протестирован. Но, может быть, вы предложите другой способ сделать то же самое?
Я бы предложил flatMapCompletable вместо doOnNext.
Спасибо. Может лучше оставить doOnNext и не возвращать Completable за deleteAll и save ?
Я отредактировал исходный вопрос с большей точностью. Я не думаю, что это ответ. Или вы можете привести точный пример для моего случая?