У меня есть исходный поток, который не завершается, но мне нужен оператор, который завершает наблюдателей после того, как переданное значение удовлетворяет некоторым требованиям, чтобы сделать его последним элементом для наблюдателей. На данный момент мой код просто использует takeUntil() для наблюдаемого объекта, который испускается, когда значение из источника соответствует критериям.
const source: Observable<any> = nonCompletingStream();
const conditionSatisfied: Observable<boolean> = source.pipe(
map(val => meetsCriteriaToBeFinalValue(val)),
skipWhile(meetsCriteria => meetsCriteria === false),
shareReplay(), // so that this emits after the fact
take(1) // only needs to know if the criteria is met once
);
const example: Observable<any> = source.pipe(
takeUntil(conditionSatisfied)
);
example.subscribe(
val => { // does stuff with individual values },
err => { // handles error },
() => { // this gets triggered one item too early }
);
Проблема с этим заключается в том, что когда элемент, который соответствует критериям, проходит (заставляя conditionSatisfied испускать), наблюдатель на example завершает работу до того, как «последнее значение» сможет передать его обратному вызову next. Есть ли способ передать окончательное значение наблюдателю до завершения потока?
Читая параметры, expand выглядит похоже на то, что мне нужно, позволяя значениям проходить перед рекурсивной отправкой новых наблюдаемых в поток. Однако, хотя он может добавлять значения в поток, не похоже, что он может завершить поток.
Обновлено: изменено название вопроса, чтобы быть более общим



![Безумие обратных вызовов в javascript [JS]](https://i.imgur.com/WsjO6zJb.png)


Вы можете заставить conditionSatisfied выдавать исходное значение вместо логического, а затем объединять его с наблюдаемым, которое завершается на основе условия:
const source = nonCompletingStream();
const conditionSatisfied = source.pipe(
filter(val => meetsCriteriaToBeFinalValue(val)),
shareReplay(),
take(1)
);
const example = concat(
source.pipe(
takeUntil(conditionSatisfied)
),
conditionSatisfied
)
Продемонстрировано здесь https://stackblitz.com/edit/rxjs-qydhh3
Изменив свой поток на горячий наблюдаемый, вы можете напрямую контролировать его жизненный цикл.
const source = nonCompletingStream().pipe(publish());
Запустите подписку, подключив исходный наблюдаемый объект:
const example = source.subscribe(...);
const subscription = source.connect();
И остановите подписку, отменив подписку на наблюдаемый источник:
source.pipe(filter(meetsCriteriaToBeFinalValue)).subscribe(() => subscription.unsubscribe());
Живой пример:
const { interval } = rxjs;
const { publish, filter } = rxjs.operators;
const source$ = interval(100).pipe(publish());
const stopCriteria = x => x === 3;
const example$ = source$.subscribe(console.info);
const subscription = source$.connect();
source$.pipe(
filter(stopCriteria)
).subscribe(() => subscription.unsubscribe());<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>