У меня есть некоторые сомнения в том, как правильно использовать параллельные потоки в RxJava с приложением для Android (Kotlin). У меня есть приложение, которое читает некоторые RSS-каналы. Пользовательский запрос на обновление всех каналов с помощью этого метода:
override fun updateAllFeeds(): Completable {
return menu()
.map {
val feeds = mutableListOf<String>()
it.forEach { menuSection ->
menuSection.options.forEach { menuOption ->
menuOption.feed?.forEach { feed ->
if (!feeds.contains(feed)) feeds.add(feed)
}
}
}
feeds
}
.flatMapObservable {
// Get each item from List
Observable.fromIterable(it)
}
.flatMapCompletable { feedUrl ->
// Update each feed
rssDataStore.updateFeed(feedUrl)
}
}
Этот Completable используется здесь позже:
val task = rssRepository.updateAllFeeds().subscribeOn(Schedulers.from(threadExecutor))
.observeOn(postExecutionThread.scheduler)
Где threadExecutor - это ThreadPoolExecutor с максимальным размером пула = 5.
У меня была идея, что часть rssDataStore.updateFeed (feedUrl) будет выполняться параллельно из-за ThreadPoolExecutor, используемого в subscribeOn, но в Logcat я вижу, что он запускается только по одному, последовательно, а onComplete наблюдателя запускается, когда последний канал обновлен. Есть ли способ указать, что rssDataStore.updateFeed (feedUrl) должен выполняться параллельно, но получать onComplete только после завершения последнего потока?
Спасибо @akarnokd, это было то, что мне было нужно.
updateFeed(feedUrl).subscribeOn(yourScheduler).