Реализация Stream в JavaScript

Я хочу реализовать объект потока, который может это сделать:

// a -------1------2----3
// map -----\------\----\
// b --------2------4----6

const a = new Stream();
const b = a.map(value => value * 2);

b.subscribe(console.info);

a.push(1);
// 2
a.push(2);
// 4
a.push(3);
// 6

Идея здесь в том, что объект b может подписывать новые обратные вызовы на поток a. Функция map должна прослушивать, когда вызывается push, и применять назначенную функцию, а также первоначально подписанную. Это реализация, которая у меня есть на данный момент:

class Stream {
  constructor(queue = []) {
    this.queue = queue;
  }

  subscribe(action) {
    if (typeof action === 'function') {
      this.queue.push(action);
    }
  }

  map(callback) {
     this.queue = this.queue.map(
        actionFn => arg => action(callback(arg))
     );

     return this;
  }

  push(value) {
    this.queue.forEach(actionFn => {
      actionFn.call(this, value);
    });
  }
}

Проблема с текущей реализацией заключается в том, что изначально queue в классе Stream пуст, поэтому он не проходит через него. Буду признателен за любые предложения или помощь. Я бы не хотел использовать для этого какую-либо библиотеку.

Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
2
0
422
2

Ответы 2

Ваша карта должна создать новый поток Transform и вернуть его. Вместо subscribe вы можете просто использовать стандартное событие on('data') или, лучше, использовать метод read.

Наконец, вы можете просто использовать мою работу и уже эффективно реализовать свой метод map с помощью scramjet, который делает именно то, что вы показали выше, и, кроме того, поддерживает асинхронные функции. :)

Вот как вы бы его использовали (в некоторых функциях getStream):

const {DataStream} = require('scramjet');

const stream = new DataStream();

stream.write(1); // you can also use await stream.whenWrote(1);
stream.write(2);
stream.write(3);

return stream.map(x => x * 2);

а затем прочтите это где-нибудь еще:

stream.on('data', x => console.info(`x: ${x}`));
// x: 2
// x: 4
// x: 6

Взгляните на Документы для ГПД здесь

После того, как я долго задавал этот вопрос, я смог вернуться к проблеме и найти простое решение. Поскольку один поток должен прослушивать тот, на который он подписан, мы должны вернуть экземпляр оригинала, чтобы сохранить значения из предыдущего потока. Вот код, который, как мне показалось, работает хорошо:

class Stream {
    constructor() {
        this.subscriptions = [];
        this.mappedActions = [];
    }

    subscribe(callback) {
        this.subscriptions.push(callback);
    }

    map(actionFunc) {
        this.mappedActions.push(actionFunc);

        return this;
    }

    push(opValue) {
        this.subscriptions.forEach(cb => {
            if (this.mappedActions.length) {
                this.mappedActions.forEach(action => {
                    cb(action.call(this, opValue));
                });
            } else {
                cb(opValue);
            }
        });
    }
}

const a = new Stream();
const b = a.map(value => value * 1 / 2);
const c = b.map(value => value * 3);

c.subscribe(console.info);

c.push(1);
c.push(2);
c.push(3);

// expected output in the console:
// 0.5
// 3
// 1
// 6
// 1.5
// 9

Надеюсь, что любой, кто наткнется на эту интересную проблему, найдет мое решение полезным. Если есть какие-либо изменения, которые вы хотели бы внести, не стесняйтесь делать это или свяжитесь со мной!

Другие вопросы по теме