Наблюдаемая RxJS, которая срабатывает, если она первая в своем роде ИЛИ прошло определенное время

Привет, профессионалы RxJS!

У меня есть поток, который обрабатывается несколько раз в секунду. Теперь мне нужен наблюдатель, который срабатывает, если он первый в своем роде ИЛИ определенное время прошло после последнего срабатывания. Я хочу добиться этого с помощью чистого RxJS без дополнительных «вспомогательных переменных».

Сценарий:

const list = ["foo", "foo", "bar", "foo",
              "foo", "foo", "foo", "foo",
              "foo", "foo", "bar", "foo"];

// in real world obs$ drops "foo" and "bar" randomly infinite times
const obs$ = timer(0, 100).pipe(take(12)); 

$obs
  .pipe(map((v, i)=>list[i]+"#"+i))
  .subscribe(console.info); 

Ниже приведены все значения, полученные наблюдаемой. Я хочу поймать зеленые ( ✅) и игнорировать красные (❌).

// ⬇
"foo#1"  // ✅ first of a kind
"foo#2"  // ❌
"bar#3"  // ✅ first of a kind
"foo#4"  // ✅ first of a kind
"foo#5"  // ❌
"foo#6"  // ❌
"foo#7"  // ❌
"foo#8"  // ❌
"foo#9"  // ✅ <-- I want this one too, because a certain time (0.5 seconds) went by
"foo#10" // ❌
"bar#11" // ✅ first of a kind
"foo#12" // ✅ first of a kind

Итак, я хочу этот вывод:

1#foo
3#bar
4#foo
9#foo
11#bar
12#foo

Как?

Зод: сила проверки и преобразования данных
Зод: сила проверки и преобразования данных
Сегодня я хочу познакомить вас с библиотекой Zod и раскрыть некоторые ее особенности, например, возможности валидации и трансформации данных, а также...
Как заставить Remix работать с Mantine и Cloudflare Pages/Workers
Как заставить Remix работать с Mantine и Cloudflare Pages/Workers
Мне нравится библиотека Mantine Component , но заставить ее работать без проблем с Remix бывает непросто.
Угловой продивер
Угловой продивер
Оригинал этой статьи на турецком языке. ChatGPT используется только для перевода на английский язык.
TypeScript против JavaScript
TypeScript против JavaScript
TypeScript vs JavaScript - в чем различия и какой из них выбрать?
Синхронизация localStorage в масштабах всего приложения с помощью пользовательского реактивного хука useLocalStorage
Синхронизация localStorage в масштабах всего приложения с помощью пользовательского реактивного хука useLocalStorage
Не все нужно хранить на стороне сервера. Иногда все, что вам нужно, это постоянное хранилище на стороне клиента для хранения уникальных для клиента...
Что такое ленивая загрузка в Angular и как ее применять
Что такое ленивая загрузка в Angular и как ее применять
Ленивая загрузка - это техника, используемая в Angular для повышения производительности приложения путем загрузки модулей только тогда, когда они...
3
0
117
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вы можете использовать оператор сканирования для вычисления «состояния» на основе предыдущего состояния и входящего излучения.

В этом случае наше «состояние» будет объектом со всей информацией, необходимой для определения того, следует ли испускать данное излучение или нет. Мы можем предоставить начальное состояние, которое функция сканирования будет использовать при обработке первой эмиссии. После этого он будет использовать возвращаемое нами значение в качестве состояния.

Вот что мы можем использовать в качестве начального состояния:

{
  startTime: Date.now(),
  previousKind: '',
  shouldEmit: true,
  item: '',
}
  • startTime используется для определения того, прошло ли достаточно времени для принудительного испускания.
  • previousKind — отслеживать тип ранее выпущенного предмета, чтобы мы могли определить, отличается ли текущий предмет от предыдущего.
  • shouldEmit — логическое значение, указывающее, следует ли создавать этот элемент.
  • item — это просто излучаемый элемент.

