Объединить несколько наблюдаемых из rxjs в один буфер

У меня есть 2 подключения к веб-сокетам (используя LiveQuery из Parse, если это имеет значение).

Для каждого веб-сокета у меня есть 3 события, которые я хочу зафиксировать: "create", "update" и "delete".

Множество обновлений, скорее всего, придет одновременно, поэтому для одного события я использую буфер, чтобы зафиксировать последовательность обновлений, которые приходят одновременно:

const base = fromEvent(listener, type);
const triggerBuffer = base.pipe(debounceTime(500));
const buffered = base.pipe(buffer(triggerBuffer));

Это прекрасно работает. Когда с сервера приходит куча обновлений, они буферизуются и все выпускаются одновременно.

Я хочу объединить все события из обоих веб-сокетов в одно обновление. Я пытался добиться этого с помощью оператора zip следующим образом:

const setupServerObservable = (listener, listenerName) => type => {
  const base = fromEvent(listener, type);
  return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};

const setupServerObservables = subscriptions => {
  const joins = setupServerObservable(subscriptions.joins, "joins");
  const containers = setupServerObservable(
    subscriptions.containers,
    "containers"
  );

  const eventTypes = ["update", "create", "delete"];

  const allJoins = eventTypes.map(joins);
  const allContainers = eventTypes.map(containers);
  const all = allJoins.concat(allContainers);
  const zippedObservables = zip(all);
  console.info(zippedObservables);
  const triggerBuffer = zippedObservables.pipe(debounceTime(500));
  const buffered = zippedObservables.pipe(buffer(triggerBuffer));
  return buffered;
};

Но когда я пытаюсь подписаться на этот вновь созданный Observable, он не выдает никаких обновлений с сервера. Это console.info(zippedObservables) производит следующий вывод:

Observable {_isScalar: false, source: Observable, operator: ZipOperator}
operator: ZipOperator
    resultSelector: Array(6)
     0: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     1: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     2: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     3: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     4: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     5: Observable {_isScalar: false, source: Observable, operator: MapOperator}
length: 6
__proto__: Array(0)
__proto__: Object
source: Observable {_isScalar: false, _subscribe: ƒ}
_isScalar: false
__proto__: Object

Я думаю, что, вероятно, неправильно понимаю, что делает оператор zip, но я не смог заставить его работать с merge или concat.

Эта статья должна помочь вам понять, как работает zip. scotch.io/tutorials/…

Julian Liu 05.06.2019 05:27

@pindev, это был действительно полезный урок. Однако кажется, что ни один из этих операторов не является тем, что я ищу. Есть ли другой оператор, который будет просто генерировать событие от любых других наблюдаемых объектов в любом порядке их поступления?

TheCriticalImperitive 05.06.2019 05:43

Я думаю combineLatest за это. scotch.io/tutorials/…

Julian Liu 05.06.2019 05:57
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
0
3
291
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Как только я посмотрел на учебник, опубликованный в комментариях, я понял, что zip это не то, что я хотел.

Я пробовал то же самое с merge, но это не сработало. Кажется, что merge не принимает массив, он должен принимать каждый Observable для слияния как отдельный параметр, например:

const merged = merge(all[0], all[1], all[2], all[3], all[4], all[5]);

Итак, окончательный код:

const setupServerObservable = (listener, listenerName) => type => {
  const base = fromEvent(listener, type);
  return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};

const setupServerObservables = subscriptions => {
  const joins = setupServerObservable(subscriptions.joins, "joins");
  const containers = setupServerObservable(
    subscriptions.containers,
    "containers"
  );

  const eventTypes = ["update", "create", "delete"];

  const allJoins = eventTypes.map(joins);
  const allContainers = eventTypes.map(containers);
  const all = allJoins.concat(allContainers);
  const merged = merge(all[0], all[1], all[2], all[3], all[4], all[5]);
  const triggerBuffer = merged.pipe(debounceTime(500));
  const buffered = merged.pipe(buffer(triggerBuffer));
  return buffered;
};

Было бы неплохо, если бы был способ сделать это с помощью массива, но, похоже, нет.

Другие вопросы по теме