Я пытаюсь запустить наблюдаемое несколько раз, пока не будет испущено какое-то другое наблюдаемое. Я не хочу останавливать незавершенное на данный момент наблюдаемое, я лишь хочу, чтобы оно больше не повторялось.
тс/rxjs
Я попробовал использовать takeUntil, но это закрывает поток $, но он фактически не завершается (он обрезает его до таймера 1000 мс).
const stream$ = interval(1).pipe(takeUntil(timer(1000)));
stream$.pipe(repeat(), takeUntil(timer(1500)));
Если мы согласны с тем, что поток $ выполняется в течение 1000 мс перед закрытием, я ожидаю, что в этом случае он должен выполняться 2 раза по 1000 мс. Мне нужен какой-то оператор, который перестанет повторять наблюдаемое при каком-то событии$.
Что-то вроде этого:
const stream$ = interval(1).pipe(takeUntil(timer(1000)));
stream$.pipe(repeatUntil(timer(1500)));
Мы можем использовать takeWhile
и дождаться выброса нулевого элемента, а затем вызвать условие взятия.
Это сгенерирует выходные данные, в которых мы, похоже, получим излучение даже после установленного порогового времени.
const { rxObserver } = require('api/v0.3');
const { timer, interval, Subject } = require('rxjs');
const { take, map, repeat, takeUntil, takeWhile, toArray} = require('rxjs/operators');
const msg = 'awesome';
let turnOff = true;
interval(33).pipe(
takeUntil(timer(100)),
repeat(),
takeWhile((data) => data === 0 ? turnOff : true),
).subscribe(rxObserver());
setTimeout(() => {
turnOff = false;
}, 134)
Попробуйте переключить значение setTimeout между 134
и 133
.
@DarkoMitic обновил мой ответ!
Это интересный подход, и он делает то, что я хочу. В моем случае я бы использовал timer(0, 33)
вместо interval(33)
, чтобы не задерживать нулевую эмиссию, а также суммировать последнюю эмиссию с нулевой эмиссией из следующего цикла. Прежде чем вы предложили мне это решение, я использовал комбинацию повтораКогда и takeUntil с фильтром: repeatWhen(() => trigger$.pipe(filter((repeat) => repeat))), takeUntil(trigger$.pipe(filter((repeat) => !repeat)))
, но ваш ответ гораздо более элегантен и не требует эмиссии триггера$ в конце каждого цикла. Большое спасибо!
На самом деле это работает не так, как я хочу. Мне нужно повторить один и тот же поток $, от его начала до завершения, и продолжать повторять его до тех пор, пока не произойдет какое-либо внешнее событие $.
interval(33).pipe(takeUntil(timer(100))).pipe(repeat()).subscribe(rxObserver());
Это будет повторять 012, 012, 012, 012... шарики. Когда я хочу остановить это в какой-то момент, я хочу, чтобы он завершил текущий повторяющийся наблюдаемый поток, чтобы он оказался на шарике 2, а не разрезал его на шарике 0 или 1.