Как правильно разветвить список наблюдаемых или их подписок?

У меня есть очень большой список упорядоченных наблюдаемых, которые нужно запускать параллельно. Когда каждая наблюдаемая возвращается, они добавляют свой результат к субъекту поведения, и именно так результаты передаются. Однако мне нужна конкретная функция, которая будет вызываться, когда все они будут завершены.

Каждая наблюдаемая загружает изображение (и связанные с ним метаданные) из API. Запросы должны выполняться как можно быстрее, и мне нужно работать с каждым результатом по мере его испускания и выдавать пустое значение, когда все наблюдаемые будут завершены. Это означает, что наблюдаемые должны выполняться параллельно.

Исходная реализация без обратного вызова по завершении.

const requests: Observable[] = getRequests();

requests.forEach(obs => obs.subscribe(res => {
    const currentImages = this.$images.value;
    currentImages.push(res);
    this.$images.next(currentImages);
}));

Чтобы реализовать обратный вызов после завершения всех запросов, я попробовал следующее.

const requests: Observable[] = getRequests();
const finishedTracker = new Subject<void>();

requests.forEach(obs => obs.subscribe(res => {
    const currentImages = this.$images.value;
    currentImages.push(res);
    this.$images.next(currentImages);
}));

forkJoin(requests).subscribe(() => {
    finishedTracker.next();
    finishedTracker.complete();
    console.info('requests done');
});

Это работает, но мне кажется странным, что мне нужно разделить forkJoin и подписки на запросы. Есть ли лучший способ реализовать эту функцию? Я также посмотрел на mergeMap, но не смог заставить его работать.

Редактировать Основываясь на комментариях, я понял, что двойная подписка означает повторные запросы. Поэтому я попытался выполнить другую реализацию.

