Как я могу заставить rxjs observable излучать в определенные даты?

У меня есть Наблюдаемый RxJS, который необходимо пересчитывать в определенные моменты времени, как описано массивом объектов DateTime (хотя для целей этого вопроса они могут быть объектами JavaScript Date, миллисекундами эпохи или чем-либо еще, представляющим конкретный момент времени):

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

Я изо всех сил пытаюсь понять, как создать Observable, который будет излучать в моменты времени, указанные в таком массиве.

Вот о чем я подумал, пытаясь ответить на свой вопрос:

  • Мне почти наверняка нужно использовать оператор delay, где указанная задержка - это время между «сейчас» и следующим будущим datetime.
  • Мне как-то нужно убедиться, что «сейчас» актуально во время подписки, а не во время создания Observable - возможно, с помощью оператор defer - хотя я не хочу без необходимости создавать несколько экземпляров Observable, если есть несколько подписок.
  • Я не уверен, как выполнять итерацию по массиву с течением времени - мне может понадобиться оператор expand, но он вызывает что-то рекурсивно, и я просто пытаюсь перебирать список.
  • Оператор timer кажется неуместным, поскольку продолжительность между датами разная.
  • Я мог бы сопоставить каждое datetime с его собственным отложенным Observable и вернуть их все через merge, но это становится ужасно неэффективным, поскольку количество datetime в массиве увеличивается (их могут быть сотни), так что это абсолютное последнее средство.

Как я могу сделать наблюдаемый RxJS, который принимает список дат и времени, а затем излучает по мере достижения каждого из них по времени, завершая последний?

8
0
2 102
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Возможно, вы захотите основать свое решение на:

Я ценю предложения, но ни одно из них не предлагает решения на основе RxJS, и мне не нужна дополнительная сложность управления жизненным циклом setTimeout() в рабочем пространстве, где уже доминирует RxJS.

Alex Peters 31.10.2018 12:42

Вы можете адаптировать концепции setTimeout() к Планировщик RxJS.

Haroldo_OK 31.10.2018 12:46
«Планировщик контролирует начало подписки ...» - Я не думаю, что это то, что я ищу в данном случае.
Alex Peters 31.10.2018 12:49

Я думаю, что то, что вы резюмировали по пунктам списка, правильно. Использование delay кажется очевидным, но это затруднит понимание цепочки.

Решение, которое приходит мне в голову, предполагает, что вы знаете массив changeTimes до создания наблюдаемой цепочки. Вы можете создать свой собственный «наблюдаемый метод создания», который будет возвращать Observable, который излучает, например, на основе setTimeout (это просто «псевдокод», он не вычисляет дату должным образом):

const schedule = (dates: Date[]): Observable<Date> => new Observable(observer => {
  // Sort the `dates` array from the earliest to the latest...

  let index = 0;
  let clearTimeout;

  const loop = () => {
    const now = new Date();
    const delay = dates[index] - now;

    clearTimeout = setTimeout(() => {
      observer.next(dates[index++]);

      if (index < dates.length) {
        loop();
      }
    }, delay);
  }

  loop();

  return () => clearTimeout(clearTimeout);
}); 

...

schedule(changeTimes)
  .subscribe(...)

Последний вариант, который вы упомянули с merge, на самом деле не так уж и плох. Я понимаю, что вы обеспокоены тем, что он создаст много подписок, но если вы отсортируете массив changeTimes, а затем используете concat вместо merge, он всегда сохранит только одну активную подписку, даже если вы создадите 100 наблюдаемых.

Спасибо. Мне особенно нравится ваше предложение использовать concat вместо merge, что дает мне идею альтернативного решения.

Alex Peters 31.10.2018 14:26

Вот рабочий пример:

import {Injectable} from '@angular/core';
import {Observable, Subject, timer} from 'rxjs';

@Injectable()
export class TimerService {

  futureDates: Date[] = [];
  futureDate: Date;
  notifier: Observable<string>;

