Я новичок в RxJava, и мне нужно создать репозиторий с несколькими источниками данных. Для меня это сложно, потому что есть несколько более мелких подзадач, которые я не знаю, как реализовать с помощью RxJava.
Сначала у меня есть самостоятельный дао, который обрабатывает InputStream и предоставляет элементы в указанном диапазоне. В настоящее время он просто собирает данные в список, но я хочу предоставлять элементы один за другим, используя поток; В настоящее время он предоставляет Maybe<List<Item>>
. Также есть несколько ошибок, которые необходимо передать на более высокий уровень (источник данных). Например, EndOfFile, чтобы уведомить DataSource о том, что данные полностью кэшированы;
Dao.class
:
List<Item> loadRange(int start, int number) throws ... {
...
while(...) {
...
//TODO contribute item to flowable
resultList.add(new Item(...))
}
return resultList;
}
Maybe<List<Item>>
только что созданный метод Maybe.fromCallable()
;
Пожалуйста помогите!
Что-то вроде этого должно работать для этого:
Flowable<Item> loadRange(int start, int number) {
return Flowable.create(emitter -> {
try {
while (...){
emitter.onNext(new Item());
}
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
}, BackpressureStrategy.BUFFER);
}
Я предполагаю, что после завершения цикла вы хотите завершить его, а также отправлять ошибки вниз по течению, а не обрабатывать сигнатуру метода. Также вы можете изменить BackPressureStrategy
в соответствии с вашими вариантами использования, например, DROP
, LATEST
и т. д..
Поскольку вы новичок в RxJava, анонимный класс будет:
Flowable<Item> loadRange(int start, int number) {
return Flowable.create(new FlowableOnSubscribe<Item>() {
@Override public void subscribe(FlowableEmitter<Item> emitter) {
try {
while (...){
emitter.onNext(new Item());
}
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}