Я работаю над одним внутренним приложением, и у меня есть ситуация, когда я получаю события от пользователей. Для каждого события я буду звонить в какую-то конечную точку foo. Если у меня есть несколько событий от одного и того же пользователя, я хочу выполнить следующий вызов foo после завершения предыдущего вызова foo (поведение concatMap). В то же время для разных пользователей вызовы должны выполняться параллельно (поведение mergeMap).
В коде это выглядит примерно так:
const userEvent$: Observable<{userId: string}> = ...
const foo: (userId: string) => Observable<Response> = ...
let result$: Observable<Response> = ???



![Безумие обратных вызовов в javascript [JS]](https://i.imgur.com/WsjO6zJb.png)


Рассмотрите возможность использования оператора GroupBy для группировки событий по userId, затем примените concatMap к группе, чтобы события в одной группе выполнялись последовательно, как показано в примере ниже:
userEvent$.pipe(
groupBy(item => item.userId),
mergeMap(group => group.pipe(concatMap(x => foo(x.userId))))
);
Демо:
const { of, from } = rxjs;
const { delay, tap, concatMap, mergeMap, groupBy } = rxjs.operators;
function foo(userId, index) {
return of(userId).pipe(
delay(2000),
tap(() => console.info("UserId: ", userId, ", Occurrences: ", index + 1))
)
}
const userEvent$ = from([
{ userId: 1 }, // <- first
{ userId: 2 }, // <- first
{ userId: 2 }, // <- second
{ userId: 3 },// <- first
{ userId: 1 }, // <- second
{ userId: 1 }, // <- third
{ userId: 2 }, // <- third
{ userId: 3 }, // <- second
]);
userEvent$
.pipe(
groupBy(x => x.userId),
mergeMap(group => group.pipe(concatMap((x,i) => foo(x.userId,i)))),
)
.subscribe()
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>что заставляет вас думать так? попробуй userEvent$ = timer(0,1000).pipe(map(x => ({userId:x})))
Насколько мне известно, оператор
groupByожидает, чтоuserEvent$observable завершится до того, как он начнет испускать какие-либо элементы. Таким образом, это решение не будет работать в режиме реального времени (когдаuserEvent$не завершится).