Как объединить вызовы RxJava Single и Completable Retrofit в приложении для Android

В моем текущем приложении для Android используются Retrofit и RxJava для организации сетевых вызовов.

Я смоделировал свои HTTP-запросы GET как Single<Response<String>> и POST как Completable.

Требуемая последовательность вызовов следующая:

Последовательно вызовите GET (1), GET (2), GET (3)

Параллельный вызов POST (1), POST (2)

Когда и POST (1), и POST (2) завершились успешно, вызовите GET (4).

У меня есть частичное решение. Я закодировал звонки для первых трех GET (ов) за которыми следуют вызовы POST

Мой код выглядит примерно так: -

Single.concat(getRequests())
                .subscribeOn(Schedulers.single())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        manageExecutions(combineExecutions());
                    }
                })
                .subscribe();

    /**
     * @return
     */
    private static Iterable<Single<Response<String>>> getRequests() {
        final API_CALL_GET[] apiCalls = API_CALL_GET.values();
        final List<Single<Response<String>>> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_GET apiCall : apiCalls) {
            requests.add(apiCall.request());
        }

        return requests;
    }

public enum API_CALL_GET {

    GET_ONE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getOne(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataOne)
                .doOnError(error -> ever(error));
        }
    }, GET_TWO {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getTwo(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataTwo)
                .doOnError(error -> ever(error));
        }
    },
    GET_THREE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getThree(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataThree)
                .doOnError(error -> ever(error));
        }
    };

    public abstract Single<Response<String>> request();

}


    private static Action manageExecutions(final List<Completable> completables) {

        return new Action() {
            @Override
            public void run() throws Exception {
                Completable
                .concat(completables)
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        accumulateAmounts();
                    }
                })
                .subscribe();
            }
        };
    }


    /**
     * @return
     */
    private static List<Completable> combineExecutions() {
        final API_CALL_POST[] apiCalls = API_CALL_POST.values();
        final List<Completable> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_POST apiCall : apiCalls) {
            requests.addAll(apiCall.requests());
        }

        return Lists.newArrayList(Iterables.unmodifiableIterable(requests));
    }

public enum API_CALL_POST {

    POST_ONE {
        @Override
        public List<Completable> requests() {
            return NetworkController.postRecommenderExecutions();
        }
    },
    POST_TWO {
        @Override
        public List<Completable> requests() {
            return NetworkController.postSavedSearcheExecutions();
        }
    };

    public abstract List<Completable> requests();

}


    public static List<Completable> postONE() {
        final List<Completable> completables = new ArrayList<>();

        final List<OneDO> oneDOS = fetchOnes();

        for (final OneDO oneDO : oneDOS) {
            completables.add(RETRO_SERVICE.runCompletableOnes(authToken, oneDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }




    public static List<Completable> postTWO() {
        final List<Completable> completables = new ArrayList<>();

        final List<TwoDO> twoDOS = fetchTwos();

        for (final TwoDO twoDO : twoDOS) {
            completables.add(RETRO_SERVICE.runCompletableTwos(authToken, twoDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }

У меня проблемы с правильным соединением моих звонков

например Я думал, что смогу разработать решение, похожее на этот псевдокод.

Single.concat (GET_1 ... GET_N) .onComplete (POST_1 ... POST_N) .onComplete (GET_LAST)

однако мое текущее частичное решение вызывает только первую группу GET (ов), за которой следуют POST (ы), а вызовы GET и POST не связаны

Я не вижу, как создать цепочку вызовов, поддерживающую мой вариант использования.

Можно ли объединить Single -> Completable -> Single в цепной вызов?

ОБНОВИТЬ

Основываясь на ответе Даниила, я пришел к этому решению: -

 Single.concat(getRequests())
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "accept[0000]: ", throwable))
                .ignoreElements()
                .andThen(Completable.merge(combineExecutions()))
                .doOnError(throwable -> Log.e(TAG, "accept: ", throwable))
                .doOnComplete(() -> Controller.accumulateTotals())
                .subscribe();

Почему вы не можете использовать операторы, упомянутые в ответе ниже. Если вы хотите связать наблюдаемые объекты и использовать результат каждого из следующих наблюдаемых, существует множество операторов. Например: map (result => используйте его в другой функции. Это то, что вы ищете?

Mr.O 12.08.2018 21:49

Я не могу ничего попробовать, пока не вернусь к работе в конце месяца. Также у меня нет требований использовать результаты каких-либо одиночных или завершенных таблиц, все, что мне нужно знать, это то, что каждый из них завершил Хорошо

Hector 12.08.2018 21:58
9
2
5 386
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Объединение различных типов в цепочку может происходить либо путем их преобразования в общий реактивный тип (например, Observable) и конкатенации, либо с использованием одного из подходы к продолжению через flatMapX и andThen:

 someSingle
 .flatMapCompletable(result1 -> {
     sideEffect(result1);
     return someCompletable;
 })
 .andThen(Single.defer(() -> {
     sideEffectAfterCompletable();
     return someOtherSingle;
 }))
 ...

Я искал решение, которое могло бы принимать произвольное количество Single (s) и Completable (s); не будет ли полагаться на «andThen», означать, что мне придется сопоставить количество одиночных и завершаемых элементов с одинаковым количеством вхождений «andThen»?

Hector 12.08.2018 11:59

У одиночек есть ценности, с которыми вы, вероятно, что-то сделаете. Их смешивание требует дополнительной работы, особенно если они разных типов. Добавление Completables в микс также требует решения о том, что делать с их побочными эффектами, если таковые имеются.

akarnokd 12.08.2018 13:19

В этом случае все мои одиночные игры возвращают полезную нагрузку Json, которую я «создаю» в моей локальной базе данных Realm. Все мои дополнения публикуют сообщение и не получают ответа, поэтому на высоком уровне мои многократные одиночные игры и дополнения идентичны по-своему.

Hector 12.08.2018 15:00
Ответ принят как подходящий

В котлине это выглядело бы примерно так:

fun generateGetRequests(): List<Single<Response<String>>> {
    return listOf(retrofit.firstGet(), retrofit.secondGet(), ... ,retrofit.lastGet())
}

fun generatePostRequests(): List<Completable> {
    return listOf(retrofit.firstPost(), ..., retrofit.lastPost())
}

fun doSomethingWithResponses(responses: Array<Any>) {
    // Do Something, like save to db
}

fun runRequests() {
    Single.zip(generateGetRequests(), { responses ->
        doSomethingWithResponses(responses)
    }).ignoreElements()
        .andThen(Completable.merge(generatePostRequests()))
        .subscribeOn(Schedulers.io())
        .subscribe()
}

Это было близко к тому, что мне требовалось. Мне пришлось заменить Single.zip () на Single.concat (), чтобы все мои GET (-ы) выполнялись по желанию. Когда я использовал zip (), любой запрос GET, который был последним в списке, никогда не выполнялся. Мне также пришлось изменить .toCompletable () на .ignoreElements (), поскольку toCompletable устарел.

Hector 15.08.2018 09:57

Другие вопросы по теме