В моем текущем приложении для Android используются Retrofit
и RxJava
для организации сетевых вызовов.
Я смоделировал свои HTTP-запросы GET как Single<Response<String>>
и POST как Completable
.
Требуемая последовательность вызовов следующая:
Последовательно вызовите GET (1), GET (2), GET (3)
Параллельный вызов POST (1), POST (2)
Когда и POST (1), и POST (2) завершились успешно, вызовите GET (4).
У меня есть частичное решение. Я закодировал звонки для первых трех GET (ов) за которыми следуют вызовы POST
Мой код выглядит примерно так: -
Single.concat(getRequests())
.subscribeOn(Schedulers.single())
.doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
.doFinally(new Action() {
@Override
public void run() throws Exception {
manageExecutions(combineExecutions());
}
})
.subscribe();
/**
* @return
*/
private static Iterable<Single<Response<String>>> getRequests() {
final API_CALL_GET[] apiCalls = API_CALL_GET.values();
final List<Single<Response<String>>> requests = new ArrayList<>(apiCalls.length);
for (final API_CALL_GET apiCall : apiCalls) {
requests.add(apiCall.request());
}
return requests;
}
public enum API_CALL_GET {
GET_ONE {
@Override
public Single<Response<String>> request() {
return RETRO_SERVICE
.getOne(authToken, new HashMap<>())
.doAfterSuccess(this::persistDataOne)
.doOnError(error -> ever(error));
}
}, GET_TWO {
@Override
public Single<Response<String>> request() {
return RETRO_SERVICE
.getTwo(authToken, new HashMap<>())
.doAfterSuccess(this::persistDataTwo)
.doOnError(error -> ever(error));
}
},
GET_THREE {
@Override
public Single<Response<String>> request() {
return RETRO_SERVICE
.getThree(authToken, new HashMap<>())
.doAfterSuccess(this::persistDataThree)
.doOnError(error -> ever(error));
}
};
public abstract Single<Response<String>> request();
}
private static Action manageExecutions(final List<Completable> completables) {
return new Action() {
@Override
public void run() throws Exception {
Completable
.concat(completables)
.subscribeOn(Schedulers.io())
.doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
accumulateAmounts();
}
})
.subscribe();
}
};
}
/**
* @return
*/
private static List<Completable> combineExecutions() {
final API_CALL_POST[] apiCalls = API_CALL_POST.values();
final List<Completable> requests = new ArrayList<>(apiCalls.length);
for (final API_CALL_POST apiCall : apiCalls) {
requests.addAll(apiCall.requests());
}
return Lists.newArrayList(Iterables.unmodifiableIterable(requests));
}
public enum API_CALL_POST {
POST_ONE {
@Override
public List<Completable> requests() {
return NetworkController.postRecommenderExecutions();
}
},
POST_TWO {
@Override
public List<Completable> requests() {
return NetworkController.postSavedSearcheExecutions();
}
};
public abstract List<Completable> requests();
}
public static List<Completable> postONE() {
final List<Completable> completables = new ArrayList<>();
final List<OneDO> oneDOS = fetchOnes();
for (final OneDO oneDO : oneDOS) {
completables.add(RETRO_SERVICE.runCompletableOnes(authToken, oneDO.getId())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(final Throwable throwable) throws Exception {
Log.e(TAG, "accept: ", throwable);
}
}));
}
return completables;
}
public static List<Completable> postTWO() {
final List<Completable> completables = new ArrayList<>();
final List<TwoDO> twoDOS = fetchTwos();
for (final TwoDO twoDO : twoDOS) {
completables.add(RETRO_SERVICE.runCompletableTwos(authToken, twoDO.getId())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(final Throwable throwable) throws Exception {
Log.e(TAG, "accept: ", throwable);
}
}));
}
return completables;
}
У меня проблемы с правильным соединением моих звонков
например Я думал, что смогу разработать решение, похожее на этот псевдокод.
Single.concat (GET_1 ... GET_N) .onComplete (POST_1 ... POST_N) .onComplete (GET_LAST)
однако мое текущее частичное решение вызывает только первую группу GET (ов), за которой следуют POST (ы), а вызовы GET и POST не связаны
Я не вижу, как создать цепочку вызовов, поддерживающую мой вариант использования.
Можно ли объединить Single
-> Completable
-> Single
в цепной вызов?
ОБНОВИТЬ
Основываясь на ответе Даниила, я пришел к этому решению: -
Single.concat(getRequests())
.subscribeOn(Schedulers.io())
.doOnError(throwable -> Log.e(TAG, "accept[0000]: ", throwable))
.ignoreElements()
.andThen(Completable.merge(combineExecutions()))
.doOnError(throwable -> Log.e(TAG, "accept: ", throwable))
.doOnComplete(() -> Controller.accumulateTotals())
.subscribe();
Я не могу ничего попробовать, пока не вернусь к работе в конце месяца. Также у меня нет требований использовать результаты каких-либо одиночных или завершенных таблиц, все, что мне нужно знать, это то, что каждый из них завершил Хорошо
Объединение различных типов в цепочку может происходить либо путем их преобразования в общий реактивный тип (например, Observable
) и конкатенации, либо с использованием одного из подходы к продолжению через flatMapX
и andThen
:
someSingle
.flatMapCompletable(result1 -> {
sideEffect(result1);
return someCompletable;
})
.andThen(Single.defer(() -> {
sideEffectAfterCompletable();
return someOtherSingle;
}))
...
Я искал решение, которое могло бы принимать произвольное количество Single (s) и Completable (s); не будет ли полагаться на «andThen», означать, что мне придется сопоставить количество одиночных и завершаемых элементов с одинаковым количеством вхождений «andThen»?
У одиночек есть ценности, с которыми вы, вероятно, что-то сделаете. Их смешивание требует дополнительной работы, особенно если они разных типов. Добавление Completables в микс также требует решения о том, что делать с их побочными эффектами, если таковые имеются.
В этом случае все мои одиночные игры возвращают полезную нагрузку Json, которую я «создаю» в моей локальной базе данных Realm. Все мои дополнения публикуют сообщение и не получают ответа, поэтому на высоком уровне мои многократные одиночные игры и дополнения идентичны по-своему.
В котлине это выглядело бы примерно так:
fun generateGetRequests(): List<Single<Response<String>>> {
return listOf(retrofit.firstGet(), retrofit.secondGet(), ... ,retrofit.lastGet())
}
fun generatePostRequests(): List<Completable> {
return listOf(retrofit.firstPost(), ..., retrofit.lastPost())
}
fun doSomethingWithResponses(responses: Array<Any>) {
// Do Something, like save to db
}
fun runRequests() {
Single.zip(generateGetRequests(), { responses ->
doSomethingWithResponses(responses)
}).ignoreElements()
.andThen(Completable.merge(generatePostRequests()))
.subscribeOn(Schedulers.io())
.subscribe()
}
Это было близко к тому, что мне требовалось. Мне пришлось заменить Single.zip () на Single.concat (), чтобы все мои GET (-ы) выполнялись по желанию. Когда я использовал zip (), любой запрос GET, который был последним в списке, никогда не выполнялся. Мне также пришлось изменить .toCompletable () на .ignoreElements (), поскольку toCompletable устарел.
Почему вы не можете использовать операторы, упомянутые в ответе ниже. Если вы хотите связать наблюдаемые объекты и использовать результат каждого из следующих наблюдаемых, существует множество операторов. Например: map (result => используйте его в другой функции. Это то, что вы ищете?