Rxjs: как заставить цикл foreach ждать внутреннего наблюдаемого

У меня есть цикл foreach, в котором мне может понадобиться http-запрос для получения некоторой информации. Я попытался использовать forkjoin внутри цикла foreach, чтобы «заставить цикл foreach ждать наблюдаемых» (в этом примере есть только один наблюдаемый, но на самом деле у меня будет больше...). НО с этим кодом каждый foreach продолжает работать, не дожидаясь завершения наблюдаемых, и я не могу найти никакого решения, чтобы предотвратить это.

Так что ваша помощь очень ценится!!!

    ...
    let wishes: Wish[] = [];
    response.wishes.forEach((wish) => {
          let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
                   
          let personObservables: Observable<any>[] = [];
          if (wish.reservation){
            personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy)); 
          } else {
            personObservables.push(of(null));
          }
          forkJoin(personObservables).subscribe( ([reservedBy]) => {
            if (reservedBy) {
              newWish.reservation = {
                ...wish.reservation,
                reservedBy
              };
            }
            wishes.push(newWish);
          } );
        });
    ...

Обновлено: Полнофункциональное рабочее решение без цикла foreach. Гораздо проще использовать оператор карты в канале и функцию карты в массиве. Я узнал, что проще разделить такую ​​логику на несколько операторов, чем пытаться исправить все это в одном операторе карты...

    public getWishlist ( receiver : Person) : Observable<Wish[]> {
        return this.http$.get<IWishlistResponse[]>(
          environment.apiUrl + 'wishlist/' + receiver.id
        ).pipe(
          
          // Create wish instances from each wish in API response and save reservation for later use
          map( wishlistResponse => 
            wishlistResponse[0].wishes.map(wish => ({
              wish: this.createWishInstanceFromResponse(wish, receiver),
              reservation: wish.reservation
            }))),
          
          // For each wish with reservation: get person info for 'reservedBy' id
          map( wishesAndReservationObjects => wishesAndReservationObjects.map( ({wish, reservation}) => 
            !reservation ? 
            of(wish) : 
            this.peopleService.getPersonById(reservation.reservedBy)
            .pipe(
              map ( reservedBy => {
                if (reservedBy) wish.reservation = { 
                  ...reservation, 
                  reservedBy: new Person(reservedBy.id, reservedBy.firstName, reservedBy.lastName)
                }
                return wish;
              })
            )
           )),
          
          // forkJoin all observables, so the result is an array of all the wishes
          switchMap(reservedByObservables => reservedByObservables.length !== 0 ? forkJoin(reservedByObservables) : of(<Wish[]>[])), //https://stackoverflow.com/questions/41723541/rxjs-switchmap-not-emitting-value-if-the-input-observable-is-empty-array
          
          // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
          map ( wishes => wishes.map( wish => {
            wish.setUserIsFlags(this.userService.currentUser);
            return wish;
          })),
          
          // For each wish: get state via API call
          map ( wishesWithoutState => wishesWithoutState.map( wishWithoutState => 
            this.http$.get<wishStatus>(environment.apiUrl + 'wish/' + wishWithoutState.id + '/state')
            .pipe(
              catchError(() => of(null)),
              map( state => {
                wishWithoutState.status = state;
                return wishWithoutState;
              })
            )
          )),
          
          // Combine all stateObservables into 1 array
          switchMap(stateObservables => stateObservables.length !== 0 ? forkJoin(stateObservables) : of(<Wish[]>[]))
        )
      }
    
      private createWishInstanceFromResponse ( wishResponse : IWishResponse, receiver: Person ) : Wish {
        let wish : Wish = new Wish (
          wishResponse._id,
          wishResponse.title,
          wishResponse.price,
          ...
        );
        return wish;
      }

Может быть, вообще удалить forkjoin из цикла? Итак, сначала создайте массив, а затем запустите форк?

