Несколько слушателей на RxJava2 BehaviorSubject с противоречивыми результатами

У меня есть BehaviorSubject, у которого есть три слушателя, на которые подписаны до каких-либо выбросов. Я .onNext() две вещи: за А следует Б.

Два слушателя должным образом получают A, а затем B. Но третий слушатель получает B, A. Чем можно объяснить такое поведение? Это все в той же теме.

Вот пример кода (на Котлине), который воспроизводит результаты. Дайте мне знать, если вам нужна версия Java:

    @Test
    fun `rxjava test`() {
        val eventHistory1 = ArrayList<String>()
        val eventHistory2 = ArrayList<String>()
        val eventHistory3 = ArrayList<String>()

        val behaviorSubject = BehaviorSubject.create<String>()

        behaviorSubject.subscribe {
            eventHistory1.add(it)
        }

        behaviorSubject.subscribe {
            eventHistory2.add(it)
            if (it == "A") behaviorSubject.onNext("B")
        }

        behaviorSubject.subscribe {
            eventHistory3.add(it)
        }

        behaviorSubject.onNext("A")

        println(eventHistory1)
        println(eventHistory2)
        println(eventHistory3)

        assert(eventHistory1 == eventHistory2)
        assert(eventHistory2 == eventHistory3)
    }


А вот и результат теста:

[A, B]
[A, B]
[B, A]

Невозможно сказать, не видя часть вашего кода. Пожалуйста, опубликуйте фрагмент кода, демонстрирующий вашу настройку и то, как вы определяете получение A или B.

akarnokd 18.12.2020 23:25

Сделанный! Я должен был упомянуть ранее, но я также вызываю .onNext() в одном из блоков подписки слушателя. Что-то мне подсказывает, что это основная причина проблемы, но я не понимаю почему. Технически все три слушателя подписаны до этого .onNext(), но один из слушателей получает элементы в обратном порядке — и так получилось, что это слушатель, подписавшийся последним.

Grant Park 19.12.2020 03:49

Еще одна интересная вещь: если я задержу .onNext() в вышеупомянутом блоке подписки, я могу получить согласованные результаты для всех трех слушателей.

Grant Park 19.12.2020 04:07
Почему в Python есть оператор &quot;pass&quot;?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Коллекции в Laravel более простым способом
Коллекции в Laravel более простым способом
Привет, читатели, сегодня мы узнаем о коллекциях. В Laravel коллекции - это способ манипулировать массивами и играть с массивами данных. Благодаря...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
Массив зависимостей в React
Массив зависимостей в React
Все о массиве Dependency и его связи с useEffect.
0
3
111
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Субъекты не являются повторно входящими, поэтому вызов onNext для того же субъекта, который в настоящее время обслуживает onNexts, является неопределенным поведением. javadoc предупреждает об этом случае:

Вызов onNext(Object), onError(Throwable) и onComplete() должен быть сериализован (вызывается из одного потока или вызывается неперекрывающимся образом из разных потоков через внешние средства сериализации). Метод Subject.toSerialized(), доступный для всех субъектов, обеспечивает такую ​​сериализацию, а также защищает от повторного входа (т. е. когда нижестоящий наблюдатель, использующий этот субъект, также хочет рекурсивно вызвать onNext(Object) для этого субъекта).

В вашем конкретном случае сигнализация «B» происходит сначала для третьего наблюдателя, когда он собирался подать ему сигнал «A», следовательно, поменявшийся порядок.

Используйте toSerialized на тему, чтобы этого не произошло.

val behaviorSubject = BehaviorSubject.create<String>().toSerialized()

Другие вопросы по теме