У меня есть BehaviorSubject, у которого есть три слушателя, на которые подписаны до каких-либо выбросов. Я .onNext() две вещи: за А следует Б.
Два слушателя должным образом получают A, а затем B. Но третий слушатель получает B, A. Чем можно объяснить такое поведение? Это все в той же теме.
Вот пример кода (на Котлине), который воспроизводит результаты. Дайте мне знать, если вам нужна версия Java:
@Test
fun `rxjava test`() {
val eventHistory1 = ArrayList<String>()
val eventHistory2 = ArrayList<String>()
val eventHistory3 = ArrayList<String>()
val behaviorSubject = BehaviorSubject.create<String>()
behaviorSubject.subscribe {
eventHistory1.add(it)
}
behaviorSubject.subscribe {
eventHistory2.add(it)
if (it == "A") behaviorSubject.onNext("B")
}
behaviorSubject.subscribe {
eventHistory3.add(it)
}
behaviorSubject.onNext("A")
println(eventHistory1)
println(eventHistory2)
println(eventHistory3)
assert(eventHistory1 == eventHistory2)
assert(eventHistory2 == eventHistory3)
}
А вот и результат теста:
[A, B]
[A, B]
[B, A]
Сделанный! Я должен был упомянуть ранее, но я также вызываю .onNext() в одном из блоков подписки слушателя. Что-то мне подсказывает, что это основная причина проблемы, но я не понимаю почему. Технически все три слушателя подписаны до этого .onNext(), но один из слушателей получает элементы в обратном порядке — и так получилось, что это слушатель, подписавшийся последним.
Еще одна интересная вещь: если я задержу .onNext() в вышеупомянутом блоке подписки, я могу получить согласованные результаты для всех трех слушателей.
Субъекты не являются повторно входящими, поэтому вызов onNext для того же субъекта, который в настоящее время обслуживает onNexts, является неопределенным поведением. javadoc предупреждает об этом случае:
Вызов onNext(Object), onError(Throwable) и onComplete() должен быть сериализован (вызывается из одного потока или вызывается неперекрывающимся образом из разных потоков через внешние средства сериализации). Метод Subject.toSerialized(), доступный для всех субъектов, обеспечивает такую сериализацию, а также защищает от повторного входа (т. е. когда нижестоящий наблюдатель, использующий этот субъект, также хочет рекурсивно вызвать onNext(Object) для этого субъекта).
В вашем конкретном случае сигнализация «B» происходит сначала для третьего наблюдателя, когда он собирался подать ему сигнал «A», следовательно, поменявшийся порядок.
Используйте toSerialized на тему, чтобы этого не произошло.
val behaviorSubject = BehaviorSubject.create<String>().toSerialized()
Невозможно сказать, не видя часть вашего кода. Пожалуйста, опубликуйте фрагмент кода, демонстрирующий вашу настройку и то, как вы определяете получение A или B.