MikeOne 23.12.2020 00:48
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Angular и React для вашего проекта веб-разработки?
Angular и React для вашего проекта веб-разработки?
Когда дело доходит до веб-разработки, выбор правильного front-end фреймворка имеет решающее значение. Angular и React - два самых популярных...
Эпизод 23/17: Twitter Space о будущем Angular, Tiny Conf
Эпизод 23/17: Twitter Space о будущем Angular, Tiny Conf
Мы провели Twitter Space, обсудив несколько проблем, связанных с последними дополнениями в Angular. Также прошла Angular Tiny Conf с 25 докладами.
Угловой продивер
Угловой продивер
Оригинал этой статьи на турецком языке. ChatGPT используется только для перевода на английский язык.
Мое недавнее углубление в Angular
Мое недавнее углубление в Angular
Недавно я провел некоторое время, изучая фреймворк Angular, и я хотел поделиться своим опытом со всеми вами. Как человек, который любит глубоко...
Освоение Observables и Subjects в Rxjs:
Освоение Observables и Subjects в Rxjs:
Давайте начнем с основ и постепенно перейдем к более продвинутым концепциям в RxJS в Angular
1
1
4 078
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Для этого вы используете from и concatMap.

let wishes: Wish[] = [];
from(response.wishes).pipe(concatMap(wish) => {
      let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
               
      let personObservables: Observable<any>[] = [];
      if (wish.reservation){
        personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy)); 
      } else {
        personObservables.push(of(null));
      }
      return forkJoin(personObservables).pipe(tap([reservedBy]) => {
        if (reservedBy) {
          newWish.reservation = {
            ...wish.reservation,
            reservedBy
          };
        }
        wishes.push(newWish);
      }));
    }).subscribe();

Спасибо за ответ, @FanCheung. Я пытался реализовать это, но не могу понять, как заставить работать мой полный код... Этот код является частью канала другого наблюдаемого: this.http$.get(...).map (ответ => код выше).switchMap(пожелания => ...). Если я реализую вышеизложенное, пожелания в switchMap имеют тип void. Так что я ничего не возвращаю из функции карты, кажется... Полный код добавлен в мой вопрос...

bits 23.12.2020 10:55

Нужно знать контекст кода. похоже, вы возвращаете всю операцию из функции. В таком случае подписки быть не должно.

Fan Cheung 23.12.2020 14:27

Да я знаю, пробовал без подписки, но потом столкнулся с новыми проблемами. Мне удалось передать желания через всю трубу, но я не смог передать их ВСЕ через трубу. При подписке в моем компоненте несколько желаний «пришли», а другие нет. Я предполагаю, что это как-то связано со всеми асинхронными вызовами, которые я выполняю.

bits 23.12.2020 21:58

вы можете создать эту функцию для обработки вашего процесса:

completeData(items): Observable<any> {
    let $apiCallItems: Observable<any>[] = [];
    const itemsNeedServiceCall: Array<any> = [];
    const itemsCompleted: Array<any> = [];
    items.forEach((item) => {
      if (item.reservation) {
        $apiCallItems.push(this.peopleService.getPersonById(item.reservation.reservedBy));
        itemsNeedServiceCall.push(item);
      } else {
        itemsCompleted.push(item);
      }
    });
    return forkJoin($apiCallItems).pipe(
      map((r) => {
        for (let i = 0; i < r.length; i++) {
          itemsNeedServiceCall[i] = {
            ...itemsNeedServiceCall[i],
            ...r[i],
          };
        }
        return [...itemsCompleted, ...itemsNeedServiceCall];
      })
    );
  }

а затем в любом месте, которое вы хотите использовать, вы можете сделать следующее:

this.completeData(response.wishes).subscribe(r => {
    //some code
})
Ответ принят как подходящий

Как правило, никогда не вызывайте подписку на наблюдаемое, если только вы не являетесь конечным потребителем и не готовы выбросить наблюдаемое (больше никаких преобразований в данные не требуется). Обычно это происходит только при отображении пользователю или записи в базу данных.

