у меня два вызова API 1.) Получение профиля 2.) Загрузка каналов
Мне нужно загрузить кеш, если он есть, иначе перейдите в сеть.
если кеш присутствует, загрузите его, пока сеть также вызывается в фоновом режиме
с использованием CONCAT op.
например. concat(feedlocal, feednetwork)
и как только будет получен сетевой вызов, обновите представление корзины с помощью DiffUtils (android)
Аналогично: concat(profilelocal, profilenetwork)
ПРОБЛЕМА:
я использую zip (c1, c2) (извинения, если я неправильно использую) но проблема в том, что сетевой ответ никогда не отвечает через zip op.
Мне нужно сначала показать кеш и обновить пользовательский интерфейс при получении сетевого вызова и сделать его параллельным (Feed + Profile)
Обновлено:
private Observable<List<FeedProfileResponse>> getProfileObservable(){
//local
Observable<List<FeedProfileResponse>> local = Observable.fromCallable(() ->
dataManager.getCacheValueOf(DB_FEED_PROFILE_LIST) != null ?
new Gson().fromJson(dataManager.getCacheValueOf(DB_FEED_PROFILE_LIST)
, new TypeToken<List<FeedProfileResponse>>() {
}.getType()) :
Collections.<FeedProfileResponse>emptyList())
.subscribeOn(schedulerProvider.computation())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(Throwable::printStackTrace)
.onErrorResumeNext(Observable.empty());
//network
Observable<List<FeedProfileResponse>> network = apiHelper.fetchFeedProfile(dataManager.getCurrentUserId().intValue()
, mProfileCurrentPage)
.map(responses -> {
//Cache Updates
Observable.create(subscriber -> {
updateCacheDao(dataManager.getCacheObj(DB_FEED_PROFILE_LIST),
new Gson().toJson(responses), DB_FEED_PROFILE_LIST,
NetworkUtils.getCurrentTimeStamp());
subscriber.onComplete();
})
.subscribeOn(schedulerProvider.computation())
.subscribe();
return responses;
})
.subscribeOn(schedulerProvider.io())
.doOnError(Throwable::printStackTrace)
.onErrorResumeNext(Observable.empty());
CacheData cacheObj = dataManager.getCacheObj(DB_FEED_PROFILE_LIST);
return cacheObj != null ? !NetworkUtils.hasTimestampExpired(cacheObj.getUpdatedAt())
&& cacheObj.getUpdatedAt() > dataManager.getAppTimestamp() ?
local.observeOn(AndroidSchedulers.mainThread()) :
Observable.concat(local, network).observeOn(AndroidSchedulers.mainThread()) : network.observeOn(AndroidSchedulers.mainThread());
}
private Observable<List<FeedReviewResponse>> getReviewObservable() {
//local
Observable<List<FeedReviewResponse>> local = Observable.fromCallable(() ->
dataManager.getCacheValueOf(DB_FEED_REVIEW_LIST) != null ?
new Gson().fromJson(dataManager.getCacheValueOf(DB_FEED_REVIEW_LIST)
, new TypeToken<List<FeedReviewResponse>>() {
}.getType()) :
Collections.<FeedReviewResponse>emptyList())
.subscribeOn(schedulerProvider.computation())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(Throwable::printStackTrace)
.onErrorResumeNext(Observable.empty());
//network
Observable<List<FeedReviewResponse>> network = apiHelper.fetchFeedReviews(dataManager.getCurrentUserId().intValue(),
AppConstants.FeedReviewConstants.KEY_API_PARAM_1, AppConstants.FeedReviewConstants.KEY_API_PARAM_2,
mCurrentPage)
.map(responses -> {
isFeedReviewLastPage = !(responses.size() == AppConstants.FeedReviewConstants.PAGE_SIZE);
//Cache Updates
Observable.create(subscriber -> {
updateCacheDao(dataManager.getCacheObj(DB_FEED_REVIEW_LIST),
new Gson().toJson(responses), DB_FEED_REVIEW_LIST,
NetworkUtils.getCurrentTimeStamp());
subscriber.onComplete();
})
.subscribeOn(schedulerProvider.computation())
.subscribe();
return responses;
})
.subscribeOn(schedulerProvider.io())
.doOnError(Throwable::printStackTrace)
.onErrorResumeNext(Observable.empty());
CacheData cacheObj = dataManager.getCacheObj(DB_FEED_REVIEW_LIST);
return cacheObj != null ? !NetworkUtils.hasTimestampExpired(cacheObj.getUpdatedAt())
&& cacheObj.getUpdatedAt() > dataManager.getAppTimestamp() ?
local.observeOn(AndroidSchedulers.mainThread()) :
Observable.concat(local, network).observeOn(AndroidSchedulers.mainThread()) : network.observeOn(AndroidSchedulers.mainThread());
}
private void zipProfileAndReview(Observable<List<FeedReviewResponse>> reviewObservable,
Observable<List<FeedProfileResponse>> profileObservable){
compositeDisposable.add(Observable.zip(reviewObservable, profileObservable,
ProfileAndReview::new)
.doOnNext(profileAndReview -> {
getMvpView().setRefreshing(false);
})
.subscribeOn(schedulerProvider.computation())
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(Observable.empty())
.subscribeOn(schedulerProvider.io())
.subscribe(profileAndReview -> {
getMvpView().setFeedReviewListData(profileAndReview.getFeedReviewResponses());
getMvpView().setFeedProfileListData(profileAndReview.getFeedProfileResponses());
isFeedReviewLastPage = !(profileAndReview.getFeedReviewResponses().size() == AppConstants.FeedReviewConstants.PAGE_SIZE);
isFeedProfileLastPage = !(profileAndReview.getFeedProfileResponses().size() == AppConstants.FeedProfileConstants.PAGE_SIZE);
}, Throwable::printStackTrace));
}
сетевой вызов получен, но поскольку ответ диска получен сравнительно быстро, поэтому ZIP op принимает ответ диска. и сетевой вызов остается как есть. DiffUtil работает хорошо, тестировал отдельно.
getProfileObservable() и getReviewObservable() включают в себя два тернарных оператора, включенных в оператор return, это сложно и вообще не читается. Вы уверены, что Observable.concat(local, network) вызывается во время ваших тестов?
Также несколько замечаний:
Вы должны использовать combineLatest вместо zip() в zipProfileAndReview. Чтобы все значения обрабатывались zip(), у вас должно быть одинаковое количество значений для каждого из его источников. Если одно полученное значение является недействительным кеш-памятью или сбой сетевого вызова, будет использоваться только первое значение другого источника. Это может быть даже источником вашей проблемы.
Вам лучше использовать doOnNext() вместо карты для части //Cache Updates, поскольку вы не преобразуете (отображаете) значение, а просто выполняете побочные эффекты.
Вам не нужно точно указывать планировщики на всем пути. Просто запишите один раз .subscribeOn(schedulerProvider.io()).observeOn(AndroidSchedulers.mainThread()) в zipProfileAndReview().
Приносим извинения за сложный код и благодарим вас за обратную связь. Я внес изменения в общие замечания. И я использовал Concat op. отдельно, и он работает нормально. когда я добавляю ZIP op. ZIP op, обработайте последнее из reviewObservable и profileObservable (либо я буду кешем, либо сетевыми данными). Я хочу сохранить zip живым, чтобы он также мог получать сетевые данные Observable.concat (локальные, сетевые).
я подумываю использовать Concat op. только и забудьте про ZIP op. Спасибо за поддержку.
Попробуйте заменить zip () на combLatest (), как было объяснено для начала. Затем, если проблема все еще существует, разделите оператор возврата getProfileObservable () на несколько небольших функций и проверьте их одну за другой.
Дайте нам больше кода для расследования. Также вы уверены, что сетевой вызов никогда не принимается? Может быть, что-то пошло не так с частью DiffUtils.