RxJava1 concatMap вызывает исключение MissingBackpressureException

Я пытаюсь преобразовать Observable с помощью concatMap, так как порядок важен для моего случая.

@Test
fun load_data() {
    val sub = TestSubscriber<Long>()

    var s = BehaviorSubject.create<Long>()

    s.concatMap {
        Observable.timer(it, TimeUnit.MILLISECONDS)
    }
    .take(4)
    .subscribe(sub)

    s.onNext(5)
    s.onNext(6)
    s.onNext(7)
    s.onNext(8) //rx.exceptions.MissingBackpressureException

    sub.awaitTerminalEvent(500, TimeUnit.MILLISECONDS)
    sub.assertNoErrors()
}

Я изменил загрузку реальных данных на Observable.timer(), чтобы упростить пример и облегчить воспроизведение.

Я использую в приложении BehaviorSubject для связывания действий пользовательского интерфейса с rx

Из документация, особенно из мраморной диаграммы, я ожидаю, что он будет хранить элементы в очереди и преобразовывать их один за другим.

Однако похоже, что concatMap имеет очередь с размером только 2 элемента. Добавление дополнительных элементов вызывает MissingBackpressureException

Итак, у меня есть следующие вопросы:

  1. Почему concatMap имеет размер очереди 2 вместо RxRingBuffer.SIZE, как другие операторы есть?
  2. Стоит ли мне, как правило, добавлять какие-либо операторы onBackpressure* перед вызов concatMap для предотвращения MissingBackpressureException исключение?
0
0
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Прежде чем я отвечу на вопросы, подумайте о переходе на RxJava 2, где это не проблема с Observable.

Why concatMap has queue size 2 instead of RxRingBuffer.SIZE as other operators has?

Оператор запускает по одному Observable за раз, и не было причин для предварительной загрузки более чем 1 заранее.

Should I as a rule add any of onBackpressure* operators before calling concatMap to prevent from MissingBackpressureException exception?

Да.

Да, с rxJava2 не должно быть MissingBackpressureException, однако миграция займет много времени из-за большой базы кода.

Rostyslav Roshak 11.05.2018 13:20

Другие вопросы по теме