Упорядочивание потоков RXJS по свойству данных

У меня возникла проблема, когда я пытаюсь заказать несколько отдельных потоков из базы данных временных рядов. Предполагая, что все данные в каждом потоке отсортированы по метке времени, учитывая следующий код, как бы я изменил потоки dataA$ и dataB$ так, чтобы каждый из них выдавал значения в порядке значения с меткой времени БЕЗ ожидания завершения всего потока:

import { delayWhen, of, timer } from "rxjs";

const dataA = [{"data":"b","timestamp":6672},{"data":"c","timestamp":7404},{"data":"a","timestamp":7922},{"data":"b","timestamp":8885},{"data":"c","timestamp":9111},{"data":"a","timestamp":9245},{"data":"c","timestamp":10168},{"data":"b","timestamp":10778},{"data":"c","timestamp":11504},{"data":"a","timestamp":12398},{"data":"a","timestamp":12745},{"data":"a","timestamp":13648},{"data":"a","timestamp":14233},{"data":"a","timestamp":14943},{"data":"b","timestamp":15869},{"data":"c","timestamp":16043},{"data":"a","timestamp":16169},{"data":"a","timestamp":16242},{"data":"a","timestamp":17058},{"data":"b","timestamp":17885},{"data":"a","timestamp":18252},{"data":"a","timestamp":18711},{"data":"c","timestamp":18883},{"data":"b","timestamp":19618},{"data":"a","timestamp":20183}];

const dataB = [{"data":"b","timestamp":821},{"data":"b","timestamp":1357},{"data":"b","timestamp":2108},{"data":"b","timestamp":3001},{"data":"a","timestamp":3995},{"data":"b","timestamp":4475},{"data":"c","timestamp":5357},{"data":"c","timestamp":5373},{"data":"b","timestamp":6199},{"data":"c","timestamp":6207},{"data":"b","timestamp":6896},{"data":"b","timestamp":7410},{"data":"a","timestamp":8335},{"data":"a","timestamp":9191},{"data":"b","timestamp":10007},{"data":"b","timestamp":10703},{"data":"c","timestamp":11225},{"data":"c","timestamp":11453},{"data":"c","timestamp":12131},{"data":"c","timestamp":12599},{"data":"c","timestamp":13567},{"data":"a","timestamp":13726},{"data":"b","timestamp":14161},{"data":"b","timestamp":14224},{"data":"b","timestamp":14666}];

const dataA$ = of(dataA).pipe(
  delayWhen(() => timer(Math.random() * 5000)),
  ???
);
const dataB$ = of(dataB).pipe(
  delayWhen(() => timer(Math.random() * 5000)),
  ???
);

let lastTimestamp = -Infinity;
dataA$.subscribe(({ timestamp }) => {
  expect(timestamp > lastTimestamp).toBe(true);
  lastTimestamp = timestamp;
});

dataB$.subscribe(({ timestamp }) => {
  expect(timestamp > lastTimestamp).toBe(true);
  lastTimestamp = timestamp;
});

Дополнительный вопрос: как можно расширить это решение для динамической поддержки любого количества потоков данных после создания потока?

Мне непонятно, в чем дело. Как выглядят ваши настоящие Observables? Постоянно ли они излучают значения? То, что написано в вопросе, уже должно работать, если это так. Или они излучают массивы?

Tobias S. 17.11.2022 01:52

Если бы вы удалили ??? из каждого из потоков здесь, этот фрагмент кода выдал бы, потому что lastTimestamp является общим для двух потоков. Проблема в том, что у меня есть два разных потока, которые возвращают объекты с разной временной частотой (например, данные TimeSeries для двух разных устройств IoT), но оба они упорядочены. Я хочу, чтобы мои потоки регулировались/буферизировались таким образом, чтобы они были упорядочены по времени, но чтобы я получал все отдельные события. Представьте, что вы хотите воспроизвести серию событий, произошедших на нескольких устройствах в смоделированной среде.

Josh C. 17.11.2022 01:58

Итак, выбросы обоих Observables должны быть каким-то образом упорядочены как единый поток?

Tobias S. 17.11.2022 02:02

Нет, выбросы обоих Observable должны совместно использовать состояние и буферизовать полученные значения до тех пор, пока общее состояние не диктует, что он может испускать.

Josh C. 17.11.2022 02:03

когда общее состояние диктует, что они могут излучать? Представьте себе следующие метки времени: dataA излучает 1, dataB излучает 5, а затем dataA излучает 3. Следует ли опускать значение 3 или как dataB знать, что ждать неизвестных значений dataA?

Tobias S. 17.11.2022 02:09

