У меня возникла проблема, когда я пытаюсь заказать несколько отдельных потоков из базы данных временных рядов. Предполагая, что все данные в каждом потоке отсортированы по метке времени, учитывая следующий код, как бы я изменил потоки dataA$
и dataB$
так, чтобы каждый из них выдавал значения в порядке значения с меткой времени БЕЗ ожидания завершения всего потока:
import { delayWhen, of, timer } from "rxjs";
const dataA = [{"data":"b","timestamp":6672},{"data":"c","timestamp":7404},{"data":"a","timestamp":7922},{"data":"b","timestamp":8885},{"data":"c","timestamp":9111},{"data":"a","timestamp":9245},{"data":"c","timestamp":10168},{"data":"b","timestamp":10778},{"data":"c","timestamp":11504},{"data":"a","timestamp":12398},{"data":"a","timestamp":12745},{"data":"a","timestamp":13648},{"data":"a","timestamp":14233},{"data":"a","timestamp":14943},{"data":"b","timestamp":15869},{"data":"c","timestamp":16043},{"data":"a","timestamp":16169},{"data":"a","timestamp":16242},{"data":"a","timestamp":17058},{"data":"b","timestamp":17885},{"data":"a","timestamp":18252},{"data":"a","timestamp":18711},{"data":"c","timestamp":18883},{"data":"b","timestamp":19618},{"data":"a","timestamp":20183}];
const dataB = [{"data":"b","timestamp":821},{"data":"b","timestamp":1357},{"data":"b","timestamp":2108},{"data":"b","timestamp":3001},{"data":"a","timestamp":3995},{"data":"b","timestamp":4475},{"data":"c","timestamp":5357},{"data":"c","timestamp":5373},{"data":"b","timestamp":6199},{"data":"c","timestamp":6207},{"data":"b","timestamp":6896},{"data":"b","timestamp":7410},{"data":"a","timestamp":8335},{"data":"a","timestamp":9191},{"data":"b","timestamp":10007},{"data":"b","timestamp":10703},{"data":"c","timestamp":11225},{"data":"c","timestamp":11453},{"data":"c","timestamp":12131},{"data":"c","timestamp":12599},{"data":"c","timestamp":13567},{"data":"a","timestamp":13726},{"data":"b","timestamp":14161},{"data":"b","timestamp":14224},{"data":"b","timestamp":14666}];
const dataA$ = of(dataA).pipe(
delayWhen(() => timer(Math.random() * 5000)),
???
);
const dataB$ = of(dataB).pipe(
delayWhen(() => timer(Math.random() * 5000)),
???
);
let lastTimestamp = -Infinity;
dataA$.subscribe(({ timestamp }) => {
expect(timestamp > lastTimestamp).toBe(true);
lastTimestamp = timestamp;
});
dataB$.subscribe(({ timestamp }) => {
expect(timestamp > lastTimestamp).toBe(true);
lastTimestamp = timestamp;
});
Дополнительный вопрос: как можно расширить это решение для динамической поддержки любого количества потоков данных после создания потока?
Если бы вы удалили ???
из каждого из потоков здесь, этот фрагмент кода выдал бы, потому что lastTimestamp
является общим для двух потоков. Проблема в том, что у меня есть два разных потока, которые возвращают объекты с разной временной частотой (например, данные TimeSeries для двух разных устройств IoT), но оба они упорядочены. Я хочу, чтобы мои потоки регулировались/буферизировались таким образом, чтобы они были упорядочены по времени, но чтобы я получал все отдельные события. Представьте, что вы хотите воспроизвести серию событий, произошедших на нескольких устройствах в смоделированной среде.
Итак, выбросы обоих Observables должны быть каким-то образом упорядочены как единый поток?
Нет, выбросы обоих Observable должны совместно использовать состояние и буферизовать полученные значения до тех пор, пока общее состояние не диктует, что он может испускать.
когда общее состояние диктует, что они могут излучать? Представьте себе следующие метки времени: dataA
излучает 1
, dataB
излучает 5
, а затем dataA
излучает 3
. Следует ли опускать значение 3
или как dataB
знать, что ждать неизвестных значений dataA
?
Отличный пример. В этом случае dataB
должен ждать (т.е. значения буфера), пока следующая ожидающая эмиссия dataA
не будет больше или равна 5
или dataA
не завершится. Имея в виду, что оба потока гарантированно упорядочены по отметке времени. (Также добавьте, что dataA
должен делать то же самое для значений dataB
)
Я не уверен, существует ли подход «только для RxJS», использующий только операторы канала. Однако вот как бы я это сделал:
Чтобы сделать это воспроизводимым, я создал следующую настройку. Это просто помещает элементы массивов в Subjects
с задержкой.
const dataA$ = new Subject()
const dataB$ = new Subject()
timer(0, 4000).subscribe(t => {
if (!dataA[0]) return
dataA$.next(dataA[0])
dataA.shift()
})
timer(0, 3900).subscribe(t => {
if (!dataB[0]) return
dataB$.next(dataB[0])
dataB.shift()
})
Просто замените dataA$
и dataB$
своими собственными Observables и удалите timer
и подписки.
Результирующие потоки данных будут храниться в двух отдельных Observables, для которых я создал два Subjects
.
const resultA = new Subject()
const resultB = new Subject()
Было бы неплохо просто использовать dataA$
и datab$
и контролировать, как они излучают, ссылаясь друг на друга в своих каналах, но это создает циклическую зависимость, и я не уверен, что это управляемо.
Теперь нам также нужны два буфера, в которых хранятся промежуточные значения.
let aBuffer = []
let bBuffer = []
Давайте подпишемся на оба потока данных.
dataA$.subscribe(({ timestamp }) => {
if (bBuffer.length === 0) {
aBuffer.push(timestamp)
return
}
while (true) {
let first = bBuffer[0]
if (first > timestamp) {
resultA.next(timestamp)
break
} else if (timestamp > first) {
resultB.next(first)
bBuffer.shift()
} else {
resultA.next(timestamp)
resultB.next(first)
bBuffer.shift()
break
}
}
});
dataB$.subscribe(({ timestamp }) => {
if (aBuffer.length === 0) {
bBuffer.push(timestamp)
return
}
while (true) {
let first = aBuffer[0]
if (first > timestamp) {
resultB.next(timestamp)
break
} else if (timestamp > first) {
resultA.next(first)
aBuffer.shift()
} else {
resultB.next(timestamp)
resultA.next(first)
aBuffer.shift()
break
}
}
});
Обе реализации идентичны. Просто a
и b
изменены. В основном мы ждем, пока другой поток выдаст значение. До тех пор мы помещаем значения в буфер.
Когда поток излучает и текущая временная метка отстает от первого значения буфера, мы помещаем временную метку в результат. Но если первое значение буфера отстает, мы используем цикл while
, чтобы «догнать» и передать все отстающие значения в процессе.
Кажется, это отлично работает для примера в вопросе.
let lastTimestamp = -Infinity;
resultA.subscribe((r) => {
console.info("A", r, r > lastTimestamp)
lastTimestamp = r
})
resultB.subscribe((r) => {
console.info("B", r, r > lastTimestamp)
lastTimestamp = r
})
Вывод:
/* ... */
B 5373
true
B 6199
true
B 6207
true
A 6672
true
B 6896
true
A 7404
true
B 7410
true
/* ... */
Вдохновленный вашим ответом, я собрал оператор, который делает это: Главный нерешенный вопрос для меня заключается в операторе, который я опубликовал, как мне заставить его реагировать на любое количество потоков, добавляемых с течением времени. Увидев, что ваш ответ заставил меня двигаться в правильном направлении, я отмечу его как принятый.
RxJS
Простой пример
const { of, map, from, shareReplay, concatMap, endWith, NEVER, startWith, takeWhile, combineLatestAll, takeUntil, filter } = rxjs;
const dataA = [1, 10, 11, 15, 20];
const dataB = [0, 2, 3, 11, 21];
const dataC = [1, 3, 5, 9];
const max = Infinity;
const dataA$ = from(dataA);
const dataB$ = from(dataB);
const dataC$ = from(dataC);
const source$ = of(dataA$, dataB$, dataC$).pipe(
map(endWith(max)),
map(concatMap((x) => {
const destroy$ = source$.pipe(filter((y) => x === y));
return NEVER.pipe(takeUntil(destroy$), startWith(x));
})),
combineLatestAll(Math.min),
takeWhile((x) => x !== max),
shareReplay(1)
);
source$.subscribe(console.info);
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>
Мне непонятно, в чем дело. Как выглядят ваши настоящие Observables? Постоянно ли они излучают значения? То, что написано в вопросе, уже должно работать, если это так. Или они излучают массивы?