CompletableTransformer не применяет подписку и наблюдение к восходящему потоку

Я создаю первое автономное приложение в качестве своего побочного проекта, используя rxKotlin, MVVM + Clean Architecture, и вчера я решил отказаться от шаблонной подписки и наблюдения, используя трансформаторы. Я быстро понял, что функция применения трансформаторов игнорируется.

Вот код моего базового варианта использования (интерактора):

abstract class CompletableUseCase(private val transformer: CompletableTransformer) {

    abstract fun createCompletable(data: Map<String, Any>? = null) : Completable

    fun completable(data: Map<String, Any>? = null) : Completable {
        return createCompletable(data).compose(transformer)
    }
}

А вот реализация конкретного интерактора:

class SaveRouteInteractor(
    transformer: CompletableTransformer,
    private val routeRepository: RouteRepository
) : CompletableUseCase(transformer) {

    companion object {
        private const val PARAM_ROUTE = "param_route"
    }

    fun saveRoute(route: Route) : Completable {
        val data = HashMap<String, Route>()
        data[PARAM_ROUTE] = route
        return completable(data)
    }

    override fun createCompletable(data: Map<String, Any>?): Completable {
        val routeEntity = data?.get(PARAM_ROUTE)

        routeEntity?.let {
            return routeRepository.saveRoute(routeEntity as Route)
        } ?: return Completable.error(IllegalArgumentException("Argument @route must be provided."))
    }
}

Мой пользовательский преобразователь, который передается конструктору SaveRouteInteractor:

class IOCompletableTransformer(private val mainThreadScheduler: Scheduler) : CompletableTransformer {

    override fun apply(upstream: Completable): CompletableSource {
        return upstream.subscribeOn(Schedulers.io()).observeOn(mainThreadScheduler)
    }
}

И реализация метода RouteRepository:

override fun saveRoute(route: Route): Completable {
        return localRouteSource.saveRoute(route)
            .flatMap { localID ->
                route.routeId = localID
                remoteRouteSource.saveRoute(route)
            }
            .flatMapCompletable { localRouteSource.updateRouteID(route.routeId, it) }
    }

Я использую Room в качестве своего локального источника, поэтому после вызова интерактора сохранения в моей ViewModel я получаю исключение IlligalStateException, сообщающее мне, что мне не разрешен доступ к базе данных в основном потоке.

Может быть, я что-то упускаю, но кажется, что функция преобразования игнорируется. Я отладил этот метод, и он применяет подписку и наблюдение к восходящему потоку.

Спасибо за помощь заранее, Шаг!

0
0
127
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Трудно сказать вам, в чем проблема, потому что код неполный.

Например здесь:

    return localRouteSource.saveRoute(route)
        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }
        .flatMapCompletable { localRouteSource.updateRouteID(route.routeId, it) }

Я предполагаю, что localRouteSource.saveRoute() использует интерактор, который вы нам показываете, но неясно, как реализованы remoteRouteSource.saveRoute() или localRouteSource.updateRouteID().

они также должны быть подписаны на поток ввода-вывода.

Как правило, вы должны переключать нить, когда ЗНАЕТЕ, что вам это нужно.

Другими словами, вы должны использовать subscribeOn() в местах, где вы знаете, что выполняете ввод-вывод как можно ближе к реальной работе. Вместо этого следует использовать ObserveOn, когда вы знаете, что вам нужно получить эти результаты в потоке пользовательского интерфейса и что вы можете получить их в каком-то другом потоке.

в вашем примере абсолютно нет необходимости продолжать использовать observeOn(MAIN_THREAD), единственный раз, когда вам это нужно (я полагаю), это когда вы хотите показать результат.

Еще пара вещей:

Этот код

override fun createCompletable(data: Map<String, Any>?): Completable {
    val routeEntity = data?.get(PARAM_ROUTE)

    routeEntity?.let {
        return routeRepository.saveRoute(routeEntity as Route)
    } ?: return Completable.error(IllegalArgumentException("Argument @route must be provided."))
}

он оценивается во время вызова метода, а не при подписке на завершаемый объект.

Другими словами, он разрывает контракт Rx и вычисляет data?.get(PARAM_ROUTE) при вызове метода. Если он неизменяемый, большой разницы нет, но если он может изменить значение во время выполнения, его следует обернуть в Completable.defer { }

Наконец-то здесь

        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }

вы модифицируете что-то вне цепочки (route.routeId = localID), это называется побочным эффектом.

будьте осторожны с такими вещами, Rx построен таким образом, чтобы его было безопаснее использовать с неизменяемыми объектами.

Лично я не возражаю, если вы понимаете, что происходит и когда это может создать проблемы.

Другие вопросы по теме