Я немного новичок в RxJava. Я разбираюсь в этом, но есть один сценарий, который меня не совсем устраивает. Скажем, у меня есть пара Observables разных типов, которые предназначены для совместной работы. Например:
val list: Observable<List<MyClass>>
val indexIntoList: Observable<Int>
Итак, мне нужно наблюдать их как пару, чтобы наблюдатель отправлялся, когда либо list или indexIntoList получает обновление. Сейчас мое решение выглядит примерно так:
var lastListUsed: List<MyClass> = ArrayList()
var lastIndexUsed = -1
val syncObject = Object()
init {
list.subscribe(object: Observer<List<MyClass>>{
.
.
.
override fun onNext(t: List<MyClass>)
{
FunctionOnMyClass(t, lastIndexUsed)
}
})
indexIntoList.subscribe(object: Observer<Int>{
.
.
.
override fun onNext(t: Int)
{
FunctionOnMyClass(lastListUsed, t)
}
})
}
fun FunctionOnMyClass(list: List<MyClass>, index: Int)
{
synchronized(syncObject)
{
lastListUsed = list
lastIndexUsed = index
.
.
.
}
}
У меня возникла мысль сделать что-то вроде этого:
var lastMyClass = DefaultMyClass()
list.doOnNext{listVersion ->
indexIntoMap.map{ index ->
if (index in listVersion.indices)
listVersion[index]
else
lastMyClass
}.subscribe(object: Observer<MyClass>{
.
.
.
override fun onNext(t: MyClass)
{
lastMyClass = t
.
.
.
}
})
}
Но если я правильно понимаю, чтобы использовать этот метод, мне пришлось бы размещать Disposable во внутреннем Observable каждый раз, когда обновляется внешний Observable. Я бы предпочел не делать этого. Есть ли предпочтительный способ справиться с этим сценарием? Должен ли я быть готов избавиться от Disposable при каждом обновлении внешнего Observable?
Если вы хотите соединить их, вы можете использовать оператор zip, например это
Думаю, вы ищете оператора combineLastest:
when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
Вот пример реализации:
val myListObservable = ...
val myIndexObservable = ...
disposable = Observable.combineLatest(myListObservable, myIndexObservable, BiFunction { list: List<YourType>, index: Int ->
list to index
})
Затем вы можете работать напрямую с последним списком и последним индексом, создаваемым каждым наблюдаемым объектом как Pair<List<YourType>,Int>.
В случае, если вам нужно подождать, пока каждый наблюдаемый объект испускает элемент для соединения друг с другом, вы должны использовать zip
Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
Как вы можете видеть на диаграмме, вторая наблюдаемая, наконец, испускает C и D. Но первому наблюдаемому требуется время, чтобы испустить 3, а затем 4. Результирующая пара будет ждать, пока другой наблюдаемый не испустит элемент для создания пары.
Реализация такая же, как и выше, с заменой combineLastest на zip.
Очень важно отметить, что ZIP будет генерировать события поочередно A, B, A, B, A, B, если в какой-то момент есть несколько событий из одного источника и нет событий из другого, ZIP будет ждать, пока другой источник не сгенерирует событие.
Огромное спасибо! Это было очень полезно.
Еще один быстрый вопрос. Что мне делать, если я хочу выдать только одно из значений, отличное от того, что было в прошлый раз?
NVM, только что нашел. ОтдельныйUntilChanged (). Понятно.
если вы хотите прослушать самые последние события от обоих наблюдаемых, взгляните на оператор combineLatest: http://rxmarbles.com/#combineLatest
Он генерирует каждый раз, когда один из наблюдаемых выдает новое значение, НО он запускается, как только каждый из источников генерирует хотя бы одно событие.
val result = Observable.combineLatest<String, Int, String>(sourceObservable1, sourceObservable2) { s, integer -> s + ": " + integer }
Спасибо за ответ. Это было полезно, но я выбираю предыдущий, потому что это более подробный и полный ответ.
Эй, я не так хорошо знаком с Kotlin, и я тоже своего рода новичок в RxJava, но я использовал его раньше, чтобы связать несколько наблюдаемых, не уверен, что это то, что вы искали, но вы можете связать выходы наблюдаемых, вот что ты хочешь?