Что делает ваш код (что возвращают ваши различные вызовы), мне не очень понятно. Итак, это приблизительное представление о том, как я буду делать то, что вы пытаетесь сделать.

Что-то в этом духе должно работать.

Заметили, что я никогда не подписываюсь, пока не буду готов записать окончательный список пожеланий в консоль? Так задумано, поэтому информация соответствует ожидаемому.

return this.http$.get<IWishlistResponse[]>(
  environment.apiUrl + 
  'wishlist/' + 
  receiver.id
).pipe(
  map(wishlist => wishlist.wishes.map(wish => ({
    oldWish: wish,
    newWish: new Wish( wish._id, wish.title, wish.price)
  }))),
  map(wishes => wishes.map(({oldWish, newWish}) => 
    !oldWish.reservation ? 
    of(newWish) :
    this.peopleService.getPersonById(oldWish.reservation.reservedBy).pipe(
      map(([reservedBy]) => reservedBy ?
        {
          ...newWish, 
          reservation: {
            ...oldWish.reservation,
            reservedBy
          }
        } :
        {
          ...newWish,
          reservation: oldWish.reservation
        }
      )
    )
  )),
  switchMap(newWishObservables => forkJoin(newWishObservables))
).subscribe(newWishes => console.info(newWishes));

Обновление №1: Очистка кода

Некоторые вещи, которые упрощают обслуживание кода, проверку ошибок и т. д., заключаются в том, чтобы передать создание наблюдаемых объектов в отдельные функции и сохранить конвейеры «чистыми (er)» от логики преобразования.

Как только вы это сделаете, этот шаблон:

map(things => things.map(thing => observableThing)),
mergeMap(observableThings => forkJoin(observableThings))

можно комбинировать, не создавая лишнего беспорядка

mergeMap(things => forkJoin(
  things.map(thing => observableThing)
))

Вот как я могу навести порядок в вашем обновленном коде (это делается быстро на лету, не тестировалось).

Вы заметите, что я избавился от вашего кода для обработки пустых forkJoins. Я не думаю, что это действительно проблема, вы можете решить проблему позже в конвейере по мере завершения наблюдаемого.

Вы также заметите, что я быстро обработал ошибки в обеих функциях, которые создают наблюдаемые объекты. Оба раза я просто заменял значение по умолчанию (хотя и в разных частях трубы с немного другим эффектом). На практике вы, вероятно, захотите проверить имя или тип ошибки и отреагировать соответствующим образом.

Я просто игнорирую ошибки, но некоторые ошибки нельзя игнорировать спокойно. Это зависит от вас.

public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(

    // Create wish instances from each wish in API response and save reservation for later use
    map( wishlistResponse => 
      wishlistResponse[0].wishes.map(wish => ({
        wish: this.createWishInstanceFromResponse(wish, receiver),
        reservation: wish.reservation
      }))
    ),
    
    // For each wish with reservation: get person info for 'reservedBy' id
    map(wishesAndReservationObjects => 
      wishesAndReservationObjects.map(({wish, reservation}) => 
        this.addReservationToWish(wish, reservation)
      )
    ),
    
    // forkJoin all observables, so the result is an array of all the wishes
    switchMap(reservedByObservables => 
      forkJoin(reservedByObservables)
    ),
    
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    
    // For each wish: get state via API call
    map(wishesWithoutState => wishesWithoutState.map(wishWithoutState => 
      this.addStatusToWish(wishWithoutState)
    )),
    
    // Combine all stateObservables into 1 array
    switchMap(stateObservables =>
      forkJoin(stateObservables)
    ),

    // If this.http$.get<IWishlistResponse[]> is empty, this will all
    // complete without emitting anything. So we can just give it a default
    // thing to emit in case that happens.
    defaultIfEmpty(<Wish[]>[])
  );
}

private createWishInstanceFromResponse( 
  wishResponse : IWishResponse, 
  receiver: Person 
): Wish {
  let wish : Wish = new Wish (
    wishResponse._id,
    wishResponse.title,
    wishResponse.price
  );
  return wish;
}

