Моя эпопея просыпается, когда отправляется действие REMOTE_DATA_STARTED, и он извлекает данные с помощью action.url и action.owner.
Мне нужно убедиться, что я не инициирую два одновременных вызова одному и тому же владельцу / URL-адресу. Как только вызов владельца / URL-адреса завершен, можно позже запустить еще один для того же владельца / URL-адреса.
Аннулирование - это не то, что я ищу здесь, потому что я не хочу отменять существующие запросы, я хочу предотвратить запуск новых запросов.
Я чувствую, что мне нужна смесь exhaustMap и groupBy, но я не знаю, что делать дальше.
Это моя эпопея на данный момент, она отклоняет все одновременные вызовы, а не по владельцу / URL
const myEpic = action$ =>
action$.ofType("REMOTE_DATA_STARTED").exhaustMap(action =>
fakeAjaxCall().map(() => {
return { type: "COMPLETED", owner: action.owner, url: action.url };
})
);
Я создал этот тестовый проект с неудачным тестовым примером. Вы можете помочь мне с этим работать?
https://codesandbox.io/s/l71zq6x8zl
Как вы увидите, test1_exhaustMapByActionType_easy работает нормально, а вот test2_exhaustMapByActionTypeOwnerAndUrl дает сбой.
Убедитесь, что вы развернули консоль, чтобы увидеть результаты теста.
Вот точный код вашей проблемы: codeandbox.io/s/vyloq48xoymiles_christian исправил тестовые случаи также, чтобы удалить проблемы с синхронизацией.
@ wp78de, если вы добавите свой ответ в свое решение, я приму ваш ответ. Ваш код отличается от кода ZahiC, и хотя его ответ также может работать, ваш код легче понять (по крайней мере, для меня).
@ Сильвен, с удовольствием!



