RxJava2 объединяет наблюдаемые разных типов

Я немного новичок в RxJava. Я разбираюсь в этом, но есть один сценарий, который меня не совсем устраивает. Скажем, у меня есть пара Observables разных типов, которые предназначены для совместной работы. Например:

val list: Observable<List<MyClass>>
val indexIntoList: Observable<Int>

Итак, мне нужно наблюдать их как пару, чтобы наблюдатель отправлялся, когда либо list или indexIntoList получает обновление. Сейчас мое решение выглядит примерно так:

var lastListUsed: List<MyClass> = ArrayList()
var lastIndexUsed = -1
val syncObject = Object()

init {
    list.subscribe(object: Observer<List<MyClass>>{
        .
        .
        .
        override fun onNext(t: List<MyClass>)
        {
             FunctionOnMyClass(t, lastIndexUsed)
        }
    })

    indexIntoList.subscribe(object: Observer<Int>{
        .
        .
        .
        override fun onNext(t: Int)
        {
            FunctionOnMyClass(lastListUsed, t)
        }
    })
}

fun FunctionOnMyClass(list: List<MyClass>, index: Int)
{
    synchronized(syncObject)
    {
        lastListUsed = list
        lastIndexUsed = index
        .
        .
        .
    }
}

У меня возникла мысль сделать что-то вроде этого:

var lastMyClass = DefaultMyClass()

list.doOnNext{listVersion ->
    indexIntoMap.map{ index ->
          if (index in listVersion.indices)
              listVersion[index]
          else
              lastMyClass
     }.subscribe(object: Observer<MyClass>{
        .
        .
        .
        override fun onNext(t: MyClass)
        {
            lastMyClass = t
            .
            .
            .
        }
     })
}

Но если я правильно понимаю, чтобы использовать этот метод, мне пришлось бы размещать Disposable во внутреннем Observable каждый раз, когда обновляется внешний Observable. Я бы предпочел не делать этого. Есть ли предпочтительный способ справиться с этим сценарием? Должен ли я быть готов избавиться от Disposable при каждом обновлении внешнего Observable?

Эй, я не так хорошо знаком с Kotlin, и я тоже своего рода новичок в RxJava, но я использовал его раньше, чтобы связать несколько наблюдаемых, не уверен, что это то, что вы искали, но вы можете связать выходы наблюдаемых, вот что ты хочешь?

Mr.O 08.08.2018 17:45

Если вы хотите соединить их, вы можете использовать оператор zip, например это

Mr.O 08.08.2018 17:51
1
2
2 247
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Думаю, вы ищете оператора combineLastest:

when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function

Вот пример реализации:

val myListObservable = ...
val myIndexObservable = ...

disposable = Observable.combineLatest(myListObservable, myIndexObservable, BiFunction { list: List<YourType>, index: Int ->
    list to index
})

Затем вы можете работать напрямую с последним списком и последним индексом, создаваемым каждым наблюдаемым объектом как Pair<List<YourType>,Int>.


В случае, если вам нужно подождать, пока каждый наблюдаемый объект испускает элемент для соединения друг с другом, вы должны использовать zip

Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Как вы можете видеть на диаграмме, вторая наблюдаемая, наконец, испускает C и D. Но первому наблюдаемому требуется время, чтобы испустить 3, а затем 4. Результирующая пара будет ждать, пока другой наблюдаемый не испустит элемент для создания пары.

Реализация такая же, как и выше, с заменой combineLastest на zip.

Очень важно отметить, что ZIP будет генерировать события поочередно A, B, A, B, A, B, если в какой-то момент есть несколько событий из одного источника и нет событий из другого, ZIP будет ждать, пока другой источник не сгенерирует событие.

Andre 08.08.2018 18:01

Огромное спасибо! Это было очень полезно.

KairisCharm 08.08.2018 18:27

Еще один быстрый вопрос. Что мне делать, если я хочу выдать только одно из значений, отличное от того, что было в прошлый раз?

KairisCharm 08.08.2018 18:42

NVM, только что нашел. ОтдельныйUntilChanged (). Понятно.

KairisCharm 08.08.2018 18:56

если вы хотите прослушать самые последние события от обоих наблюдаемых, взгляните на оператор combineLatest: http://rxmarbles.com/#combineLatest

Он генерирует каждый раз, когда один из наблюдаемых выдает новое значение, НО он запускается, как только каждый из источников генерирует хотя бы одно событие.

val result = Observable.combineLatest<String, Int, String>(sourceObservable1, sourceObservable2) { s, integer -> s + ": " + integer }

Спасибо за ответ. Это было полезно, но я выбираю предыдущий, потому что это более подробный и полный ответ.

KairisCharm 08.08.2018 18:26

Другие вопросы по теме