Привет, профессионалы RxJS!
У меня есть поток, который обрабатывается несколько раз в секунду. Теперь мне нужен наблюдатель, который срабатывает, если он первый в своем роде ИЛИ определенное время прошло после последнего срабатывания. Я хочу добиться этого с помощью чистого RxJS без дополнительных «вспомогательных переменных».
Сценарий:
const list = ["foo", "foo", "bar", "foo",
"foo", "foo", "foo", "foo",
"foo", "foo", "bar", "foo"];
// in real world obs$ drops "foo" and "bar" randomly infinite times
const obs$ = timer(0, 100).pipe(take(12));
$obs
.pipe(map((v, i)=>list[i]+"#"+i))
.subscribe(console.info);
Ниже приведены все значения, полученные наблюдаемой. Я хочу поймать зеленые ( ✅) и игнорировать красные (❌).
// ⬇
"foo#1" // ✅ first of a kind
"foo#2" // ❌
"bar#3" // ✅ first of a kind
"foo#4" // ✅ first of a kind
"foo#5" // ❌
"foo#6" // ❌
"foo#7" // ❌
"foo#8" // ❌
"foo#9" // ✅ <-- I want this one too, because a certain time (0.5 seconds) went by
"foo#10" // ❌
"bar#11" // ✅ first of a kind
"foo#12" // ✅ first of a kind
Итак, я хочу этот вывод:
1#foo
3#bar
4#foo
9#foo
11#bar
12#foo
Как?
Вы можете использовать оператор сканирования для вычисления «состояния» на основе предыдущего состояния и входящего излучения.
В этом случае наше «состояние» будет объектом со всей информацией, необходимой для определения того, следует ли испускать данное излучение или нет. Мы можем предоставить начальное состояние, которое функция сканирования будет использовать при обработке первой эмиссии. После этого он будет использовать возвращаемое нами значение в качестве состояния.
Вот что мы можем использовать в качестве начального состояния:
{
startTime: Date.now(),
previousKind: '',
shouldEmit: true,
item: '',
}
startTime
используется для определения того, прошло ли достаточно времени для принудительного испускания.previousKind
— отслеживать тип ранее выпущенного предмета, чтобы мы могли определить, отличается ли текущий предмет от предыдущего.shouldEmit
— логическое значение, указывающее, следует ли создавать этот элемент.item
— это просто излучаемый элемент.Эта информация будет использоваться внутри нашего оператора сканирования ниже для создания нового состояния, и это та же самая форма, которая будет генерироваться оператором сканирования:
obs$.pipe(
scan(/* generates state - details revealed later */),
filter(state => state.shouldEmit),
map(state => state.item)
).subscribe(
item => console.info(`✅ ${item}`)
);
Как видите, мы применяем операцию сканирования, затем просто filter
извлекаем элементы, которые не помечены как shouldEmit
, а затем используем map
для создания исходного элемента.
Вот содержимое оператора scan
.
scan((state, item) => {
const kind = item.split('#')[0];
const isFirstOfKind = kind !== state.previousKind;
const durationExceeded = Date.now() - state.startTime > DURATION;
const shouldEmit = isFirstOfKind || durationExceeded;
return {
startTime: shouldEmit ? Date.now() : state.startTime,
previousKind: kind,
shouldEmit,
item,
}
}, {
startTime: Date.now(),
previousKind: '',
shouldEmit: true,
item: '',
})
Вы можете видеть, что мы передаем scan
функцию, которая получает предыдущее состояние и текущую эмиссию («элемент»). С помощью этой информации мы возвращаем новое состояние, которое будет передано нижестоящим операторам. Это состояние также доступно в следующий раз, когда scan
получит выброс.
Это решение со сканированием намного чище моего. Я удалил свой пост. хорошая работа.
Возможно это решение подойдет для дополнительного требования. Идея состоит в том, чтобы создать наблюдаемую для каждого элемента, которая будет выдавать два значения: одно сразу, а затем одно после прохождения DURATION. Если новая эмиссия происходит до DURATION, то второе значение никогда не будет эмитироваться. Я также добавил index
как уникальное свойство для каждого элемента, чтобы мы могли использовать distinct
для фильтрации уже созданных элементов. У меня нет времени тестировать или очищать, но это может сработать или, по крайней мере, указать вам полезное направление. :-)
Я нашел другое решение, которое не такое точное, как ваше, но более компактное и работает для моего реального жизненного сценария (stackoverflow.com/a/78719103/5708639). Еще раз спасибо, что задумались над этим!
После того, как БиззиБоб дал правильный ответ, я нашел что-то более «родное». Он не выполняет данное требование на 100%, но вполне подходит для моего сценария реальной жизни. Поэтому я хочу поделиться этим.
obs$
.pipe(
groupBy((x) => x.split('#')[0]),
mergeMap((group) =>
group.pipe(
throttleTime(500, undefined, { leading: false, trailing: true })
)
)
)
.subscribe((item) => console.info(`✅ ${item}`));
throttleTime()
всегда outputs
определенное значение через некоторое время (500 мс), если хотя бы одно значение появилось input
за этот промежуток времени. Поскольку я сгруппировал значения с помощью groupBy
, throttleTime()
может рассчитывать на то, что все входные значения одинаковы.
Вот результат:
✅ bar#3
✅ foo#5
✅ foo#10
✅ bar#11
✅ foo#12
Как уже упоминалось, это не соответствует требованию, но тем не менее работает как debounceTime
ПЛЮС «принудительное переключение каждые x секунд». Это было то, что мне было нужно.
Отлично работает! Головоломка решена. Однако для меня это имеет недостаток из-за небольшого поворота, о котором я просто забыл, выполняя «упражнение». Это: мне всегда нужно последнее значение. (Как в дебаунс). Итак, по сути: когда я добавляю дополнительный «foo» в конец массива, мне тоже нужен этот последний «foo». Я думаю, что ваш подход не может решить эту проблему, потому что он принимает решение на основе последнего значения. Проблему можно увидеть здесь: stackblitz.com/edit/…