FlatMapCompletable не продолжает цепочку Rx, но плоская карта с комплементарием с «andThen (Observable.just (true)» работает?

Я пытаюсь связать завершение с моей цепочкой Rx, и когда я это делаю, цепочка никогда не заканчивается в onError или onComplete.

Когда я пошагово выполняю код, мой завершаемый код выполняется. Я даже могу добавить ведение журнала и посмотреть, как он входит в собственный doOnComplete().

Ниже будет записано «Я завершено», но не будет отображаться ошибка или полный обратный вызов.

 profileRepo.getLocalProfileIfAvailableElseRemote()
                .flatMapCompletable { profile ->
                    userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
                            .doOnComplete {
                                Log.i("I COMPLETED", "I COMPLETED")
                            }
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(
                        onError = { error ->
                            //do error
                        },
                        onComplete = {
                           //do success
                        }
                ).addTo(disposable)

если я вместо этого использую flatMap и использую andThen для возврата логического наблюдаемого, это будет работать

 profileRepo.getLocalProfileIfAvailableElseRemote()
                .flatMap { profile ->
                    userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
                            .doOnComplete {
                                Log.i("I COMPLETED", "I COMPLETED")
                            }.andThen(Observable.just(true))
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(
                        onError = { error ->
                         //do error
                        },
                        onNext = {
                           //do next
                        }
                ).addTo(disposable)

Я пытался добавить «andThen» в версию flatMapCompletable и вызвать Completable.complete(), но это тоже не работает?

Я не могу понять, почему мой completable завершается, но отказывается работать с flatMapCompletable?

Обновлено: это обновление моей полной попытки, которая не работает

Примечание userRoutingService.disableRule(accountUid, ruleId) — модифицированный интерфейс.

 profileRepo.getLocalProfileIfAvailableElseRemote()
                .flatMapCompletable { profile ->
                    userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
                            .andThen(Completable.complete())
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(
                        onError = { error ->
                          Log.i("TAG", "ERROR")
                        },
                        onComplete = {
                            Log.i("TAG", "COMPLETE")
                        }
                ).addTo(disposable)

 override fun disableRule(accountUid: String, ruleId: String): Completable {
        return activeStateToggler(userRoutingSourceApi.disableRule(accountUid, ruleId),
                ruleId,
                false)
    }



override fun disableRule(accountUid: String, ruleId: String): Completable {
        return userRoutingService.disableRule(accountUid, ruleId)
                .doOnError { error ->
                    authenticationValidator.handleAuthenticationExceptions(error)
                }
    }

    private fun activeStateToggler(completable: Completable,
                                   ruleId: String,
                                   stateOnSuccess: Boolean
    ): Completable {
        return completable
                .doOnSubscribe {
                    stateTogglingInProgress.add(ruleId)
                }
                .doOnComplete {
                    stateTogglingInProgress.remove(ruleId)
                    getLocalUserRule(ruleId)?.active = stateOnSuccess
                    stateTogglingInProgressPublishSubject.onNext(UserRoutingStateToggleSubjectType.Success)
                }
                .doOnError {
                    stateTogglingInProgress.remove(ruleId)
                    stateTogglingInProgressPublishSubject.onNext(UserRoutingStateToggleSubjectType.Error(
                            it))
                }
    }
4
0
3 251
2

Ответы 2

При использовании flatMapCompletable нужно вернуть Completable.complete() самостоятельно.

редактировать:

 profileRepo.getLocalProfileIfAvailableElseRemote()
     .flatMap { profile ->
         userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
             .doOnComplete { Log.i("I COMPLETED", "I COMPLETED") } }
     .flatMapCompletable { () -> { Completable.complete() } }
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribeBy(
         onError = { error ->
             //do error
         },
         onNext = {
             //do next
         }
    ).addTo(disposable)

изменить 2: так как disposableRule является Completable

 profileRepo.getLocalProfileIfAvailableElseRemote()
     .flatMapCompletable { profile ->
         userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
             .doOnComplete { Log.i("I COMPLETED", "I COMPLETED") }
             .andThen(Completable.complete().doOnCompleted { Log.i("comp2", "comp2")) }
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribeBy(
         onError = { error ->
             //do error
         },
         onNext = {
             //do next
         }
    ).addTo(disposable)

редактировать 3: рабочий образец

Observable.just(1)
    .flatMapCompletable { profile ->
        Completable.complete()
            .doOnComplete { Log.i("I COMPLETED", "I COMPLETED") }
            .andThen(Completable.complete().doOnComplete { Log.i("I COMPLETED", "I COMPLETED 2") })}
    .subscribeBy(
        onError = { error ->
        },
        onComplete = {
            Log.d("I COMPLETED", "I COMPLETED 3")
        })

Это не сработало? Это не завершено. .flatMapCompletable { профиль -> userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id).andThen(Completable.complete()) }

Ben987654 18.03.2019 18:39

Редактирование не компилируется. Это дает ошибку, потому что правило отключения является завершаемым, но ожидает наблюдаемое.

Ben987654 18.03.2019 20:24

Edit2 вернулся к той же проблеме, с которой я столкнулся изначально. Все, что находится после вызова disableRule и его .doOnComplete, не выполняется, включая andThen. Подписка onError или onComplete никогда не достигается =(

Ben987654 19.03.2019 00:26

опубликуйте весь код. Это работает для меня. Убедитесь, что вы используете .andThen(), а не .andThen { }

mbmc 19.03.2019 00:39

То, что вы разместили без использования «настоящего» комплемента, работает, но не работает, когда возвращаемый результат правила отключения является комплементарным. На самом деле я не хочу никакой дополнительной логики в andThen, я просто пытаюсь понять, как заставить его читать его как завершенный. Я обновил свой вопрос с полной логикой правила отключения, и все это выполняется правильно.

Ben987654 19.03.2019 01:04

Вот что делает flatMapCompletable:

Maps each element of the upstream Observable into CompletableSources, subscribes to them and waits until the upstream and all CompletableSources complete.

При использовании flatMapCompletable возвращаемый Completable будет ожидать события терминала Observable восходящего потока (onComplete).

При использовании flatMapCompletable используйте его только в том случае, если вы уверены, что все в цепочке завершается.

В вашем случае это не работает, потому что ваш исходный Observable горячий и никогда не завершается.

Другие вопросы по теме