Давайте начнем с основ и постепенно перейдем к более продвинутым концепциям в RxJS в Angular
Observables - это основа RxJS, представляющая собой поток данных, которые можно наблюдать в течение времени. Наблюдатели являются потребителями этих потоков, прослушивая передаваемые данные и реагируя соответствующим образом.
Observables и Observers похожи на концепции событий и обработчиков событий в C#.
В Angular вы можете создать Observable, используя класс Observable из библиотеки rxjs. Observable может испускать данные асинхронно с течением времени и может быть подписан наблюдателями на получение испускаемых данных.
Пример в Angular:
import { Observable } from 'rxjs'; // Create an Observable that emits a stream of numbers const numbers$ = new Observable(observer => { let count = 1; const intervalId = setInterval(() => { observer.next(count++); }, 1000); // Cleanup logic when the Observable is unsubscribed return () => { clearInterval(intervalId); }; }); // Subscribe to the Observable to receive the emitted numbers numbers$.subscribe(value => console.info(value));
В Angular наблюдатель - это объект, который определяет, как обрабатывать данные, полученные от Observable.
Он имеет три дополнительных метода:
Next для обработки выданных данных
Error для обработки любых ошибок
Complete для обработки завершения работы Observable.
Пример в Angular:
import { Observable } from 'rxjs'; // Create an Observable that emits a stream of numbers const numbers$ = new Observable(observer => { let count = 1; const intervalId = setInterval(() => { observer.next(count++); }, 1000); // Cleanup logic when the Observable is unsubscribed return () => { clearInterval(intervalId); }; }); // Define an Observer to handle the emitted numbers const observer = { next: value => console.info(value), error: err => console.error(err), complete: () => console.info('Observable completed') }; // Subscribe the Observer to the Observable numbers$.subscribe(observer);
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use map operator to square each number in the stream const squaredNumbers$ = numbers$.pipe( map(num => num * num) ); // Subscribe to the transformed data stream squaredNumbers$.subscribe(value => console.info(value));
Пример в Angular с оператором фильтрации:
import { of } from 'rxjs'; import { filter } from 'rxjs/operators'; // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use filter operator to get only even numbers from the stream const evenNumbers$ = numbers$.pipe( filter(num => num % 2 === 0) ); // Subscribe to the filtered even numbers stream evenNumbers$.subscribe(value => console.info(value));
Субъекты - это одновременно Observables и Observers, что позволяет вам эмитировать и подписываться на потоки данных напрямую.
Субъекты похожи на концепцию эмиттеров событий в C#.
Пример в Angular с использованием Subject:
import { Subject } from 'rxjs'; // Create a Subject to emit and subscribe to a stream of numbers const numbersSubject$ = new Subject<number>(); // Subscribe to the Subject to receive emitted numbers numbersSubject$.subscribe(value => console.info(value)); // Emit numbers to the Subject numbersSubject$.next(1); numbersSubject$.next(2); numbersSubject$.next(3);
Наблюдаемые можно разделить на два типа
Горячие обозреватели выдают данные независимо от наличия подписчиков Холодные обозреватели выдают данные только при наличии активных подписчиков.
Пример в Angular с Hot и Cold Observables:
import { interval, fromEvent } from 'rxjs'; // Hot Observable - emits data regardless of subscribers const hotObservable$ = interval(1000); // Cold Observable - emits data only when subscribed const button = document.querySelector('button'); const coldObservable$ = fromEvent(button, 'click'); // Subscribe to both Observables hotObservable$.subscribe(value => console.info(`Hot: ${value}`)); coldObservable$.subscribe(value => console.info(`Cold: ${value}`));
RxJS предоставляет операторы для обработки ошибок в Observables, такие как catchError и retry, которые позволяют обрабатывать ошибки и повторные попытки в потоке данных.
Пример в Angular с обработкой ошибок:
import { of } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; // Create an observable that may emit an error const numbers$ = of(1, 2, 3, 4, 5, 'six'); // Use catchError operator to handle errors const numbersWithErrorHandled$ = numbers$.pipe( catchError(err => of('Error occurred:', err)) ); // Use retry operator to retry the observable in case of error const numbersWithRetry$ = numbers$.pipe( retry(2) // Retry 2 times in case of error ); // Subscribe to the error handled and retried observables numbersWithErrorHandled$.subscribe(value => console.info(value)); numbersWithRetry$.subscribe(value => console.info(value));
RxJS позволяет создавать свои собственные операторы путем компоновки существующих операторов или расширения класса Observable. Это дает вам гибкость в создании многократно используемых и специализированных операторов для ваших конкретных случаев использования.
Пример в Angular с пользовательским оператором:
import { Observable, OperatorFunction } from 'rxjs'; import { map } from 'rxjs/operators'; // Custom operator to multiply each emitted number by a given factor function multiplyBy(factor: number): OperatorFunction<number, number> { return (source: Observable<number>) => source.pipe(map(num => num * factor)); } // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use the custom multiplyBy operator to multiply each number by 10 const multipliedNumbers$ = numbers$.pipe(multiplyBy(10)); // Subscribe to the multiplied numbers multipliedNumbers$.subscribe(value => console.info(value));
RxJS позволяет управлять контекстом выполнения или планировщиком Observable. Планировщики предоставляют возможности для управления параллелизмом, контроля времени и выполнения кода в определенных потоках или контекстах.
Пример в Angular с планировщиками:
import { of, asyncScheduler } from 'rxjs'; import { observeOn } from 'rxjs/operators'; // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use observeOn operator to specify an asyncScheduler for subscription const asyncNumbers$ = numbers$.pipe(observeOn(asyncScheduler)); // Subscribe to the numbers with asyncScheduler asyncNumbers$.subscribe(value => console.info(value)); // Use observeOn operator with other schedulers like asapScheduler or queueScheduler const asapNumbers$ = numbers$.pipe(observeOn(asapScheduler)); const queueNumbers$ = numbers$.pipe(observeOn(queueScheduler));
По умолчанию Observables являются одноадресными, то есть каждая подписка создает отдельное выполнение Observable.
Однако вы можете передавать Observables по многоадресной рассылке, чтобы разделить одно выполнение между несколькими подписчиками, что может повысить производительность и уменьшить дублирование работы.
Пример в Angular с многоадресной передачей:
import { interval, Subject } from 'rxjs'; import { multicast, refCount } from 'rxjs/operators'; // Create a hot Observable that emits numbers every second const numbers$ = interval(1000).pipe(multicast(() => new Subject()), refCount()); // Subscribe to the hot Observable from multiple subscribers numbers$.subscribe(value => console.info(`Subscriber 1: ${value}`)); numbers$.subscribe(value => console.info(`Subscriber 2: ${value}`)); // Start the execution of the hot Observable numbers$.connect();
Вы также можете настроить поведение Observable, расширив класс Observable и реализовав собственную логику для выдачи значений, обработки ошибок и управления подписками.
Пример в Angular с пользовательским Observable:
import { Observable } from 'rxjs'; // Custom Observable that emits a sequence of numbers class MyNumbersObservable extends Observable<number> { private currentNumber = 1; constructor(private maxNumber: number) { super(subscriber => { const intervalId = setInterval(() => { if (this.currentNumber <= maxNumber) { subscriber.next(this.currentNumber++); } else { subscriber.complete(); clearInterval(intervalId); } }, 1000); }); } } // Create an instance of the custom Observable const myNumbers$ = new MyNumbersObservable(5); // Subscribe to the custom Observable myNumbers$.subscribe(value => console.info(value));
RxJS предоставляет механизмы для обработки обратного давления, которое возникает, когда скорость выброса данных из Observable выше, чем скорость потребления подписчиками. Стратегии обратного давления позволяют вам контролировать, как данные буферизируются, отбрасываются или управляются при работе с высокоскоростными потоками данных.
Пример в Angular с обратным давлением:
import { interval, bufferTime } from 'rxjs'; // Create a fast-emitting Observable that emits numbers every 100ms const fastNumbers$ = interval(100); // Use bufferTime operator to buffer emitted numbers for every 1 second const bufferedNumbers$ = fastNumbers$.pipe(bufferTime(1000)); // Subscribe to the buffered numbers bufferedNumbers$.subscribe(values => console.info(values));
RxJS предоставляет операторы для обработки ошибок, которые могут возникнуть в потоке Observable.
Вы можете перехватывать и обрабатывать ошибки, повторять попытки неудачных Observables и предпринимать другие действия для изящной обработки ошибок в вашем приложении.
Пример в Angular с обработкой ошибок:
import { of } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; // Create an Observable that may throw an error const numbers$ = of(1, 2, 3, 4, 5, 'invalid', 7, 8, 9); // Use catchError operator to catch and handle errors const safeNumbers$ = numbers$.pipe( catchError(error => { console.error(`Error: ${error}`); return of('Error occurred. Continuing with default value.'); }) ); // Use retry operator to retry failed Observables const retryNumbers$ = safeNumbers$.pipe( retry(2) // Retry failed Observables up to 2 times ); // Subscribe to the safe and retrying numbers retryNumbers$.subscribe(value => console.info(value));
RxJS позволяет создавать пользовательские операторы путем комбинирования существующих операторов или реализации собственной логики для преобразования или фильтрации значений в потоке Observable. Пользовательские операторы могут обеспечить многоразовую и специализированную функциональность для ваших конкретных случаев использования.
Пример в Angular с пользовательским оператором:
import { Observable, OperatorFunction } from 'rxjs'; import { filter } from 'rxjs/operators'; // Custom operator that filters out odd numbers function filterOutOddNumbers(): OperatorFunction<number, number> { return (source: Observable<number>) => new Observable<number>(subscriber => { return source.subscribe(value => { if (value % 2 === 0) { subscriber.next(value); } }); }); } // Create an Observable that emits a sequence of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use the custom filterOutOddNumbers operator const filteredNumbers$ = numbers$.pipe(filterOutOddNumbers()); // Subscribe to the filtered numbers filteredNumbers$.subscribe(value => console.info(value));
Это лишь краткий обзор некоторых концепций и возможностей RxJS от базовых до продвинутых. RxJS - это мощная и гибкая библиотека, которая может значительно упростить и улучшить ваше асинхронное программирование в Angular или любой другой среде JavaScript. Я рекомендую обратиться к официальной документации RxJS за более подробными объяснениями и примерами.
Надеюсь, эта подробная статья в блоге о Observables и предметах в Rxjs с примерами. Не забывайте грамотно применять эти концепции в своем коде и используйте приведенные примеры кода для лучшего понимания.
Счастливого кодирования!
Пожалуйста, дайте мне знать ваши мысли по поводу этой статьи, похлопав в ладоши или оставив комментарий с предложениями для будущих тем.
20.08.2023 18:21
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в 2023-2024 годах? Или это полная лажа?".
20.08.2023 17:46
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
19.08.2023 18:39
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в частности, магию поплавков и гибкость flexbox.
19.08.2023 17:22
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для чтения благодаря своей простоте. Кроме того, мы всегда хотим проверить самые последние возможности в наших проектах!
18.08.2023 20:33
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий их языку и культуре.
14.08.2023 14:49
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип предназначен для представления неделимого значения.