Эта информация будет использоваться внутри нашего оператора сканирования ниже для создания нового состояния, и это та же самая форма, которая будет генерироваться оператором сканирования:

obs$.pipe(
  scan(/* generates state - details revealed later */),
  filter(state => state.shouldEmit),
  map(state => state.item)
).subscribe(
  item => console.info(`✅ ${item}`)
); 

Как видите, мы применяем операцию сканирования, затем просто filter извлекаем элементы, которые не помечены как shouldEmit, а затем используем map для создания исходного элемента.


Вот содержимое оператора scan.

  scan((state, item) => {
    const kind = item.split('#')[0];
    const isFirstOfKind = kind !== state.previousKind;
    const durationExceeded = Date.now() - state.startTime > DURATION;
    const shouldEmit = isFirstOfKind || durationExceeded;

    return {
      startTime: shouldEmit ? Date.now() : state.startTime,
      previousKind: kind,
      shouldEmit,
      item,
    }
  }, {
    startTime: Date.now(),
    previousKind: '',
    shouldEmit: true,
    item: '',
  })

Вы можете видеть, что мы передаем scan функцию, которая получает предыдущее состояние и текущую эмиссию («элемент»). С помощью этой информации мы возвращаем новое состояние, которое будет передано нижестоящим операторам. Это состояние также доступно в следующий раз, когда scan получит выброс.


Вот демо-версия StackBlitz.

Отлично работает! Головоломка решена. Однако для меня это имеет недостаток из-за небольшого поворота, о котором я просто забыл, выполняя «упражнение». Это: мне всегда нужно последнее значение. (Как в дебаунс). Итак, по сути: когда я добавляю дополнительный «foo» в конец массива, мне тоже нужен этот последний «foo». Я думаю, что ваш подход не может решить эту проблему, потому что он принимает решение на основе последнего значения. Проблему можно увидеть здесь: stackblitz.com/edit/…

jaheraho 07.07.2024 08:42

Это решение со сканированием намного чище моего. Я удалил свой пост. хорошая работа.

decius 07.07.2024 09:26

Возможно это решение подойдет для дополнительного требования. Идея состоит в том, чтобы создать наблюдаемую для каждого элемента, которая будет выдавать два значения: одно сразу, а затем одно после прохождения DURATION. Если новая эмиссия происходит до DURATION, то второе значение никогда не будет эмитироваться. Я также добавил index как уникальное свойство для каждого элемента, чтобы мы могли использовать distinct для фильтрации уже созданных элементов. У меня нет времени тестировать или очищать, но это может сработать или, по крайней мере, указать вам полезное направление. :-)

BizzyBob 07.07.2024 21:01

Я нашел другое решение, которое не такое точное, как ваше, но более компактное и работает для моего реального жизненного сценария (stackoverflow.com/a/78719103/5708639). Еще раз спасибо, что задумались над этим!

jaheraho 08.07.2024 06:26

После того, как БиззиБоб дал правильный ответ, я нашел что-то более «родное». Он не выполняет данное требование на 100%, но вполне подходит для моего сценария реальной жизни. Поэтому я хочу поделиться этим.

obs$
  .pipe(
    groupBy((x) => x.split('#')[0]),
    mergeMap((group) =>
      group.pipe(
        throttleTime(500, undefined, { leading: false, trailing: true })
      )
    )
  )
  .subscribe((item) => console.info(`✅ ${item}`));

throttleTime() всегда outputs определенное значение через некоторое время (500 мс), если хотя бы одно значение появилось input за этот промежуток времени. Поскольку я сгруппировал значения с помощью groupBy, throttleTime() может рассчитывать на то, что все входные значения одинаковы.

Вот результат:

✅ bar#3
✅ foo#5
✅ foo#10
✅ bar#11
✅ foo#12

Как уже упоминалось, это не соответствует требованию, но тем не менее работает как debounceTime ПЛЮС «принудительное переключение каждые x секунд». Это было то, что мне было нужно.

Stackblitz

Другие вопросы по теме