Как лучше всего смоделировать опросчик с тайм-аутом, когда определенное условие вызывает ранний выход в виде «реактивных потоков»?
например
Если бы у меня был наблюдаемый, который каждую секунду производил убывающую последовательность положительных целых чисел
9,8,7,6,5,4,3,2,1,0
Как лучше всего написать потребителя, который принимает последнее отдельное событие через 5 секунд ИЛИ событие «0», если оно возникло раньше, чем тайм-аут.
Это мой код в его нынешнем виде: (Пример на Java)
int intialValue = 10;
AtomicInteger counter = new AtomicInteger(intialValue);
Integer val = Observable.interval(1, TimeUnit.SECONDS)
.map(tick -> counter.decrementAndGet())
.takeUntil(it -> it == 0)
.takeUntil(Observable.timer(5, TimeUnit.SECONDS))
.lastElement()
.blockingGet();
System.out.println(val);
если initialValue = 10, я ожидаю, что будет напечатано 6. если initialValue = 2, я ожидаю, что 0 будет напечатано до истечения 5-секундного тайм-аута.
Мне интересно, есть ли лучший способ сделать это.


![Подсказка RxJS [filter, skipWhile]](https://i.imgur.com/QlvZflrb.jpeg)

Не думаю, что есть лучший способ, чем то, что вы сделали. Вам необходимо иметь следующее:
interval)scan)takeWhile)takeUntil(timer(...)))last)Каждый представлен оператором. Вы не можете ничего сделать, чтобы обойти это. Я использовал несколько разных операторов (scan для агрегирования и takeWhile для завершения по значению), но это то же количество операторов.
const { interval, timer } = rxjs;
const { scan, takeWhile, takeUntil, last, tap } = rxjs.operators;
function poll(start) {
console.info('start', start);
interval(1000).pipe(
scan((x) => x - 1, start),
takeWhile((x) => x >= 0),
takeUntil(timer(5000)),
tap((x) => { console.info('tap', x); }),
last()
).subscribe(
(x) => { console.info('next', x); },
(e) => { console.info('error', e); },
() => { console.info('complete'); }
);
}
poll(10);
setTimeout(() => { poll(2); }, 6000);<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.min.js"></script>Я не понимаю, как вы ожидаете, что он будет работать на границах. В вашем примере вы всегда уменьшаете перед испусканием, поэтому, если ваше начальное значение равно 10, вы испускаете 9, 8, 7, 6 (4 значения). Если вы хотите начать с 10, тогда. вы могли бы сделать scan(..., start + 1), но это закончило бы вас на 7, потому что таймер в takeUntil(...) выровнен с исходным интервалом, так что 6 будет исключено. Если вы хотите выдать 5 значений, вы можете сделать takeUntil(timer(5001)). Кроме того, если вы не хотите ждать секунду для выдачи первого значения, вы можете поместить startWith(start) сразу после scan(...). Или вы можете сделать timer(0, 1000) с scan(..., start + 1) вместо исходного интервала.
Также обратите внимание, что завершение по значению (takeWhile) не будет завершено, пока не будет создано недопустимое значение (-1). Таким образом, он будет продолжаться в течение секунды после получения значения завершения (0). Кажется, что большинство операторов завершения работают таким образом, что если они завершаются на каком-то значении, они не пропускают другие.
Вы можете использовать take(5) вместо takeUntil(timer(5000)), потому что вы знаете, что он срабатывает с совпадающим интервалом, если это работает для вашего сценария. Это также позволило бы обойти проблему исключения последнего значения из-за выравнивания таймеров.
В наблюдаемой цепочке есть три элемента: источник (
interval().map()), выбор по значению (it == 0), выбор по времени (timer()). Компактнее этого не получишь.