Rxjava2 - Android (Concat + Zip) Возможно?

у меня два вызова API 1.) Получение профиля 2.) Загрузка каналов

Мне нужно загрузить кеш, если он есть, иначе перейдите в сеть.

если кеш присутствует, загрузите его, пока сеть также вызывается в фоновом режиме

с использованием CONCAT op.
например. concat(feedlocal, feednetwork)

и как только будет получен сетевой вызов, обновите представление корзины с помощью DiffUtils (android)

Аналогично: concat(profilelocal, profilenetwork)

ПРОБЛЕМА:

я использую zip (c1, c2) (извинения, если я неправильно использую) но проблема в том, что сетевой ответ никогда не отвечает через zip op.

https://dl2.pushbulletusercontent.com/giQolwcSSPZrBxwiUQoFbsQ7FMxBRr8w/4vgkDWPUoU5Pb8sQsLa0bbAURs7gKi0ncyIoluD7bqU2EeOk-FJnJudUvKjB5hwNY0bm7Yt_kl9wgZ5aC4a567PCxpUhnoNxUJWEcO6i0VY89QQDsTU3uZ

Мне нужно сначала показать кеш и обновить пользовательский интерфейс при получении сетевого вызова и сделать его параллельным (Feed + Profile)

Обновлено:

  private Observable<List<FeedProfileResponse>> getProfileObservable(){


    //local
    Observable<List<FeedProfileResponse>> local = Observable.fromCallable(() ->
            dataManager.getCacheValueOf(DB_FEED_PROFILE_LIST) != null ?
                    new Gson().fromJson(dataManager.getCacheValueOf(DB_FEED_PROFILE_LIST)
                            , new TypeToken<List<FeedProfileResponse>>() {
                            }.getType()) :
                    Collections.<FeedProfileResponse>emptyList())
            .subscribeOn(schedulerProvider.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(Throwable::printStackTrace)
            .onErrorResumeNext(Observable.empty());

    //network
    Observable<List<FeedProfileResponse>> network = apiHelper.fetchFeedProfile(dataManager.getCurrentUserId().intValue()
            , mProfileCurrentPage)
            .map(responses -> {
                //Cache Updates
                Observable.create(subscriber -> {
                    updateCacheDao(dataManager.getCacheObj(DB_FEED_PROFILE_LIST),
                            new Gson().toJson(responses), DB_FEED_PROFILE_LIST,
                            NetworkUtils.getCurrentTimeStamp());
                    subscriber.onComplete();
                })
                .subscribeOn(schedulerProvider.computation())
                .subscribe();


                return responses;
            })
            .subscribeOn(schedulerProvider.io())
            .doOnError(Throwable::printStackTrace)
            .onErrorResumeNext(Observable.empty());


    CacheData cacheObj = dataManager.getCacheObj(DB_FEED_PROFILE_LIST);
    return cacheObj != null ? !NetworkUtils.hasTimestampExpired(cacheObj.getUpdatedAt())
            && cacheObj.getUpdatedAt() > dataManager.getAppTimestamp() ?
            local.observeOn(AndroidSchedulers.mainThread()) :
            Observable.concat(local, network).observeOn(AndroidSchedulers.mainThread()) : network.observeOn(AndroidSchedulers.mainThread());

}

private Observable<List<FeedReviewResponse>> getReviewObservable() {

    //local 
    Observable<List<FeedReviewResponse>> local = Observable.fromCallable(() ->
            dataManager.getCacheValueOf(DB_FEED_REVIEW_LIST) != null ?
                    new Gson().fromJson(dataManager.getCacheValueOf(DB_FEED_REVIEW_LIST)
                            , new TypeToken<List<FeedReviewResponse>>() {
                            }.getType()) :
                    Collections.<FeedReviewResponse>emptyList())
            .subscribeOn(schedulerProvider.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(Throwable::printStackTrace)
            .onErrorResumeNext(Observable.empty());

    //network 
    Observable<List<FeedReviewResponse>> network = apiHelper.fetchFeedReviews(dataManager.getCurrentUserId().intValue(),
            AppConstants.FeedReviewConstants.KEY_API_PARAM_1, AppConstants.FeedReviewConstants.KEY_API_PARAM_2,
            mCurrentPage)
            .map(responses -> {
                isFeedReviewLastPage = !(responses.size() == AppConstants.FeedReviewConstants.PAGE_SIZE);

                //Cache Updates
                Observable.create(subscriber -> {
                    updateCacheDao(dataManager.getCacheObj(DB_FEED_REVIEW_LIST),
                            new Gson().toJson(responses), DB_FEED_REVIEW_LIST,
                            NetworkUtils.getCurrentTimeStamp());
                    subscriber.onComplete();
                })
                .subscribeOn(schedulerProvider.computation())
                .subscribe();


                return responses;
            })

            .subscribeOn(schedulerProvider.io())
            .doOnError(Throwable::printStackTrace)
            .onErrorResumeNext(Observable.empty());


    CacheData cacheObj = dataManager.getCacheObj(DB_FEED_REVIEW_LIST);
    return cacheObj != null ? !NetworkUtils.hasTimestampExpired(cacheObj.getUpdatedAt())
            && cacheObj.getUpdatedAt() > dataManager.getAppTimestamp() ?
            local.observeOn(AndroidSchedulers.mainThread()) :
            Observable.concat(local, network).observeOn(AndroidSchedulers.mainThread()) : network.observeOn(AndroidSchedulers.mainThread());


}

private void zipProfileAndReview(Observable<List<FeedReviewResponse>> reviewObservable,
                                 Observable<List<FeedProfileResponse>> profileObservable){
    compositeDisposable.add(Observable.zip(reviewObservable, profileObservable,
            ProfileAndReview::new)
            .doOnNext(profileAndReview -> {
                getMvpView().setRefreshing(false);

            })
            .subscribeOn(schedulerProvider.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .onErrorResumeNext(Observable.empty())
            .subscribeOn(schedulerProvider.io())
            .subscribe(profileAndReview -> {
                getMvpView().setFeedReviewListData(profileAndReview.getFeedReviewResponses());
                getMvpView().setFeedProfileListData(profileAndReview.getFeedProfileResponses());

                isFeedReviewLastPage = !(profileAndReview.getFeedReviewResponses().size() == AppConstants.FeedReviewConstants.PAGE_SIZE);
                isFeedProfileLastPage = !(profileAndReview.getFeedProfileResponses().size() == AppConstants.FeedProfileConstants.PAGE_SIZE);

            }, Throwable::printStackTrace));
}

Дайте нам больше кода для расследования. Также вы уверены, что сетевой вызов никогда не принимается? Может быть, что-то пошло не так с частью DiffUtils.

Geoffrey Marizy 14.03.2018 11:02

сетевой вызов получен, но поскольку ответ диска получен сравнительно быстро, поэтому ZIP op принимает ответ диска. и сетевой вызов остается как есть. DiffUtil работает хорошо, тестировал отдельно.

Peter 14.03.2018 11:30
0
2
201
1

Ответы 1

getProfileObservable() и getReviewObservable() включают в себя два тернарных оператора, включенных в оператор return, это сложно и вообще не читается. Вы уверены, что Observable.concat(local, network) вызывается во время ваших тестов?

Также несколько замечаний:

  • Вы должны использовать combineLatest вместо zip() в zipProfileAndReview. Чтобы все значения обрабатывались zip(), у вас должно быть одинаковое количество значений для каждого из его источников. Если одно полученное значение является недействительным кеш-памятью или сбой сетевого вызова, будет использоваться только первое значение другого источника. Это может быть даже источником вашей проблемы.

  • Вам лучше использовать doOnNext() вместо карты для части //Cache Updates, поскольку вы не преобразуете (отображаете) значение, а просто выполняете побочные эффекты.

  • Вам не нужно точно указывать планировщики на всем пути. Просто запишите один раз .subscribeOn(schedulerProvider.io()).observeOn(AndroidSchedulers.mainThread()) в zipProfileAndReview().

Приносим извинения за сложный код и благодарим вас за обратную связь. Я внес изменения в общие замечания. И я использовал Concat op. отдельно, и он работает нормально. когда я добавляю ZIP op. ZIP op, обработайте последнее из reviewObservable и profileObservable (либо я буду кешем, либо сетевыми данными). Я хочу сохранить zip живым, чтобы он также мог получать сетевые данные Observable.concat (локальные, сетевые).

Peter 14.03.2018 12:49

я подумываю использовать Concat op. только и забудьте про ZIP op. Спасибо за поддержку.

Peter 14.03.2018 12:51

Попробуйте заменить zip () на combLatest (), как было объяснено для начала. Затем, если проблема все еще существует, разделите оператор возврата getProfileObservable () на несколько небольших функций и проверьте их одну за другой.

Geoffrey Marizy 14.03.2018 13:04

Другие вопросы по теме