У меня есть очень большой список упорядоченных наблюдаемых, которые нужно запускать параллельно. Когда каждая наблюдаемая возвращается, они добавляют свой результат к субъекту поведения, и именно так результаты передаются. Однако мне нужна конкретная функция, которая будет вызываться, когда все они будут завершены.
Каждая наблюдаемая загружает изображение (и связанные с ним метаданные) из API. Запросы должны выполняться как можно быстрее, и мне нужно работать с каждым результатом по мере его испускания и выдавать пустое значение, когда все наблюдаемые будут завершены. Это означает, что наблюдаемые должны выполняться параллельно.
Исходная реализация без обратного вызова по завершении.
const requests: Observable[] = getRequests();
requests.forEach(obs => obs.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}));
Чтобы реализовать обратный вызов после завершения всех запросов, я попробовал следующее.
const requests: Observable[] = getRequests();
const finishedTracker = new Subject<void>();
requests.forEach(obs => obs.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}));
forkJoin(requests).subscribe(() => {
finishedTracker.next();
finishedTracker.complete();
console.info('requests done');
});
Это работает, но мне кажется странным, что мне нужно разделить forkJoin и подписки на запросы. Есть ли лучший способ реализовать эту функцию? Я также посмотрел на mergeMap, но не смог заставить его работать.
Редактировать Основываясь на комментариях, я понял, что двойная подписка означает повторные запросы. Поэтому я попытался выполнить другую реализацию.
from(requests).pipe(
mergeMap(o => {
o.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}
return o;
}, 10)
).subscribe(() => {
finishedTracker.next();
console.info('requests done');
})
Я не использовал результат от forkJoin
, потому что, насколько я понимаю, выдает мне результат всех запросов. Поэтому нужно дождаться их завершения. Поскольку каждый запрос выполняется относительно быстро, но часто их сотни, мне нужно, чтобы их отдельные результаты передавались субъекту поведения, как только каждый запрос завершается.
Изменить 2 Решение, с которым я пошел.
from(requests).pipe(
mergeMap(request => request, 10),
scan<ImageResponse, ImageResponse[]>((all, current, index) => {
all = all.concat(current);
this.$images.next(all);
return all;
}, [])
).subscribe({
complete: () => {
finishedTracker.next();
console.info('requests done');
}});
@MatthieuRiegler Вы правы. Я добавил правку, объясняющую, почему я не использовал результат forkJoin. Короче говоря, я хочу получить результаты запросов как можно скорее, чтобы они «просачивались» к субъекту поведения по мере их завершения.
После вашего редактирования вы подписываетесь даже больше, чем раньше. Возможно, вы ищете что-то вроде combLatest, как предлагает @MatthieuRiegler.
Вот расширение, которое принимает аккумулятор с массивом запросов и ответов на данный момент.
const { of, expand, takeWhile, map, delay } = rxjs;
const requests = Array.from({ length: 10 }, (_, i) => of(`Response ${i + 1}`).pipe(delay(Math.random() * 500)));
const responses$ = of({ requests, responses: [] })
.pipe(
expand((acc) =>
acc.requests[0].pipe(
map((response) => {
const [, ...remainingRequests] = acc.requests;
return { requests: remainingRequests, responses: [...acc.responses, response] };
})
)
),
takeWhile((acc) => acc.requests.length, true),
map((acc) => acc.responses)
);
responses$.subscribe((responses) => {
console.info(responses);
});
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity = "sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg= = " crossorigin = "anonymous" referrerpolicy = "no-referrer"></script>
Но тогда я передам результаты только после того, как они все закончат. Каждый запрос занимает менее 3-400 мс, но в массиве могут быть сотни запросов. Мне нужно, чтобы результаты были переданы субъекту поведения как можно скорее.
Затем используйте concatMap
concatMap тоже серийный, согласно документации rxjs. Полный список запросов займет более 5 секунд, и я не хочу, чтобы пользователь так долго ждал заполнения пользовательского интерфейса. Мне нужно, чтобы отдельные результаты «просачивались» в тему поведения, чтобы заполнить пользовательский интерфейс более быстрым способом.
Я отредактировал ответ, чтобы использовать расширение, которое до сих пор повторяется с накопителем запросов и ответов.
Как насчет простого комбайна?
import {of, Observable, combLatest, startWith} из 'rxjs';
declare const getRequests: () => Observable<any>[];
const requests: Observable<any>[] = getRequests();
combineLatest(requests.map(r => r.pipe(startWith(null)))).subscribe(result => {
if (!result.some(r => r === null)) {
// everything loaded
} else {
// not finished
}
})
Это несколько проблематично, поскольку каждая наблюдаемая будет испускаться только один раз, а в списке сотни наблюдаемых. Однако это может сработать, мне просто нужно будет найти ненулевое значение в массиве результатов в качестве дополнительного шага.
Хорошо, если я правильно вас понял, вам просто нужно отслеживать несколько асинхронных http-запросов. Вы хотели бы «отслеживать» каждый отдельно и работать с ним, как только он завершится для каждого из них, а также вам нужно отслеживать, выполняются ли все запросы, и поэтому вам нужно forJoin все запросы. В этом случае я думаю, что ваш подход forEach + forkJoin хорош, хотя, как вы сказали в редактировании, двойная подписка на http-запрос будет дважды вызывать сервер, что является нежелательным поведением. Я бы просто создал еще один Observable array
, единственной целью которого является отслеживание завершения всех событий.
const requests: Observable[] = getRequests();
const finishTracker: Observable[] = [];
requests.forEach(obs => {
const innerTracker = new Subject();
finishTracker.push(innerTracker.asObservable());
obs.subscribe(res => {
innerTracker.next();
innerTracker.complete();
// Do whatever you want with your image responce
doSomethingWithRes(res);
});
});
forkJoin(finishTracker).subscribe(res => console.info('all finished'));
ПРЕДУПРЕЖДЕНИЕ! Поскольку подписка вызывается в цикле forEach
, из-за асинхронной природы http req, я думаю, что это может привести к нежелательному поведению forkJoin
срабатывания или вообще не срабатыванию из-за innerTracker.complete()
срабатывания быстрее, чем forEach завершает заполнение массива. В этом случае вам нужно будет создать отдельный массив объектов перед вызовом подписки.
const requests: Observable[] = getRequests();
const additionalArray: any[] = [];
const finishTracker: Observable[] = [];
requests.forEach(obs => {
const innerTracker = new Subject();
finishTracker.push(innerTracker.asObservable());
additionalArray.push({myObs: obs, tracker: innerTracker});
});
forkJoin(finishTracker).subscribe(res => console.info('all finished'));
additionalArray.forEach(myObj => {
myObj.myObs.subscribe(
res => {
myObj.tracker.next();
myObj.tracker.complete();
// Do whatever you want with your image responce
doSomethingWithRes(res);
},
err => {
doSomethingWithErr(err)
}
)
});
Вы можете переместить этот процесс заполнения в свою функцию getRequests()
. Возможно, это не самое элегантное решение, но оно работает.
Это хорошая идея. Однако мне нужно обновлять другой компонент каждый раз, когда запрос завершается и когда все они выполняются, но ваш ответ можно относительно легко изменить, чтобы решить мою проблему.
Не обязательно подписываться внутри вашего mergeMap
. На самом деле, как указывали другие, это вызывает двойную подписку, поскольку mergeMap
внутренне подписывается на наблюдаемое, возвращаемое функцией, которую вы ему передаете.
Чтобы обрабатывать ответы по мере их появления, вы можете просто использовать канал и добавить в него свою логику обработки. Поскольку вы, по сути, выполняете побочный эффект (что-то, что не изменяет вывод текущего потока), уместно использовать оператор tap:
from(requests).pipe(
mergeMap(o => o.pipe(
tap(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}),
}, 10)
).subscribe(() => {
finishedTracker.next();
console.info('requests done');
})
Хотя это будет работать, похоже, вы слишком усложняете наблюдаемый поток. Я не совсем уверен в вашем случае использования, но я предполагаю, что субъекты вообще не нужны. Если ваша цель — выдавать кумулятивный массив результатов по мере их обработки, вы можете использовать для этого сканирование без привлечения каких-либо Subject
или BehaviorSubject
. Чтобы выполнить некоторую логику, когда все запросы выполнены, вы можете передать частичный Observer, который указывает только обратный вызов complete
(вместо обратного вызова next
, который неявно используется, когда вы передаете функцию в качестве аргумента subscribe()
):
from(requests).pipe(
mergeMap(request => request, 10),
scan((all, current) => all.concat(current), [])
).subscribe({
complete: () => console.info('requests done')
});
Обновлено:
Как указал @AdrianBrand, более лаконично использовать слияние вместо from
/mergeMap
:
merge(...requests, 10).pipe(
scan((all, current) => all.concat(current), [])
).subscribe({
complete: () => console.info('requests done')
})
Tap решает мою проблему, но в вашем упрощенном потоке каждый ответ не передается. Проблема в том, что у меня есть сотни, а для некоторых пользователей тысячи запросов. Я хочу передать частичный список ответов или только последние ответы компоненту пользовательского интерфейса, пока обрабатывается весь список.
О, да. Вероятно, нужно назначить наблюдаемое и подписаться на него так же, как вы подписывались на images$
ранее. Обновлю, когда вернусь за компьютер.
использование сканирования на самом деле очень аккуратно решает мою проблему. Мне по-прежнему нужно передавать ответы другому компоненту через отдельную тему поведения, но это намного чище.
Порядок важен? Это не будет выдавать значения в том же порядке, в котором были сделаны запросы. Вы также можете просто использовать слияние. stackblitz.com/edit/…
О, ты прав @AdrianBrand! merge
более лаконичен, чем использование from
с mergeMap
.
экспортировать объявить функцию слияния<T>(v1: ObservableInput<T>, concurrent?: число, планировщик?: SchedulerLike): Observable<T>; из 14 перегрузок этот выглядит как тот, который вам нужен.
При использовании слияния я получаю наблюдаемое вместо ответа при сканировании. В моем случае я хочу обрабатывать ответы, поэтому должен ли я подписываться на них при сканировании? Используя from
, я могу обрабатывать ответы и подписываться на оператор from
, чтобы обрабатывать отдельные запросы и сообщать об общем завершении.
О, элементы массива нужно передавать в слияние по одному, а не как массив. Моя ошибка, я обновлю, чтобы включить оператор распространения.
Вы подписываетесь дважды здесь, это не может быть хорошо. Почему бы вам просто не использовать результат, возвращаемый forkJoin?