from(requests).pipe(
    mergeMap(o => {
        o.subscribe(res => {
            const currentImages = this.$images.value;
            currentImages.push(res);
            this.$images.next(currentImages);
        }
        return o;
    }, 10)
).subscribe(() => {
    finishedTracker.next();
    console.info('requests done');
})

Я не использовал результат от forkJoin, потому что, насколько я понимаю, выдает мне результат всех запросов. Поэтому нужно дождаться их завершения. Поскольку каждый запрос выполняется относительно быстро, но часто их сотни, мне нужно, чтобы их отдельные результаты передавались субъекту поведения, как только каждый запрос завершается.

Изменить 2 Решение, с которым я пошел.

from(requests).pipe(
    mergeMap(request => request, 10),
    scan<ImageResponse, ImageResponse[]>((all, current, index) => {
        all = all.concat(current);
        this.$images.next(all);
        return all;
    }, [])
).subscribe({
    complete: () => {
    finishedTracker.next();
    console.info('requests done');
}});

Вы подписываетесь дважды здесь, это не может быть хорошо. Почему бы вам просто не использовать результат, возвращаемый forkJoin?

Matthieu Riegler 13.02.2023 10:13

@MatthieuRiegler Вы правы. Я добавил правку, объясняющую, почему я не использовал результат forkJoin. Короче говоря, я хочу получить результаты запросов как можно скорее, чтобы они «просачивались» к субъекту поведения по мере их завершения.

tgm 13.02.2023 11:11

После вашего редактирования вы подписываетесь даже больше, чем раньше. Возможно, вы ищете что-то вроде combLatest, как предлагает @MatthieuRiegler.

Chund 13.02.2023 12:11
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Angular и React для вашего проекта веб-разработки?
Angular и React для вашего проекта веб-разработки?
Когда дело доходит до веб-разработки, выбор правильного front-end фреймворка имеет решающее значение. Angular и React - два самых популярных...
Эпизод 23/17: Twitter Space о будущем Angular, Tiny Conf
Эпизод 23/17: Twitter Space о будущем Angular, Tiny Conf
Мы провели Twitter Space, обсудив несколько проблем, связанных с последними дополнениями в Angular. Также прошла Angular Tiny Conf с 25 докладами.
Угловой продивер
Угловой продивер
Оригинал этой статьи на турецком языке. ChatGPT используется только для перевода на английский язык.
Мое недавнее углубление в Angular
Мое недавнее углубление в Angular
Недавно я провел некоторое время, изучая фреймворк Angular, и я хотел поделиться своим опытом со всеми вами. Как человек, который любит глубоко...
Освоение Observables и Subjects в Rxjs:
Освоение Observables и Subjects в Rxjs:
Давайте начнем с основ и постепенно перейдем к более продвинутым концепциям в RxJS в Angular
2
3
118
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Вот расширение, которое принимает аккумулятор с массивом запросов и ответов на данный момент.

const { of, expand, takeWhile, map, delay } = rxjs;

const requests = Array.from({ length: 10 }, (_, i) => of(`Response ${i + 1}`).pipe(delay(Math.random() * 500)));

const responses$ = of({ requests, responses: [] })
  .pipe(
    expand((acc) =>
      acc.requests[0].pipe(
        map((response) => {
          const [, ...remainingRequests] = acc.requests;
          return { requests: remainingRequests, responses: [...acc.responses, response] };
        })
      )
    ),
    takeWhile((acc) => acc.requests.length, true),
    map((acc) => acc.responses)
  );
  
  
responses$.subscribe((responses) => {
  console.info(responses);
});
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity = "sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg= = " crossorigin = "anonymous" referrerpolicy = "no-referrer"></script>

Но тогда я передам результаты только после того, как они все закончат. Каждый запрос занимает менее 3-400 мс, но в массиве могут быть сотни запросов. Мне нужно, чтобы результаты были переданы субъекту поведения как можно скорее.

tgm 13.02.2023 11:04

Затем используйте concatMap

Adrian Brand 13.02.2023 11:18

concatMap тоже серийный, согласно документации rxjs. Полный список запросов займет более 5 секунд, и я не хочу, чтобы пользователь так долго ждал заполнения пользовательского интерфейса. Мне нужно, чтобы отдельные результаты «просачивались» в тему поведения, чтобы заполнить пользовательский интерфейс более быстрым способом.

tgm 13.02.2023 11:23

Я отредактировал ответ, чтобы использовать расширение, которое до сих пор повторяется с накопителем запросов и ответов.

Adrian Brand 13.02.2023 11:58

Как насчет простого комбайна?

import {of, Observable, combLatest, startWith} из 'rxjs';

declare const getRequests: () => Observable<any>[];

const requests: Observable<any>[] = getRequests();


combineLatest(requests.map(r => r.pipe(startWith(null)))).subscribe(result => {

  if (!result.some(r => r === null)) {
    // everything loaded
  } else {
    // not finished
  }
})

Детская площадка

Это несколько проблематично, поскольку каждая наблюдаемая будет испускаться только один раз, а в списке сотни наблюдаемых. Однако это может сработать, мне просто нужно будет найти ненулевое значение в массиве результатов в качестве дополнительного шага.

tgm 13.02.2023 11:43

Хорошо, если я правильно вас понял, вам просто нужно отслеживать несколько асинхронных http-запросов. Вы хотели бы «отслеживать» каждый отдельно и работать с ним, как только он завершится для каждого из них, а также вам нужно отслеживать, выполняются ли все запросы, и поэтому вам нужно forJoin все запросы. В этом случае я думаю, что ваш подход forEach + forkJoin хорош, хотя, как вы сказали в редактировании, двойная подписка на http-запрос будет дважды вызывать сервер, что является нежелательным поведением. Я бы просто создал еще один Observable array, единственной целью которого является отслеживание завершения всех событий.

const requests: Observable[] = getRequests();
const finishTracker: Observable[] = [];

requests.forEach(obs => {
    const innerTracker = new Subject();
    finishTracker.push(innerTracker.asObservable());

    obs.subscribe(res => {
        innerTracker.next();
        innerTracker.complete();

        // Do whatever you want with your image responce
        doSomethingWithRes(res);
    });
});

forkJoin(finishTracker).subscribe(res => console.info('all finished'));

ПРЕДУПРЕЖДЕНИЕ! Поскольку подписка вызывается в цикле forEach, из-за асинхронной природы http req, я думаю, что это может привести к нежелательному поведению forkJoin срабатывания или вообще не срабатыванию из-за innerTracker.complete() срабатывания быстрее, чем forEach завершает заполнение массива. В этом случае вам нужно будет создать отдельный массив объектов перед вызовом подписки.

const requests: Observable[] = getRequests();
const additionalArray: any[] = [];
const finishTracker: Observable[] = [];

requests.forEach(obs => {
    const innerTracker = new Subject();
    finishTracker.push(innerTracker.asObservable());
    additionalArray.push({myObs: obs, tracker: innerTracker});
});

forkJoin(finishTracker).subscribe(res => console.info('all finished'));

additionalArray.forEach(myObj => {
    myObj.myObs.subscribe(
        res => {
            myObj.tracker.next();
            myObj.tracker.complete();

            // Do whatever you want with your image responce
            doSomethingWithRes(res);
        },
        err => {
            doSomethingWithErr(err)
        }
    )
});

Вы можете переместить этот процесс заполнения в свою функцию getRequests(). Возможно, это не самое элегантное решение, но оно работает.

Это хорошая идея. Однако мне нужно обновлять другой компонент каждый раз, когда запрос завершается и когда все они выполняются, но ваш ответ можно относительно легко изменить, чтобы решить мою проблему.

tgm 13.02.2023 13:11
Ответ принят как подходящий

Не обязательно подписываться внутри вашего mergeMap. На самом деле, как указывали другие, это вызывает двойную подписку, поскольку mergeMap внутренне подписывается на наблюдаемое, возвращаемое функцией, которую вы ему передаете.

Чтобы обрабатывать ответы по мере их появления, вы можете просто использовать канал и добавить в него свою логику обработки. Поскольку вы, по сути, выполняете побочный эффект (что-то, что не изменяет вывод текущего потока), уместно использовать оператор tap:

from(requests).pipe(
    mergeMap(o => o.pipe(
        tap(res => {
            const currentImages = this.$images.value;
            currentImages.push(res);
            this.$images.next(currentImages);
        }),
    }, 10)
).subscribe(() => {
    finishedTracker.next();
    console.info('requests done');
})

Хотя это будет работать, похоже, вы слишком усложняете наблюдаемый поток. Я не совсем уверен в вашем случае использования, но я предполагаю, что субъекты вообще не нужны. Если ваша цель — выдавать кумулятивный массив результатов по мере их обработки, вы можете использовать для этого сканирование без привлечения каких-либо Subject или BehaviorSubject. Чтобы выполнить некоторую логику, когда все запросы выполнены, вы можете передать частичный Observer, который указывает только обратный вызов complete (вместо обратного вызова next, который неявно используется, когда вы передаете функцию в качестве аргумента subscribe()):

from(requests).pipe(
    mergeMap(request => request, 10),
    scan((all, current) => all.concat(current), [])
).subscribe({
    complete: () => console.info('requests done')
});

Обновлено: Как указал @AdrianBrand, более лаконично использовать слияние вместо from/mergeMap:

merge(...requests, 10).pipe(
    scan((all, current) => all.concat(current), [])
).subscribe({
    complete: () => console.info('requests done')
})

Tap решает мою проблему, но в вашем упрощенном потоке каждый ответ не передается. Проблема в том, что у меня есть сотни, а для некоторых пользователей тысячи запросов. Я хочу передать частичный список ответов или только последние ответы компоненту пользовательского интерфейса, пока обрабатывается весь список.

tgm 13.02.2023 13:07

О, да. Вероятно, нужно назначить наблюдаемое и подписаться на него так же, как вы подписывались на images$ ранее. Обновлю, когда вернусь за компьютер.

BizzyBob 13.02.2023 13:19

использование сканирования на самом деле очень аккуратно решает мою проблему. Мне по-прежнему нужно передавать ответы другому компоненту через отдельную тему поведения, но это намного чище.

tgm 13.02.2023 13:22

Порядок важен? Это не будет выдавать значения в том же порядке, в котором были сделаны запросы. Вы также можете просто использовать слияние. stackblitz.com/edit/…

Adrian Brand 13.02.2023 13:56

О, ты прав @AdrianBrand! merge более лаконичен, чем использование from с mergeMap.

BizzyBob 13.02.2023 14:07

экспортировать объявить функцию слияния<T>(v1: ObservableInput<T>, concurrent?: число, планировщик?: SchedulerLike): Observable<T>; из 14 перегрузок этот выглядит как тот, который вам нужен.

Adrian Brand 13.02.2023 14:18

При использовании слияния я получаю наблюдаемое вместо ответа при сканировании. В моем случае я хочу обрабатывать ответы, поэтому должен ли я подписываться на них при сканировании? Используя from, я могу обрабатывать ответы и подписываться на оператор from, чтобы обрабатывать отдельные запросы и сообщать об общем завершении.

tgm 14.02.2023 13:18

О, элементы массива нужно передавать в слияние по одному, а не как массив. Моя ошибка, я обновлю, чтобы включить оператор распространения.

BizzyBob 14.02.2023 13:42

Другие вопросы по теме