![Безумие обратных вызовов в javascript [JS]](https://i.imgur.com/WsjO6zJb.png)


Это, безусловно, можно сделать элегантным способом с помощью groupBy & exstMap:
const groupedByExhaustMap = (keySelector, project) =>
source$ => source$.pipe(
groupBy(keySelector),
mergeMap(groupedCalls =>
groupedCalls.pipe(
exhaustMap(project)
)
)
);
const { delay, groupBy, mergeMap, exhaustMap } = Rx.operators;
const groupedByExhaustMap = (keySelector, project) =>
source$ => source$.pipe(
groupBy(keySelector),
mergeMap(groupedCalls =>
groupedCalls.pipe(
exhaustMap(project)
)
)
);
const calls = [ // every call takes 500ms
{startTime: 0, owner: 1, url: 'url1'},
{startTime: 200, owner: 2, url: 'url2'},
{startTime: 400, owner: 1, url: 'url1'}, // dropped
{startTime: 400, owner: 1, url: 'url2'},
{startTime: 600, owner: 1, url: 'url1'}
];
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
const simulateCallsOverTime$ = Rx.Observable.from(calls)
.pipe(
mergeMap(call => Rx.Observable.of(call)
.pipe(
delay(call.startTime)
)
)
);
simulateCallsOverTime$
.pipe(
groupedByExhaustMap(
call => `${call.owner}_${call.url}`,
async call => {
await sleep(500); // http call goes here
return call;
}
)
)
.subscribe(console.info);<script src = "https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>Есть много способов найти решение. Моя первая мысль заключалась в том, что вы можете разделить действия на темы владельца + URL, а затем работать с ними:
const myEpic = (action$) => {
const completed$ = new Subject();
const flights = new DefaultMap((pair$) =>
pair$.exhaustMap((action) =>
fakeAjaxCall().map(() => ({
...action,
type: 'COMPLETED',
}))
)
.subscribe((action) => completed$.next(action))
);
action$.ofType('REMOTE_DATA_STARTED')
.subscribe((action) => {
flights.get(`${action.owner}+${action.url}`).next(action);
});
return completed$;
};
Это работает, но, по общему признанию, требует поддержки своего рода «карты по умолчанию», где пары новый владелец + URL получают новый Subject (я написал быструю реализацию). Тестовый пример, который он проходит:
test('myEpic does both drop actions and NOT drop actions for two owner+url pairs', async () => {
const arrayOfAtMost = (action$, limit) => action$.take(limit)
.timeoutWith(1000, Observable.empty())
.toArray().toPromise();
const action$ = new ActionsObservable(
Observable.create((observer) => {
// Jim #1 emits four (4) concurrent calls—we expect only two to be COMPLETED, one per URL
observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.com', owner: 'jim1' });
observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.com', owner: 'jim1' });
observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.org', owner: 'jim1' });
observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.org', owner: 'jim1' });
// Jim #2 emits two (2) calls at the same time as Jim #1—we expect only one to be COMPLETED, deduped URLs
observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.biz', owner: 'jim2' });
observer.next({ type: 'REMOTE_DATA_STARTED', url: 'google.biz', owner: 'jim2' });
// Once all of the above calls are completed, Jim #1 and Jim #2 make calls simultaneously
// We expect both to be COMPLETED
setTimeout(() => {
const url = 'https://stackoverflow.com/q/49563059/1267663';
observer.next({ type: 'REMOTE_DATA_STARTED', url, owner: 'jim1' });
observer.next({ type: 'REMOTE_DATA_STARTED', url, owner: 'jim2' });
}, 505);
})
);
const resultant$ = myEpic(action$);
const results = await arrayOfAtMost(resultant$, 5);
expect(results).toEqual([
{ type: 'COMPLETED', url: 'google.com', owner: 'jim1' },
{ type: 'COMPLETED', url: 'google.org', owner: 'jim1' },
{ type: 'COMPLETED', url: 'google.biz', owner: 'jim2' },
{ type: 'COMPLETED', url: 'https://stackoverflow.com/q/49563059/1267663', owner: 'jim1' },
{ type: 'COMPLETED', url: 'https://stackoverflow.com/q/49563059/1267663', owner: 'jim2' },
]);
});
Полное решение, включая реализацию DefaultMap:
const { Observable, Subject } = require('rxjs');
class DefaultMap extends Map {
constructor(initializeValue) {
super();
this._initializeValue = initializeValue || (() => {});
}
get(key) {
if (this.has(key)) {
return super.get(key);
}
const subject = new Subject();
this._initializeValue(subject);
this.set(key, subject);
return subject;
}
}
const fakeAjaxCall = () => Observable.timer(500);
const myEpic = (action$) => {
const completed$ = new Subject();
const flights = new DefaultMap((uniquePair) =>
uniquePair.exhaustMap((action) =>
fakeAjaxCall().map(() => ({
...action,
type: 'COMPLETED',
}))
)
.subscribe((action) => completed$.next(action))
);
action$.ofType('REMOTE_DATA_STARTED')
.subscribe((action) => {
flights.get(`${action.owner}+${action.url}`).next(action);
});
return completed$;
};* Приведенный выше фрагмент на самом деле не работает, я просто хотел, чтобы он был разборным.
Я собрал работоспособный пример с тестовыми примерами на GitHub.
groupBy и exhaustMapЯ написал исходное решение с тестами только для того, чтобы обнаружить, что да, это возможно с существующими операторами, groupBy и exhaustMap, которые вы предложили:
const myEpic = action$ =>
action$.ofType('REMOTE_DATA_STARTED')
.groupBy((action) => `${action.owner}+${action.url}`)
.flatMap((pair$) =>
pair$.exhaustMap(action =>
fakeAjaxCall().map(() => ({
...action,
type: 'COMPLETED',
}))
)
);
Выполнение этого с тем же набором тестов, приведенным выше, будет успешным.
Хорошо, поехали:
GroupBy req.owner, сгладим результаты:
const myEpic = action$ =>
action$
.ofType("REMOTE_DATA_STARTED")
.groupBy(req => req.owner)
.flatMap(ownerGroup => ownerGroup.groupBy(ownerReq => ownerReq.url))
.flatMap(urlGroup =>
urlGroup.exhaustMap(action =>
fakeAjaxCall().map(() => ({ type: "COMPLETED", owner: action.owner, url: action.url }))
)
)
Не забывайте observe.complete();
const test1_exhaustMapByActionType_easy = () => {
const action$ = new ActionsObservable(
Observable.create(observer => {
observer.next({ type: "REMOTE_DATA_STARTED", owner: "ownerX", url: "url1" });
observer.next({ type: "REMOTE_DATA_STARTED", owner: "ownerX", url: "url1" });
setTimeout(() => {
observer.next({ type: "REMOTE_DATA_STARTED", owner: "ownerX", url: "url1" });
observer.complete();
}, 30);
})
);
const emittedActions = [];
const epic$ = myEpic(action$);
epic$.subscribe(action => emittedActions.push(action), null, () => expect("test1_exhaustMapByActionType_easy", 2, emittedActions));
};
То же самое:
const test2_exhaustMapByActionTypeOwnerAndUrl = () => {
const action$ = new ActionsObservable(
Observable.create(observer => {
// owner1 emmits 4 concurrent calls, we expect only two to COMPLETED actions; one per URL:
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner1", url: "url1" });
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner1", url: "url1" });
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner1", url: "url2" });
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner1", url: "url2" });
// owner2 emmits 2 calls at the same time as owner 1. because the two calls
// from owner2 have the same url, we expecty only one COMPLETED action
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner2", url: "url1" });
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner2", url: "url1" });
// Once all of the above calls are completed each owner makes one concurrent call
// we expect each call to go throught and generate a COMPLETED action
setTimeout(() => {
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner1", url: "url1" });
observer.next({ type: "REMOTE_DATA_STARTED", owner: "owner2", url: "url1" });
observer.complete();
}, 30);
})
);
const emittedActions = [];
const epic$ = myEpic(action$);
epic$.subscribe(action => emittedActions.push(action), null, () => expect("test2_exhaustMapByActionTypeOwnerAndUrl", 5, emittedActions));
};
Обратите внимание, что завершение потока действий action$ (вызов observer.complete();) неверно, потому что redux-observable не завершает поток - action$ является бесконечным горячим наблюдаемым
Также обратите внимание, что flatMap-вставлять поток дважды в эпопее не нужно.
Если я правильно понимаю проблему, вам понадобится какое-то состояние, чтобы отслеживать, какие ожидающие запросы у вас есть от пары владелец / URL. Насколько конкретно вы выражаете это в терминах существующих операторов? Было бы проще / проще сохранить отдельное сопоставление владельцев / URL-адресов с субъектами, а затем вводить в него действия (например, одна тема для каждого ключа, следующий вызов с действиями, исчерпание карт, объединение всех вместе).