RxJava одновременные операции удаления

У меня возникла проблема, пытаясь понять реактивным способом, как должны работать одновременные операции с одним и тем же наблюдаемым объектом.

Сценарий следующий:

У меня есть список пользователей и кнопка удаления. Каждый раз, когда я нажимаю remove, я вызываю API: UsersApi.removeUser. Возможно одновременное удаление нескольких пользователей. Это означает, что одновременно происходит несколько UsersApi.removeUser.

После каждого UsersApi.removeUser мне нужно сделать звонок UsersApi.refreshUser

Итак, с точки зрения псевдокода, когда я нажимаю кнопку удаления, я делаю следующее:

Ведущий:

public Observable<User> removeUser(int userId) {
        return UsersApi.removeUser(userId)
                .flatMap(user -> UsersApi.refreshUser(userId));
    }

Фрагмент:

public void removeUser() {
   presenter.removeUser(userId)
     .subscribe(user -> {
        //remove user from ui
        // update number of total users
   })
}

Проблема с этим подходом заключается в том, что из-за асинхронного характера удаления (разрешено несколько удалений) я не могу гарантировать, что то, что достигает подписчика, является самым последним. Подписка будет достигнута дважды, по одной для каждого удаления, и информация о пользователе может быть не обновленной или последней. Имеет ли это смысл?

Чего я хочу:

  1. Параллельные / одновременные вызовы удаления с использованием реактивного подхода (запускаются несколькими щелчками удаления у пользователя)
  2. После завершения вызова удаления начните следующий вызов удаления

Обновлено: я хотел бы знать, как сделать / если возможно сделать решение, которое я сделал (см. Edit2), с использованием операторов Rx.

Edit2: Мое решение для этого заключалось в том, чтобы поставить в очередь пользовательские операции (в данном случае remove) и выдать, используя PublishSubject, когда завершится вызов UsersApi.refreshUser(userId).

Итак, в основном то, что я сделал (псевдокод):

private final PublishSubject<UserOperation> userOperationObs;
private final ConcurrentLinkedQueue<UserOperation> pendingOperations;
private boolean executingOperation;

private void emitUserOperation(final UserOperation operation) {
      if (!executingOperation) {
            executingOperation = true;
            userOperationObs.onNext(operation);
        } else {
            executingOperation.add(operation);
        }
    }

public Observable<User> removeUser(UserOperation operation) {
        return UsersApi.removeUser(operation.getUserId)
                .switchMap(user -> UsersApi.refreshUser(operation.getUserId))
                .doOnNext(user -> {
                   executingOperation = false;
                   final UserOperation nextOperation = pendingOperations.poll();
                   if (nextOperation != null) {
                       userOperationObs.onNext(operation);
                   }
};
    }

используйте оператор zip Rx, чтобы определить, когда все операции завершены

Debu 04.05.2018 15:50

Каждое действие удаления является независимым и может происходить в разные моменты времени.

Peddro 04.05.2018 15:52

Непонятно чего ты хочешь. Вы описали проблему, но не то, что хотите.

Bob Dalgleish 04.05.2018 22:38

Извинения. Надеюсь, теперь я прояснил это.

Peddro 07.05.2018 11:41

concatMapEager делает то, что вы просите. Он будет обрабатываться в восходящем направлении параллельно, но конкатенация приводит к порядку. В этой статье это хорошо объясняется: nurkiewicz.com/2017/08/…

Gustav Karlsson 27.06.2018 11:18

ConcatMapEager обеспечивает параллелизм, но не параллелизм, цитируя вашу статью: параллельный, по порядку, но несколько дорого

Peddro 27.06.2018 15:38

Используйте параметр maxConcurrent функции flatMap с вашим объектом userOperationObs

Eugene Popovich 06.07.2018 07:17
5
7
223
1

Ответы 1

Вы можете превратить щелчок в пользовательском интерфейсе в Observable (например, используя RxBinding). После этого вы можете использовать оператор concatMap для выполнения вызова api, чтобы он запускал следующий сетевой вызов после завершения текущего вызова api.

// emit clicks as stream
Observable<?> clicks = RxView.clicks(removeView)

// listen clicks then perform network call in sequence
clicks.concatMap(ignored -> usersApi.refreshUser(userId))

Проблема в том, что каждое удаление предназначено для другого пользователя, что означает, что каждое удаление запускает новый поток. В вашем случае каждый RxView.clicks запускает новый obs и сразу же выдает (потому что это другой поток)

Peddro 11.07.2018 12:40

Другие вопросы по теме