  cycle = (observer) => {
    if (this.futureDates.length > 0) {
      this.futureDate = this.futureDates.shift();

      const msInFuture = this.futureDate.getTime() - Date.now();
      if (msInFuture < 0) {
        console.log(`date ${this.futureDate.toISOString()}
            expected to be in the future, but was ${msInFuture} msec in the past, so stopping`);

        observer.complete();
      } else {
        timer(msInFuture).subscribe(x => {
          observer.next(`triggered at ${new Date().toISOString()}`);
          this.cycle(observer);
        });
      }
    } else {
        observer. complete();
    }
  }

  getTimer(): Observable<string> {
    const now = new Date();
    const ms1 = now.getTime() + 10000;
    const ms2 = now.getTime() + 20000;
    this.futureDates.push(new Date(ms1));
    this.futureDates.push(new Date(ms2));

    this.notifier = new Observable(observer => {
      this.cycle(observer);
    });

    return this.notifier;
  }
}

В этом примере список будущих времен создается в методе getTimer(), но вы можете передать в этот метод массив дат.

Ключ состоит в том, чтобы просто сохранять даты, обрабатывать по одной и во время обработки проверять, насколько далеко в будущем эта дата, и устанавливать однократный таймер Rx на это количество миллисекунд.

Спасибо. Единственная проблема, которую я предвижу в этом случае, заключается в том, что в моем случае весь массив значений даты и времени регулярно изменяется (и фактически подается через сам Observable). При изменении этого массива необходимо отменить все ожидающие обработки timer.

Alex Peters 31.10.2018 15:08

Вы можете сохранить ссылку на подписку на ожидающий таймер. Затем вы можете вызвать функцию unsubscribe () для этой подписки при изменении массива.

GreyBeardedGeek 31.10.2018 16:30

Это правда, хотя поддержание Subscription с целью отказа от подписки и повторной подписки, когда основные изменения данных - это проблема, которую гораздо более четко обрабатывают операторы RxJS, такие как switchMap.

Alex Peters 01.11.2018 05:15
Ответ принят как подходящий

Учитывая массив объектов DateTime:

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

или еще лучше, Observable, который испускает массив из них каждый раз, когда массив изменяется (что на самом деле происходит в моем сценарии, хотя я не упомянул об этом в вопросе, потому что это не было строго актуальным):

const changeTimes$: Observable<DateTime[]> = /* ... */;

следующий Observable будет немедленно генерировать следующее время в будущем при подписке, испускать каждое последующее время в будущем по истечении предыдущего будущего времени, а затем завершать с null:

const nextTime$ = changeTimes$.pipe(
    // sort DateTimes chronologically
    map(unsorted => [...unsorted].sort((x, y) => +x - +y),
    // remove duplicates
    map(duplicated => duplicated.filter((item, i) => !i || +item !== +duplicated[i - 1])),
    // convert each time to a delayed Observable
    map(times => [...times, null].map((time, i) => defer(() => of(time).pipe(
        // emit the first one immediately
        // emit all others at the previously emitted time
        delay(i === 0 ? 0 : +times[i - 1] - +DateTime.utc())
    )))),
    // combine into a single Observable
    switchMap(observables => concat(...observables)),
);
  • Сортировка необходима, поскольку каждый внутренний Observable («подождите X миллисекунд, затем сообщите время») подписывается по завершении предыдущего.
  • Удаление дубликатов не обязательно, но кажется целесообразным.
  • defer используется, чтобы текущее время вычислялось при подписке на внутренний Observable.
  • concat используется для последовательного выполнения каждого внутреннего Observable (благодаря Мартин), избегая накладных расходов на подписку для каждого раза в списке одновременно.

То, как это удовлетворяет мою первоначальную потребность:

I have an RxJS Observable that needs to be recalculated at specific times, as described by an array of DateTime objects...

если я объединю это с данными, требующими пересчета, используя оператор combineLatest, чтобы инициировать этот пересчет в правильное время:

const timeAwareData$ = combineLatest(timeUnawareData$, nextTime$).pipe(
    tap(() => console.log('either the data has changed or a time has been reached')),
    // ...
);

Что мне не нравится в моем решении

Он создает отдельный внутренний Observable одновременно для каждого времени в списке. Я чувствую, что можно провести рефакторинг так, чтобы каждый внутренний Observable создавался только после того, как предыдущий был уничтожен. Любые советы по улучшению будут с благодарностью получены.

Другие вопросы по теме