Я новичок в использовании java RX, и у меня возникла проблема, надеюсь, кто-нибудь может дать мне понять, что я делаю не так.
Проблема: Я отслеживаю много событий, они запускаются следующим образом:
Observable<Long> otherObservable = Observable.empty();
public void myMethod(){
Observable<Long> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS);
final Subscriber<Long> timeSubscriber = new Subscriber<Long>() {
@Override
public void onCompleted() {
// nothing really
}
@Override
public void onError(final Throwable throwable) {
// nothing really
}
@Override
public void onNext(final Long number) {
// Here i do something
}
};
return Observable.merge(timerObservable, otherObservable)
.first()
.subscribe(timeSubscriber);
}
Таким образом, в основном он запускает событие после VARIABLE_TIME.
Это прекрасно работает, но теперь я столкнулся с тем, что у меня слишком много мероприятий.
Итак, я подумал об использовании debounce и buffer.
Я пытаюсь сделать вот что:
Тем не менее создайте множество наблюдаемых, которые генерируют событие через N секунд.
Соберите информацию от каждого из них (длинную или, может быть, строку)
По истечении времени задержки (времени буферизации) отправить подписчику список со всей собранной информацией.
Пока что я сделал это:
Observable<List<Long>> otherObservable = Observable.empty();
otherObservable.debounce(10L, SECONDS).buffer(20L, SECONDS);
Observable<List<Long>> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS).buffer(1);
Subscriber<List<Long> > observerSuscriber = new Subscriber<List<Long>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(final Throwable throwable) {
}
@Override
public void onNext(final List<Long> ids ) {
// do something here
}
};
Observable.merge(otherObservable, observable1)
.first()
.subscribe(observerSuscriber);
Но так я все равно получаю сообщение сразу после отправки.
Мне интересно, есть ли способ сделать это? Любые идеи? Я использую java RX 1.2




After a delay time (buffer time) Send a list with all the collected info to the subscriber.
Вы сами ответили на свой вопрос! Используйте оператор buffer:
Flowable<String> stream = ...
Flowable<List<String>> lists = stream.buffer(5, TimeUnit.SECONDS);