У меня есть 2 подключения к веб-сокетам (используя LiveQuery из Parse, если это имеет значение).
Для каждого веб-сокета у меня есть 3 события, которые я хочу зафиксировать: "create", "update" и "delete".
Множество обновлений, скорее всего, придет одновременно, поэтому для одного события я использую буфер, чтобы зафиксировать последовательность обновлений, которые приходят одновременно:
const base = fromEvent(listener, type);
const triggerBuffer = base.pipe(debounceTime(500));
const buffered = base.pipe(buffer(triggerBuffer));
Это прекрасно работает. Когда с сервера приходит куча обновлений, они буферизуются и все выпускаются одновременно.
Я хочу объединить все события из обоих веб-сокетов в одно обновление. Я пытался добиться этого с помощью оператора zip следующим образом:
const setupServerObservable = (listener, listenerName) => type => {
const base = fromEvent(listener, type);
return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};
const setupServerObservables = subscriptions => {
const joins = setupServerObservable(subscriptions.joins, "joins");
const containers = setupServerObservable(
subscriptions.containers,
"containers"
);
const eventTypes = ["update", "create", "delete"];
const allJoins = eventTypes.map(joins);
const allContainers = eventTypes.map(containers);
const all = allJoins.concat(allContainers);
const zippedObservables = zip(all);
console.info(zippedObservables);
const triggerBuffer = zippedObservables.pipe(debounceTime(500));
const buffered = zippedObservables.pipe(buffer(triggerBuffer));
return buffered;
};
Но когда я пытаюсь подписаться на этот вновь созданный Observable, он не выдает никаких обновлений с сервера. Это console.info(zippedObservables) производит следующий вывод:
Observable {_isScalar: false, source: Observable, operator: ZipOperator}
operator: ZipOperator
resultSelector: Array(6)
0: Observable {_isScalar: false, source: Observable, operator: MapOperator}
1: Observable {_isScalar: false, source: Observable, operator: MapOperator}
2: Observable {_isScalar: false, source: Observable, operator: MapOperator}
3: Observable {_isScalar: false, source: Observable, operator: MapOperator}
4: Observable {_isScalar: false, source: Observable, operator: MapOperator}
5: Observable {_isScalar: false, source: Observable, operator: MapOperator}
length: 6
__proto__: Array(0)
__proto__: Object
source: Observable {_isScalar: false, _subscribe: ƒ}
_isScalar: false
__proto__: Object
Я думаю, что, вероятно, неправильно понимаю, что делает оператор zip, но я не смог заставить его работать с merge или concat.
@pindev, это был действительно полезный урок. Однако кажется, что ни один из этих операторов не является тем, что я ищу. Есть ли другой оператор, который будет просто генерировать событие от любых других наблюдаемых объектов в любом порядке их поступления?
Я думаю combineLatest за это. scotch.io/tutorials/…



![Безумие обратных вызовов в javascript [JS]](https://i.imgur.com/WsjO6zJb.png)


Как только я посмотрел на учебник, опубликованный в комментариях, я понял, что zip это не то, что я хотел.
Я пробовал то же самое с merge, но это не сработало. Кажется, что merge не принимает массив, он должен принимать каждый Observable для слияния как отдельный параметр, например:
const merged = merge(all[0], all[1], all[2], all[3], all[4], all[5]);
Итак, окончательный код:
const setupServerObservable = (listener, listenerName) => type => {
const base = fromEvent(listener, type);
return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};
const setupServerObservables = subscriptions => {
const joins = setupServerObservable(subscriptions.joins, "joins");
const containers = setupServerObservable(
subscriptions.containers,
"containers"
);
const eventTypes = ["update", "create", "delete"];
const allJoins = eventTypes.map(joins);
const allContainers = eventTypes.map(containers);
const all = allJoins.concat(allContainers);
const merged = merge(all[0], all[1], all[2], all[3], all[4], all[5]);
const triggerBuffer = merged.pipe(debounceTime(500));
const buffered = merged.pipe(buffer(triggerBuffer));
return buffered;
};
Было бы неплохо, если бы был способ сделать это с помощью массива, но, похоже, нет.
Эта статья должна помочь вам понять, как работает
zip. scotch.io/tutorials/…