Буферизация RxJava - игнорирование нулевых элементов

Вот код, который у меня есть для буферизации и преобразования входящих событий:

public Publisher<Collection<EventTO>> logs(String eventId) {
    ConnectableObservable<Event> connectableObservable = eventsObservable
        .share().publish();
    connectableObservable.connect();

    connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
        .filter(event -> event.getId().equals(eventId))
        .buffer(1, TimeUnit.SECONDS, 50)
        .map(eventsMapper::mapCollection);
}

Проблема в том, что Flowable каждую секунду возвращает пустой список, хотя на eventsObservable не публикуется никаких событий.

Есть ли способ удерживать .buffer, пока не будет хотя бы один объект?

Примечание: Похоже, есть способ сделать это на C# (описан здесь: https://stackoverflow.com/a/30090185/668148). Но как это сделать на Java?

вы можете использовать .distinctUntilChanged после buffer( ... ) или .filter(collection -> !collection.isEmpty)?

Mark 20.01.2019 21:09
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
1
121
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Как предложил Марк Кин, .distinctUntilChanged делает свое дело.

Таким образом, следующий код вытолкнет список событий, если после буферизации осталось 1+ элементов:

connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
    .filter(event -> event.getId().equals(eventId))
    .buffer(1, TimeUnit.SECONDS, 50)
    .distinctUntilChanged()             // <<<======  
    .map(eventsMapper::mapCollection);

Другие вопросы по теме