Я пытаюсь преобразовать Observable с помощью concatMap, так как порядок важен для моего случая.
@Test
fun load_data() {
val sub = TestSubscriber<Long>()
var s = BehaviorSubject.create<Long>()
s.concatMap {
Observable.timer(it, TimeUnit.MILLISECONDS)
}
.take(4)
.subscribe(sub)
s.onNext(5)
s.onNext(6)
s.onNext(7)
s.onNext(8) //rx.exceptions.MissingBackpressureException
sub.awaitTerminalEvent(500, TimeUnit.MILLISECONDS)
sub.assertNoErrors()
}
Я изменил загрузку реальных данных на Observable.timer(), чтобы упростить пример и облегчить воспроизведение.
Я использую в приложении BehaviorSubject для связывания действий пользовательского интерфейса с rx
Из документация, особенно из мраморной диаграммы, я ожидаю, что он будет хранить элементы в очереди и преобразовывать их один за другим.
Однако похоже, что concatMap имеет очередь с размером только 2 элемента. Добавление дополнительных элементов вызывает MissingBackpressureException
Итак, у меня есть следующие вопросы:
concatMap имеет размер очереди 2 вместо RxRingBuffer.SIZE, как
другие операторы есть?onBackpressure* перед
вызов concatMap для предотвращения MissingBackpressureException
исключение?Прежде чем я отвечу на вопросы, подумайте о переходе на RxJava 2, где это не проблема с Observable.
Why concatMap has queue size 2 instead of RxRingBuffer.SIZE as other operators has?
Оператор запускает по одному Observable за раз, и не было причин для предварительной загрузки более чем 1 заранее.
Should I as a rule add any of onBackpressure* operators before calling concatMap to prevent from MissingBackpressureException exception?
Да.
Да, с rxJava2 не должно быть MissingBackpressureException, однако миграция займет много времени из-за большой базы кода.