Я совершенно новичок в rxjs и пытаюсь выполнить одну задачу - по сути, мне нужен API-интерфейс, похожий на Promise, со следующими отличиями:
Promise
мой API должен иметь возможность resolve
несколько раз.await
/then
следует использовать последнее разрешенное значение.await
/then
должен зависнуть, как и стандартный Promise
.reset
, который будет сбрасывать мой объект API, чтобы следующие вызовы await
/then
зависали, как в пункте 3.Как я могу реализовать необходимое поведение? Я думаю, что rxjs Observable
и lastValueFrom
— это то, что мне нужно, но я просто не могу понять, как его использовать.
Пример использования псевдокода:
const myAuthObservable = new MyObservableApi<Auth>((resolve) =>
subscribe(
(s) => s.auth,
(auth) => {
if (auth) {
resolve(auth); // this can be called multiple times
}
}
)
);
emitter.on("start", async () => {
// always the last auth will be returned
// or the api will hung up until the first auth is emitted
const auth = await myAuthObservable;
myAuthObservable.reset();
});
Похоже, вам нужен объект, который кэширует значение, к которому разрешается обещание, а затем возвращает это кэшированное значение при будущем использовании.
Выглядит как BehaviorSubject
, за исключением части «сброс». Непонятно также, что имеется в виду под «зависать».
@HereticMonkey я имел в виду hung up
, извини
Что именно вы подразумеваете под «следует использовать последнее разрешенное значение»? Как ваш API сигнализирует, какое из разрешенных значений будет последним? lastValueFrom
работает только тогда, когда наблюдаемая в какой-то момент закрыта.
@Bergi Я думаю, он имеет в виду, что пока вы не сбросите его, он будет продолжать возвращать самое последнее кэшированное значение.
Я считаю, что у меня есть то, что вы ищете — собственный субъект, который ведет себя как ReplaySubject с одним буфером, но может быть сброшен. Этот код был адаптирован на основе именно этого класса, чтобы обеспечить необходимую вам функциональность.
const NOT_SET = Symbol();
export class ResettableSubject<T> extends Subject<T> {
private value: T | NOT_SET = NOT_SET;
constructor() {
super();
}
/** Updates the subject's value and caches it. */
override next(value: T): void {
this.value = value;
super.next(value);
}
/** Resets the cached value. */
reset() {
this.value = NOT_SET;
}
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = this._innerSubscribe(subscriber);
if (this.value !== NOT_SET) {
subscriber.next(this.value);
}
return subscription;
}
}
В приведенном ниже примере подписка будет отправлять все значения, но обещания, созданные из firstValueFrom, будут отправлять только в том случае, если у субъекта есть значение, а сброс не очистил кэшированное значение.
const source$ = new ResettableSubject<number>();
// displays all values.
source$.subscribe(x => console.info(`from subscription ${x}`));
// will only display the first value once set.
firstValueFrom(source$).then(x => console.info(`first value before first next: ${x}`));
source$.next(1);
// will only display the first value.
firstValueFrom(source$).then(x => console.info(`first value displayed immediately: ${x}`));
source$.next(2); // the original subscription should update.
source$.reset(); // no future subscribers will see 2.
firstValueFrom(source$).then(x => console.info(`first value after reset ${x}`));
source$.next(3); // 3 is output by subscription and last promise.
Ваше использование будет похоже на пример ниже. Независимо от того, что устанавливает myAuthObservable, необходимо убедиться, что Auth не является неопределенным, или вы можете обновить класс ResettableSubject, чтобы он игнорировал неопределенные значения, передаваемые следующему методу.
const myAuthObservable = new ResettableSubject<Auth>();
emitter.on("start", async () => {
// Only emits when myAuthObservable gets updated after reset below was called.
const auth = await firstValueFrom(myAuthObservable);
myAuthObservable.reset();
});
да, это сработает. одно улучшение value
начальное значение может быть symbol
вместо undefined
и это позволит удалить ненужного hasValue
члена
Спасибо @nikitakot. Использование символа было более чистым способом сделать что-то.
прочитав комментарий @Barmar, я в конце концов нашел простое решение, отличное от rxjs.
Похоже, вам нужен объект, который кэширует значение, к которому разрешается обещание, а затем возвращает это кэшированное значение при будущем использовании.
export class MultiResolve<T> {
private readonly initialSymbol: symbol = Symbol();
private currentValue: T | symbol = this.initialSymbol;
private readonly resolvers: Array<(value: T) => void> = [];
resolve(value: T): void {
this.currentValue = value;
for (const resolver of this.resolvers) {
resolver(value);
}
}
get promise(): Promise<T> {
return new Promise(resolve => {
if (this.currentValue !== this.initialSymbol) {
resolve(this.currentValue as T);
}
this.resolvers.push(resolve);
});
}
reset(): void {
this.currentValue = this.initialSymbol;
}
}
Применение:
const multiResolve = new MultiResolve<number>();
async function test() {
console.info(await multiResolve.promise); // Will wait until the first resolve
multiResolve.resolve(1);
console.info(await multiResolve.promise); // Will immediately get 1
multiResolve.resolve(undefined as any); // TypeScript requires an explicit cast to 'any' for undefined
console.info(await multiResolve.promise); // Will immediately get undefined
multiResolve.resolve(2);
console.info(await multiResolve.promise); // Will immediately get 2
}
test();
setTimeout(() => {
multiResolve.resolve(3); // Will immediately get 3 in the next await
}, 1000);
Похоже, вам нужен шаблон подписчика.