Отличный пример. В этом случае dataB должен ждать (т.е. значения буфера), пока следующая ожидающая эмиссия dataA не будет больше или равна 5 или dataA не завершится. Имея в виду, что оба потока гарантированно упорядочены по отметке времени. (Также добавьте, что dataA должен делать то же самое для значений dataB)

Josh C. 17.11.2022 02:16
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
2
6
111
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Я не уверен, существует ли подход «только для RxJS», использующий только операторы канала. Однако вот как бы я это сделал:

Чтобы сделать это воспроизводимым, я создал следующую настройку. Это просто помещает элементы массивов в Subjects с задержкой.

const dataA$ = new Subject()
const dataB$ = new Subject()

timer(0, 4000).subscribe(t => {
  if (!dataA[0]) return
  dataA$.next(dataA[0])
  dataA.shift()
})

timer(0, 3900).subscribe(t => {
  if (!dataB[0]) return
  dataB$.next(dataB[0])
  dataB.shift()
})

Просто замените dataA$ и dataB$ своими собственными Observables и удалите timer и подписки.


Результирующие потоки данных будут храниться в двух отдельных Observables, для которых я создал два Subjects.

const resultA = new Subject()
const resultB = new Subject()

Было бы неплохо просто использовать dataA$ и datab$ и контролировать, как они излучают, ссылаясь друг на друга в своих каналах, но это создает циклическую зависимость, и я не уверен, что это управляемо.

Теперь нам также нужны два буфера, в которых хранятся промежуточные значения.

let aBuffer = []
let bBuffer = []

Давайте подпишемся на оба потока данных.

dataA$.subscribe(({ timestamp }) => {
  if (bBuffer.length === 0) {
    aBuffer.push(timestamp)
    return
  }

  while (true) {
    let first = bBuffer[0]

    if (first > timestamp) {
      resultA.next(timestamp)
      break
    } else if (timestamp > first) {
      resultB.next(first)
      bBuffer.shift()
    } else {
      resultA.next(timestamp)
      resultB.next(first)
      bBuffer.shift()
      break
    }
  }
});

dataB$.subscribe(({ timestamp }) => {
  if (aBuffer.length === 0) {
    bBuffer.push(timestamp)
    return
  }

  while (true) {
    let first = aBuffer[0]

    if (first > timestamp) {
      resultB.next(timestamp)
      break
    } else if (timestamp > first) {
      resultA.next(first)
      aBuffer.shift()
    } else {
      resultB.next(timestamp)
      resultA.next(first)
      aBuffer.shift()
      break
    }
  }
});

Обе реализации идентичны. Просто a и b изменены. В основном мы ждем, пока другой поток выдаст значение. До тех пор мы помещаем значения в буфер.

Когда поток излучает и текущая временная метка отстает от первого значения буфера, мы помещаем временную метку в результат. Но если первое значение буфера отстает, мы используем цикл while, чтобы «догнать» и передать все отстающие значения в процессе.

Кажется, это отлично работает для примера в вопросе.

let lastTimestamp = -Infinity;
resultA.subscribe((r) => {
  console.info("A", r, r > lastTimestamp)
  lastTimestamp = r
})
resultB.subscribe((r) => {
  console.info("B", r, r > lastTimestamp)
  lastTimestamp = r
})

Вывод:

/* ... */ 
B 5373
true
B 6199
true
B 6207
true
A 6672
true
B 6896
true
A 7404
true
B 7410
true
/* ... */ 

Вдохновленный вашим ответом, я собрал оператор, который делает это: Главный нерешенный вопрос для меня заключается в операторе, который я опубликовал, как мне заставить его реагировать на любое количество потоков, добавляемых с течением времени. Увидев, что ваш ответ заставил меня двигаться в правильном направлении, я отмечу его как принятый.

Josh C. 18.11.2022 18:02

RxJS

Простой пример

const { of, map, from, shareReplay, concatMap, endWith, NEVER, startWith, takeWhile, combineLatestAll, takeUntil, filter } = rxjs;

const dataA = [1, 10, 11, 15, 20];
const dataB = [0, 2, 3, 11, 21];
const dataC = [1, 3, 5, 9];
const max = Infinity;

const dataA$ = from(dataA);
const dataB$ = from(dataB);
const dataC$ = from(dataC);

const source$ = of(dataA$, dataB$, dataC$).pipe(
  map(endWith(max)),
  map(concatMap((x) => {
    const destroy$ = source$.pipe(filter((y) => x === y));
    return NEVER.pipe(takeUntil(destroy$), startWith(x));
  })),
  combineLatestAll(Math.min),
  takeWhile((x) => x !== max),
  shareReplay(1)
);

source$.subscribe(console.info);
<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

Другие вопросы по теме