Есть ли способ завершить поток после прохождения конечного значения?

У меня есть исходный поток, который не завершается, но мне нужен оператор, который завершает наблюдателей после того, как переданное значение удовлетворяет некоторым требованиям, чтобы сделать его последним элементом для наблюдателей. На данный момент мой код просто использует takeUntil() для наблюдаемого объекта, который испускается, когда значение из источника соответствует критериям.

const source: Observable<any> = nonCompletingStream();
const conditionSatisfied: Observable<boolean> = source.pipe(
    map(val => meetsCriteriaToBeFinalValue(val)),
    skipWhile(meetsCriteria => meetsCriteria === false),
    shareReplay(),  // so that this emits after the fact
    take(1) // only needs to know if the criteria is met once
);
const example: Observable<any> = source.pipe(
    takeUntil(conditionSatisfied)
);

example.subscribe(
    val => { // does stuff with individual values },
    err => { // handles error },
    () => { // this gets triggered one item too early }
);

Проблема с этим заключается в том, что когда элемент, который соответствует критериям, проходит (заставляя conditionSatisfied испускать), наблюдатель на example завершает работу до того, как «последнее значение» сможет передать его обратному вызову next. Есть ли способ передать окончательное значение наблюдателю до завершения потока?

Читая параметры, expand выглядит похоже на то, что мне нужно, позволяя значениям проходить перед рекурсивной отправкой новых наблюдаемых в поток. Однако, хотя он может добавлять значения в поток, не похоже, что он может завершить поток.

Обновлено: изменено название вопроса, чтобы быть более общим

Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
1
0
171
2

Ответы 2

Вы можете заставить conditionSatisfied выдавать исходное значение вместо логического, а затем объединять его с наблюдаемым, которое завершается на основе условия:

const source = nonCompletingStream();
const conditionSatisfied = source.pipe(
    filter(val => meetsCriteriaToBeFinalValue(val)),
    shareReplay(),
    take(1)
);
const example = concat(
  source.pipe(
      takeUntil(conditionSatisfied)
  ),
  conditionSatisfied
)

Продемонстрировано здесь https://stackblitz.com/edit/rxjs-qydhh3

Изменив свой поток на горячий наблюдаемый, вы можете напрямую контролировать его жизненный цикл.

const source = nonCompletingStream().pipe(publish());

Запустите подписку, подключив исходный наблюдаемый объект:

const example = source.subscribe(...);
const subscription = source.connect();

И остановите подписку, отменив подписку на наблюдаемый источник:

source.pipe(filter(meetsCriteriaToBeFinalValue)).subscribe(() => subscription.unsubscribe());

Живой пример:

const { interval } = rxjs;
const { publish, filter } = rxjs.operators;

const source$ = interval(100).pipe(publish());
const stopCriteria = x => x === 3;

const example$ = source$.subscribe(console.info);

const subscription = source$.connect();

source$.pipe(
  filter(stopCriteria)
).subscribe(() => subscription.unsubscribe());
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>

Другие вопросы по теме