Вот код, который у меня есть для буферизации и преобразования входящих событий:
public Publisher<Collection<EventTO>> logs(String eventId) {
ConnectableObservable<Event> connectableObservable = eventsObservable
.share().publish();
connectableObservable.connect();
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.map(eventsMapper::mapCollection);
}
Проблема в том, что Flowable каждую секунду возвращает пустой список, хотя на eventsObservable не публикуется никаких событий.
Есть ли способ удерживать .buffer, пока не будет хотя бы один объект?
Примечание: Похоже, есть способ сделать это на C# (описан здесь: https://stackoverflow.com/a/30090185/668148). Но как это сделать на Java?




Как предложил Марк Кин, .distinctUntilChanged делает свое дело.
Таким образом, следующий код вытолкнет список событий, если после буферизации осталось 1+ элементов:
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.distinctUntilChanged() // <<<======
.map(eventsMapper::mapCollection);
вы можете использовать
.distinctUntilChangedпослеbuffer( ... )или.filter(collection -> !collection.isEmpty)?