У меня возникла проблема, пытаясь понять реактивным способом, как должны работать одновременные операции с одним и тем же наблюдаемым объектом.
Сценарий следующий:
У меня есть список пользователей и кнопка удаления.
Каждый раз, когда я нажимаю remove, я вызываю API: UsersApi.removeUser. Возможно одновременное удаление нескольких пользователей. Это означает, что одновременно происходит несколько UsersApi.removeUser.
После каждого UsersApi.removeUser мне нужно сделать звонок UsersApi.refreshUser
Итак, с точки зрения псевдокода, когда я нажимаю кнопку удаления, я делаю следующее:
Ведущий:
public Observable<User> removeUser(int userId) {
return UsersApi.removeUser(userId)
.flatMap(user -> UsersApi.refreshUser(userId));
}
Фрагмент:
public void removeUser() {
presenter.removeUser(userId)
.subscribe(user -> {
//remove user from ui
// update number of total users
})
}
Проблема с этим подходом заключается в том, что из-за асинхронного характера удаления (разрешено несколько удалений) я не могу гарантировать, что то, что достигает подписчика, является самым последним. Подписка будет достигнута дважды, по одной для каждого удаления, и информация о пользователе может быть не обновленной или последней. Имеет ли это смысл?
Чего я хочу:
Обновлено: я хотел бы знать, как сделать / если возможно сделать решение, которое я сделал (см. Edit2), с использованием операторов Rx.
Edit2: Мое решение для этого заключалось в том, чтобы поставить в очередь пользовательские операции (в данном случае remove) и выдать, используя PublishSubject, когда завершится вызов UsersApi.refreshUser(userId).
Итак, в основном то, что я сделал (псевдокод):
private final PublishSubject<UserOperation> userOperationObs;
private final ConcurrentLinkedQueue<UserOperation> pendingOperations;
private boolean executingOperation;
private void emitUserOperation(final UserOperation operation) {
if (!executingOperation) {
executingOperation = true;
userOperationObs.onNext(operation);
} else {
executingOperation.add(operation);
}
}
public Observable<User> removeUser(UserOperation operation) {
return UsersApi.removeUser(operation.getUserId)
.switchMap(user -> UsersApi.refreshUser(operation.getUserId))
.doOnNext(user -> {
executingOperation = false;
final UserOperation nextOperation = pendingOperations.poll();
if (nextOperation != null) {
userOperationObs.onNext(operation);
}
};
}
Каждое действие удаления является независимым и может происходить в разные моменты времени.
Непонятно чего ты хочешь. Вы описали проблему, но не то, что хотите.
Извинения. Надеюсь, теперь я прояснил это.
concatMapEager делает то, что вы просите. Он будет обрабатываться в восходящем направлении параллельно, но конкатенация приводит к порядку. В этой статье это хорошо объясняется: nurkiewicz.com/2017/08/…
ConcatMapEager обеспечивает параллелизм, но не параллелизм, цитируя вашу статью: параллельный, по порядку, но несколько дорого
Используйте параметр maxConcurrent функции flatMap с вашим объектом userOperationObs
Вы можете превратить щелчок в пользовательском интерфейсе в Observable (например, используя RxBinding). После этого вы можете использовать оператор concatMap для выполнения вызова api, чтобы он запускал следующий сетевой вызов после завершения текущего вызова api.
// emit clicks as stream
Observable<?> clicks = RxView.clicks(removeView)
// listen clicks then perform network call in sequence
clicks.concatMap(ignored -> usersApi.refreshUser(userId))
Проблема в том, что каждое удаление предназначено для другого пользователя, что означает, что каждое удаление запускает новый поток. В вашем случае каждый RxView.clicks запускает новый obs и сразу же выдает (потому что это другой поток)
используйте оператор zip Rx, чтобы определить, когда все операции завершены