/***
 * Create an observable that tries to add reservation and 
 * reservedBy to the wish
 ***/ 
private addReservationToWish(wish: Wish, reservation): Observable<Wish>{
  return this.peopleService.getPersonById(reservation?.reservedBy).pipe(
    // if peopleService.getPersonById fails (Say, because reservation was 
    // null), convert the error into a null response, the filter will Ignore
    // the failed call and just return wish.
    catchError(_ => of(null)),
    // Since errors become null, this filters errors. Furthermore if peopleService
    // doesn't error and just emits a null/undefined value instead, this will
    // filter that situation as well
    filter(reservedBy => reservedBy != null),
    // Now we know reservedBy isn't null, so map doesn't need to check
    map(reservedBy => { 
      wish.reservation = reservation;
      wish.reservation.reservedBy = new Person(
        reservedBy.id, 
        reservedBy.firstName, 
        reservedBy.lastName,
        ...
      );
      return wish;
    }),
    // If reservedBy was null (due to error or otherwise), then just emit
    // the wish without a reservation
    defaultIfEmpty(wish)
  );
}

/***
 * Create an observable that tries to add status to the wish
 ***/ 
private addStatusToWish(wish: Wish): Observable<Wish>{
  return this.http$.get<WishStatus>(
    environment.apiUrl + 
    'wish/' + 
    wishWithoutState.id + 
    '/state'
  ).pipe(
    map(state => ({
      ...wish,
      status: state
    })),
    // If http$.get<WishStatus> failed, then just return the wish
    // without a status
    catchError(_ => of(wish)),
  );
}

Теперь, когда переходы от map к forkJoin(mapped) выглядят достаточно чистыми, я бы, наверное, объединил их, но это на ваше усмотрение.

Вот как это будет выглядеть:

public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(

    // Create wish instances from each wish in API response
    // For each wish with reservation: get person info for 'reservedBy' id
    switchMap(wishlistResponse => forkJoin(
      wishlistResponse[0].wishes.map(wish => 
        this.addReservationToWish(
          this.createWishInstanceFromResponse(wish, receiver), 
          wish.reservation
        )
      )
    )),
    
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    
    // For each wish: get state via API call
    switchMap(wishesWithoutState => forkJoin(
      wishesWithoutState.map(wishWithoutState => this.addStatusToWish(wishWithoutState))
    )),

    // If this.http$.get<IWishlistResponse[]> is empty, this will all
    // complete without emitting anything. So we can just give it a default
    // thing to emit in case that happens.
    defaultIfEmpty(<Wish[]>[])
  );
}

Я улучшил свой код на основе вашего ответа, и теперь он работает отлично. Я также смог очистить свой код в соответствии с вашим стилем кода. Так что это был отличный опыт обучения! Я добавил свое полное решение в свой пост выше. Не могли бы вы проверить это и дать несколько советов и рекомендаций для более чистого кода?

bits 24.12.2020 02:18

Не могли бы вы просто сказать мне, где в этом канале рекомендуется добавлять обработку ошибок?

bits 24.12.2020 14:11

Я сделал обновление, обработка ошибок настолько специфична для конкретного случая, что там, где вы добавляете, трудно дать эмпирические правила. В общем, мне нравится выполнять некоторую обработку ошибок для распространенных ошибок, которые может возвращать каждый API, а затем выполнять некоторую общую обработку неизвестных/неожиданных ошибок позже.

Mrk Sef 24.12.2020 18:07

Вау, большое спасибо за обновление! Я получил так много наблюдаемых знаний благодаря вам. Очень ценю ваши усилия и время, чтобы объяснить это так ясно (и переписать мое решение в более чистый код :-D). Пожалуйста, следуйте за мной, чтобы вы могли ответить на все мои вопросы ;-)

bits 25.12.2020 01:48

Другие вопросы по теме