Как лучше всего реализовать опросчик с таймаутом в качестве реактивного потока

Как лучше всего смоделировать опросчик с тайм-аутом, когда определенное условие вызывает ранний выход в виде «реактивных потоков»?

например

Если бы у меня был наблюдаемый, который каждую секунду производил убывающую последовательность положительных целых чисел

9,8,7,6,5,4,3,2,1,0

Как лучше всего написать потребителя, который принимает последнее отдельное событие через 5 секунд ИЛИ событие «0», если оно возникло раньше, чем тайм-аут.

Это мой код в его нынешнем виде: (Пример на Java)

    int intialValue = 10;

    AtomicInteger counter = new AtomicInteger(intialValue);
    Integer val = Observable.interval(1, TimeUnit.SECONDS)
                            .map(tick -> counter.decrementAndGet())
                            .takeUntil(it -> it == 0)
                            .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
                            .lastElement()
                            .blockingGet();

    System.out.println(val);

если initialValue = 10, я ожидаю, что будет напечатано 6. если initialValue = 2, я ожидаю, что 0 будет напечатано до истечения 5-секундного тайм-аута.

Мне интересно, есть ли лучший способ сделать это.

В наблюдаемой цепочке есть три элемента: источник (interval().map()), выбор по значению (it == 0), выбор по времени (timer()). Компактнее этого не получишь.

Bob Dalgleish 08.05.2018 21:47
Освоение Observables и Subjects в Rxjs:
Освоение Observables и Subjects в Rxjs:
Давайте начнем с основ и постепенно перейдем к более продвинутым концепциям в RxJS в Angular
Promise v/s Observable в Angular
Promise v/s Observable в Angular
В системах Push производитель определяет, когда отправить данные потребителю. Потребитель не знает, когда он получит эти данные.
Подсказка RxJS [filter, skipWhile]
Подсказка RxJS [filter, skipWhile]
Эта подсказка описывает разницу между операторами filter и skipWhile из библиотеки RxJS .
В чем разница между Promise и Observable?
В чем разница между Promise и Observable?
Разберитесь в этом вопросе, и вы значительно повысите уровень своей компетенции.
0
1
749
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Не думаю, что есть лучший способ, чем то, что вы сделали. Вам необходимо иметь следующее:

  • Интервал для излучения (interval)
  • Агрегатор для уменьшения и сохранения последнего значения (scan)
  • Условие завершения значения (takeWhile)
  • Условие прерывания по времени (takeUntil(timer(...)))
  • Получить последнее значение по завершении (last)

Каждый представлен оператором. Вы не можете ничего сделать, чтобы обойти это. Я использовал несколько разных операторов (scan для агрегирования и takeWhile для завершения по значению), но это то же количество операторов.

const { interval, timer } = rxjs;
const { scan, takeWhile, takeUntil, last, tap } = rxjs.operators;

function poll(start) {
  console.info('start', start);
  interval(1000).pipe(
    scan((x) => x - 1, start),
    takeWhile((x) => x >= 0),
    takeUntil(timer(5000)),
    tap((x) => { console.info('tap', x); }),
    last()
  ).subscribe(
    (x) => { console.info('next', x); },
    (e) => { console.info('error', e); },
    () => { console.info('complete'); }
  );
}

poll(10);
setTimeout(() => { poll(2); }, 6000);
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.min.js"></script>

Я не понимаю, как вы ожидаете, что он будет работать на границах. В вашем примере вы всегда уменьшаете перед испусканием, поэтому, если ваше начальное значение равно 10, вы испускаете 9, 8, 7, 6 (4 значения). Если вы хотите начать с 10, тогда. вы могли бы сделать scan(..., start + 1), но это закончило бы вас на 7, потому что таймер в takeUntil(...) выровнен с исходным интервалом, так что 6 будет исключено. Если вы хотите выдать 5 значений, вы можете сделать takeUntil(timer(5001)). Кроме того, если вы не хотите ждать секунду для выдачи первого значения, вы можете поместить startWith(start) сразу после scan(...). Или вы можете сделать timer(0, 1000) с scan(..., start + 1) вместо исходного интервала.

Также обратите внимание, что завершение по значению (takeWhile) не будет завершено, пока не будет создано недопустимое значение (-1). Таким образом, он будет продолжаться в течение секунды после получения значения завершения (0). Кажется, что большинство операторов завершения работают таким образом, что если они завершаются на каком-то значении, они не пропускают другие.

Вы можете использовать take(5) вместо takeUntil(timer(5000)), потому что вы знаете, что он срабатывает с совпадающим интервалом, если это работает для вашего сценария. Это также позволило бы обойти проблему исключения последнего значения из-за выравнивания таймеров.

Другие вопросы по теме