Освоение Observables и Subjects в Rxjs:

RedDeveloper
27.04.2023 08:14
Освоение Observables и Subjects в Rxjs:
Освоение Observables и Subjects в Rxjs

Давайте начнем с основ и постепенно перейдем к более продвинутым концепциям в RxJS в Angular

Наблюдаемые объекты и наблюдатели:

Observables - это основа RxJS, представляющая собой поток данных, которые можно наблюдать в течение времени. Наблюдатели являются потребителями этих потоков, прослушивая передаваемые данные и реагируя соответствующим образом.

Observables и Observers похожи на концепции событий и обработчиков событий в C#.

Наблюдаемые:

В Angular вы можете создать Observable, используя класс Observable из библиотеки rxjs. Observable может испускать данные асинхронно с течением времени и может быть подписан наблюдателями на получение испускаемых данных.

Пример в Angular:

import { Observable } from 'rxjs';

// Create an Observable that emits a stream of numbers
const numbers$ = new Observable(observer => {
  let count = 1;
  const intervalId = setInterval(() => {
    observer.next(count++);
  }, 1000);

  // Cleanup logic when the Observable is unsubscribed
  return () => {
    clearInterval(intervalId);
  };
});

// Subscribe to the Observable to receive the emitted numbers
numbers$.subscribe(value => console.info(value));

Наблюдатели:

В Angular наблюдатель - это объект, который определяет, как обрабатывать данные, полученные от Observable.

Он имеет три дополнительных метода:

Next для обработки выданных данных

Error для обработки любых ошибок

Complete для обработки завершения работы Observable.

Пример в Angular:

import { Observable } from 'rxjs';

// Create an Observable that emits a stream of numbers
const numbers$ = new Observable(observer => {
  let count = 1;
  const intervalId = setInterval(() => {
    observer.next(count++);
  }, 1000);

  // Cleanup logic when the Observable is unsubscribed
  return () => {
    clearInterval(intervalId);
  };
});

// Define an Observer to handle the emitted numbers
const observer = {
  next: value => console.info(value),
  error: err => console.error(err),
  complete: () => console.info('Observable completed')
};

// Subscribe the Observer to the Observable
numbers$.subscribe(observer);
  1. Операторы: Операторы - это функции, которые позволяют преобразовывать, фильтровать и объединять потоки данных, испускаемые Observables. Они похожи на операторы LINQ в C#.
  • Операторы преобразования: Операторы преобразования позволяют преобразовывать данные, выдаваемые Observable, в другой формат или структуру.
  • Пример в Angular с оператором map:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use map operator to square each number in the stream
const squaredNumbers$ = numbers$.pipe(
  map(num => num * num)
);

// Subscribe to the transformed data stream
squaredNumbers$.subscribe(value => console.info(value));
  • Операторы фильтрации: Операторы фильтрации позволяют фильтровать данные, выдаваемые наблюдаемым, на основе заданного условия.

Пример в Angular с оператором фильтрации:

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use filter operator to get only even numbers from the stream
const evenNumbers$ = numbers$.pipe(
  filter(num => num % 2 === 0)
);

// Subscribe to the filtered even numbers stream
evenNumbers$.subscribe(value => console.info(value));

Субъекты:

Субъекты - это одновременно Observables и Observers, что позволяет вам эмитировать и подписываться на потоки данных напрямую.

Субъекты похожи на концепцию эмиттеров событий в C#.

Пример в Angular с использованием Subject:

import { Subject } from 'rxjs';

// Create a Subject to emit and subscribe to a stream of numbers
const numbersSubject$ = new Subject<number>();

// Subscribe to the Subject to receive emitted numbers
numbersSubject$.subscribe(value => console.info(value));

// Emit numbers to the Subject
numbersSubject$.next(1);
numbersSubject$.next(2);
numbersSubject$.next(3);

Горячие и холодные Observables:

Наблюдаемые можно разделить на два типа

  1. Горячие
  2. Холодные.

Горячие обозреватели выдают данные независимо от наличия подписчиков Холодные обозреватели выдают данные только при наличии активных подписчиков.

Пример в Angular с Hot и Cold Observables:

import { interval, fromEvent } from 'rxjs';

// Hot Observable - emits data regardless of subscribers
const hotObservable$ = interval(1000);

// Cold Observable - emits data only when subscribed
const button = document.querySelector('button');
const coldObservable$ = fromEvent(button, 'click');

// Subscribe to both Observables
hotObservable$.subscribe(value => console.info(`Hot: ${value}`));
coldObservable$.subscribe(value => console.info(`Cold: ${value}`));

Обработка ошибок:

RxJS предоставляет операторы для обработки ошибок в Observables, такие как catchError и retry, которые позволяют обрабатывать ошибки и повторные попытки в потоке данных.

Пример в Angular с обработкой ошибок:

import { of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Create an observable that may emit an error
const numbers$ = of(1, 2, 3, 4, 5, 'six');

// Use catchError operator to handle errors
const numbersWithErrorHandled$ = numbers$.pipe(
  catchError(err => of('Error occurred:', err))
);

// Use retry operator to retry the observable in case of error
const numbersWithRetry$ = numbers$.pipe(
  retry(2) // Retry 2 times in case of error
);

// Subscribe to the error handled and retried observables
numbersWithErrorHandled$.subscribe(value => console.info(value));
numbersWithRetry$.subscribe(value => console.info(value));

Пользовательские операторы:

RxJS позволяет создавать свои собственные операторы путем компоновки существующих операторов или расширения класса Observable. Это дает вам гибкость в создании многократно используемых и специализированных операторов для ваших конкретных случаев использования.

Пример в Angular с пользовательским оператором:

import { Observable, OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

// Custom operator to multiply each emitted number by a given factor
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    source.pipe(map(num => num * factor));
}

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use the custom multiplyBy operator to multiply each number by 10
const multipliedNumbers$ = numbers$.pipe(multiplyBy(10));

// Subscribe to the multiplied numbers
multipliedNumbers$.subscribe(value => console.info(value));

Schedulers:

RxJS позволяет управлять контекстом выполнения или планировщиком Observable. Планировщики предоставляют возможности для управления параллелизмом, контроля времени и выполнения кода в определенных потоках или контекстах.

Пример в Angular с планировщиками:

import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use observeOn operator to specify an asyncScheduler for subscription
const asyncNumbers$ = numbers$.pipe(observeOn(asyncScheduler));

// Subscribe to the numbers with asyncScheduler
asyncNumbers$.subscribe(value => console.info(value));

// Use observeOn operator with other schedulers like asapScheduler or queueScheduler
const asapNumbers$ = numbers$.pipe(observeOn(asapScheduler));
const queueNumbers$ = numbers$.pipe(observeOn(queueScheduler));

Multicasting:

По умолчанию Observables являются одноадресными, то есть каждая подписка создает отдельное выполнение Observable.

Однако вы можете передавать Observables по многоадресной рассылке, чтобы разделить одно выполнение между несколькими подписчиками, что может повысить производительность и уменьшить дублирование работы.

Пример в Angular с многоадресной передачей:

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

// Create a hot Observable that emits numbers every second
const numbers$ = interval(1000).pipe(multicast(() => new Subject()), refCount());

// Subscribe to the hot Observable from multiple subscribers
numbers$.subscribe(value => console.info(`Subscriber 1: ${value}`));
numbers$.subscribe(value => console.info(`Subscriber 2: ${value}`));

// Start the execution of the hot Observable
numbers$.connect();

Настройка Observable:

Вы также можете настроить поведение Observable, расширив класс Observable и реализовав собственную логику для выдачи значений, обработки ошибок и управления подписками.

Пример в Angular с пользовательским Observable:

import { Observable } from 'rxjs';

// Custom Observable that emits a sequence of numbers
class MyNumbersObservable extends Observable<number> {
  private currentNumber = 1;

  constructor(private maxNumber: number) {
    super(subscriber => {
      const intervalId = setInterval(() => {
        if (this.currentNumber <= maxNumber) {
          subscriber.next(this.currentNumber++);
        } else {
          subscriber.complete();
          clearInterval(intervalId);
        }
      }, 1000);
    });
  }
}

// Create an instance of the custom Observable
const myNumbers$ = new MyNumbersObservable(5);

// Subscribe to the custom Observable
myNumbers$.subscribe(value => console.info(value));

Backpressure:

RxJS предоставляет механизмы для обработки обратного давления, которое возникает, когда скорость выброса данных из Observable выше, чем скорость потребления подписчиками. Стратегии обратного давления позволяют вам контролировать, как данные буферизируются, отбрасываются или управляются при работе с высокоскоростными потоками данных.

Пример в Angular с обратным давлением:

import { interval, bufferTime } from 'rxjs';

// Create a fast-emitting Observable that emits numbers every 100ms
const fastNumbers$ = interval(100);

// Use bufferTime operator to buffer emitted numbers for every 1 second
const bufferedNumbers$ = fastNumbers$.pipe(bufferTime(1000));

// Subscribe to the buffered numbers
bufferedNumbers$.subscribe(values => console.info(values));

Обработка ошибок:

RxJS предоставляет операторы для обработки ошибок, которые могут возникнуть в потоке Observable.

Вы можете перехватывать и обрабатывать ошибки, повторять попытки неудачных Observables и предпринимать другие действия для изящной обработки ошибок в вашем приложении.

Пример в Angular с обработкой ошибок:

import { of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Create an Observable that may throw an error
const numbers$ = of(1, 2, 3, 4, 5, 'invalid', 7, 8, 9);

// Use catchError operator to catch and handle errors
const safeNumbers$ = numbers$.pipe(
  catchError(error => {
    console.error(`Error: ${error}`);
    return of('Error occurred. Continuing with default value.');
  })
);

// Use retry operator to retry failed Observables
const retryNumbers$ = safeNumbers$.pipe(
  retry(2) // Retry failed Observables up to 2 times
);

// Subscribe to the safe and retrying numbers
retryNumbers$.subscribe(value => console.info(value));

Пользовательские операторы:

RxJS позволяет создавать пользовательские операторы путем комбинирования существующих операторов или реализации собственной логики для преобразования или фильтрации значений в потоке Observable. Пользовательские операторы могут обеспечить многоразовую и специализированную функциональность для ваших конкретных случаев использования.

Пример в Angular с пользовательским оператором:

import { Observable, OperatorFunction } from 'rxjs';
import { filter } from 'rxjs/operators';

// Custom operator that filters out odd numbers
function filterOutOddNumbers(): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    new Observable<number>(subscriber => {
      return source.subscribe(value => {
        if (value % 2 === 0) {
          subscriber.next(value);
        }
      });
    });
}

// Create an Observable that emits a sequence of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use the custom filterOutOddNumbers operator
const filteredNumbers$ = numbers$.pipe(filterOutOddNumbers());

// Subscribe to the filtered numbers
filteredNumbers$.subscribe(value => console.info(value));

Это лишь краткий обзор некоторых концепций и возможностей RxJS от базовых до продвинутых. RxJS - это мощная и гибкая библиотека, которая может значительно упростить и улучшить ваше асинхронное программирование в Angular или любой другой среде JavaScript. Я рекомендую обратиться к официальной документации RxJS за более подробными объяснениями и примерами.

Надеюсь, эта подробная статья в блоге о Observables и предметах в Rxjs с примерами. Не забывайте грамотно применять эти концепции в своем коде и используйте приведенные примеры кода для лучшего понимания.

Счастливого кодирования!

Пожалуйста, дайте мне знать ваши мысли по поводу этой статьи, похлопав в ладоши или оставив комментарий с предложениями для будущих тем.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?

20.08.2023 18:21

Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в 2023-2024 годах? Или это полная лажа?".

Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией

20.08.2023 17:46

В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.

Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox

19.08.2023 18:39

Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в частности, магию поплавков и гибкость flexbox.

Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest

19.08.2023 17:22

В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для чтения благодаря своей простоте. Кроме того, мы всегда хотим проверить самые последние возможности в наших проектах!

Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️

18.08.2023 20:33

Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий их языку и культуре.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL

14.08.2023 14:49

Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип предназначен для представления неделимого значения.