Параллелизм в RxJava

У меня есть некоторые сомнения в том, как правильно использовать параллельные потоки в RxJava с приложением для Android (Kotlin). У меня есть приложение, которое читает некоторые RSS-каналы. Пользовательский запрос на обновление всех каналов с помощью этого метода:

override fun updateAllFeeds(): Completable {
    return menu()
            .map {
                val feeds = mutableListOf<String>()
                it.forEach { menuSection ->
                    menuSection.options.forEach { menuOption ->
                        menuOption.feed?.forEach { feed ->
                            if (!feeds.contains(feed)) feeds.add(feed)
                        }
                    }
                }
                feeds
            }
            .flatMapObservable {
                // Get each item from List
                Observable.fromIterable(it)
            }
            .flatMapCompletable { feedUrl ->
                // Update each feed
                rssDataStore.updateFeed(feedUrl)
            }
}

Этот Completable используется здесь позже:

val task = rssRepository.updateAllFeeds().subscribeOn(Schedulers.from(threadExecutor))
                .observeOn(postExecutionThread.scheduler)

Где threadExecutor - это ThreadPoolExecutor с максимальным размером пула = 5.

У меня была идея, что часть rssDataStore.updateFeed (feedUrl) будет выполняться параллельно из-за ThreadPoolExecutor, используемого в subscribeOn, но в Logcat я вижу, что он запускается только по одному, последовательно, а onComplete наблюдателя запускается, когда последний канал обновлен. Есть ли способ указать, что rssDataStore.updateFeed (feedUrl) должен выполняться параллельно, но получать onComplete только после завершения последнего потока?

Рекомендуемое чтение. Параллелизм требует, чтобы потоки индивидуально запускались в Планировщике; updateFeed(feedUrl).subscribeOn(yourScheduler).
akarnokd 09.04.2018 00:45

Спасибо @akarnokd, это было то, что мне было нужно.

allo86 09.04.2018 01:41
0
2
64
0

Другие вопросы по теме