Создать Flowable из цикла while

Я новичок в RxJava, и мне нужно создать репозиторий с несколькими источниками данных. Для меня это сложно, потому что есть несколько более мелких подзадач, которые я не знаю, как реализовать с помощью RxJava.

Сначала у меня есть самостоятельный дао, который обрабатывает InputStream и предоставляет элементы в указанном диапазоне. В настоящее время он просто собирает данные в список, но я хочу предоставлять элементы один за другим, используя поток; В настоящее время он предоставляет Maybe<List<Item>>. Также есть несколько ошибок, которые необходимо передать на более высокий уровень (источник данных). Например, EndOfFile, чтобы уведомить DataSource о том, что данные полностью кэшированы;

Dao.class:

List<Item> loadRange(int start, int number) throws ... {
    ...
    while(...) {
        ...
        //TODO contribute item to flowable
        resultList.add(new Item(...)) 

    }
    return resultList;
}

Maybe<List<Item>> только что созданный метод Maybe.fromCallable();

Пожалуйста помогите!

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
451
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Что-то вроде этого должно работать для этого:

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(emitter -> {
            try {
                while (...){
                    emitter.onNext(new Item());
                }
                emitter.onComplete();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }, BackpressureStrategy.BUFFER);
    }

Я предполагаю, что после завершения цикла вы хотите завершить его, а также отправлять ошибки вниз по течению, а не обрабатывать сигнатуру метода. Также вы можете изменить BackPressureStrategy в соответствии с вашими вариантами использования, например, DROP, LATEST и т. д..

Поскольку вы новичок в RxJava, анонимный класс будет:

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(new FlowableOnSubscribe<Item>() {
            @Override public void subscribe(FlowableEmitter<Item> emitter) {
                try {
                    while (...){
                        emitter.onNext(new Item());
                    }
                    emitter.onComplete();
                } catch (IOException e) {
                    emitter.onError(e);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }

Другие вопросы по теме