Противодействие асинхронным потокам Node.js не работает

У меня есть два асинхронных процесса, один из которых производит данные, а другой — потребляет данные. Они работают с разной скоростью, поэтому моя идея заключалась в том, чтобы использовать потоки Node.js для автоматической обработки противодавления между одним потоком и другим. Я попытался организовать решение, расширяющее классы stream.Readable и stream.Writable, используя другую задержку в реализациях _read и _write для имитации разной скорости в реальном мире (в моем случае чтение быстрее, чем запись) и асинхронный генератор в качестве данных источник. Оба потока работают в объектном режиме и имеют highWaterMark = 2, поэтому я ожидал, что поток для чтения адаптируется к скорости потока для записи после заполнения буфера, но этого не происходит. Как показано в выводе, читаемый поток продолжает отправлять данные, даже если достигается highWaterMark. Что я делаю не так?

import { Readable, Writable } from 'stream';

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

async function* asyncGenerator() {
    let i = 0;

    while (i++ < 10) {
        yield { data: i };
    }
}

class TestReadable extends Readable {
    public delay: number;
    private _tag = '[Readable]';
    private _generator = asyncGenerator();

    constructor(delay: number) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
    }

    async _read(size: number) {
        while (true) {
            await sleep(this.delay);
            const { value, done } = await this._generator.next();
            const bufferFull = this.push(value);
            console.info(this._tag, `Pushed ${JSON.stringify(value)}`, this.readableLength);

            if (done) {
                this.push(null);
                break;
            }

            if (bufferFull) {
                break;
            }
        }
    }
}

class TestWritable extends Writable {
    public delay: number;
    private _tag = '[Writable]';

    constructor(delay: number) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
    }

    async _write(chunk: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
        await sleep(this.delay);
        console.info(this._tag, `Received ${JSON.stringify(chunk)}`);
        callback();
    }
}

(async() => {
    const readable = new TestReadable(1000);
    const writable = new TestWritable(3000);

    readable.pipe(writable);
})();
[Readable] Pushed {"data":1} 0
[Readable] Pushed {"data":2} 0
[Readable] Pushed {"data":3} 1
[Writable] Received {"data":1}
[Readable] Pushed {"data":4} 2
[Readable] Pushed {"data":5} 3
[Readable] Pushed {"data":6} 4
[Writable] Received {"data":2}
[Readable] Pushed {"data":7} 3
[Readable] Pushed {"data":8} 4
[Readable] Pushed {"data":9} 5
[Writable] Received {"data":3}
[Readable] Pushed {"data":10} 6
[Readable] Pushed undefined 7
[Writable] Received {"data":4}
[Writable] Received {"data":5}
[Writable] Received {"data":6}
[Writable] Received {"data":7}
[Writable] Received {"data":8}
[Writable] Received {"data":9}
[Writable] Received {"data":10}
[Writable] Received undefined

Process finished with exit code 0

Я бы посоветовал вам загрузить отладчик и проследить, что происходит, когда вы вызываете this.push(), чтобы вы могли точно увидеть, какую логику он применяет, что приводит к превышению максимальной отметки. Код потоков, который вызывается при вызове this.push(), представляет собой эту readAddChunk() функцию. Код для этого здесь. Поскольку в этом коде много блоков if, трудно просто прочитать код и увидеть, по какому пути он идет и почему. Легче просто проследить в отладчике.

jfriend00 05.10.2022 19:31

Я до сих пор не знаю, почему управление потоком не работает должным образом, но первый шаг — ваша переменная bufferFull задом наперед. Должно быть const bufferFull = !this.push(value);, потому что this.push(value) возвращает false, когда буфер заполнен. Одно это не решает проблему с читаемостью, превышающей highWaterMark, поэтому здесь должно быть что-то еще.

jfriend00 05.10.2022 21:26
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
0
2
164
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Резюме

В реализации _read() вы не можете выполнить эту последовательность событий без некоторых нежелательных последствий:

Asynchronous wait
stream.push(data)
Asynchronous wait
stream.push(data)

Как только вы возвращаете управление обратно в цикл событий после вызова stream.push(data), поток предполагает, что ваша операция _read() выполнена, и немедленно вызовет ее снова, если буфер не заполнен. Вызывающий не может знать, что ваша первая _read() операция все еще собирает больше данных и будет выполнять больше .push() операций. Таков контракт/API для ._read().

Поэтому, как только вы вызовете stream.push() и вернете управление обратно в цикл обработки событий, ваша обработка от этого вызова к _read() должна быть выполнена.

Вы абсолютно можете сделать это в _read():

Asynchronous wait
stream.push(data)

Или даже это:

Asynchronous wait
stream.push(data)
stream.push(data)
stream.push(data)

Только не это (что вы и делали):

Asynchronous wait
stream.push(data)
Asynchronous wait
stream.push(data)

Длинное объяснение

Проблема вызвана пересечением того, как работают потоки, и тем, как написан ваш код. Подводя итог, проблема в том, что вы вызываете .push() несколько раз после асинхронного ожидания каждого в одном и том же _read().

Это не то, как абонент _read() ожидает, что все будет работать. Вам, безусловно, разрешено выполнять некоторые асинхронные операции, а затем вызывать .push() один или несколько раз в одном и том же цикле событий. Но все становится запутанным, если вы вызываете .push(), а затем возвращаете управление обратно в цикл событий, а затем, на каком-то более позднем такте цикла событий, снова вызываете .push() из того же вызова _read(). И это то, что делает ваш код.

Конструкция _read() такова, что вы можете вызывать .push() столько времени, сколько хотите, но как только вы вызываете .push() и возвращаете управление обратно в цикл событий, поток видит, что в буфере потока есть место, и вы уже вызвал .push() (так что вы должны сделать), поэтому он снова вызывает _read(). Но ваша предыдущая _read() еще не была сделана. Итак, теперь у вас есть несколько циклов _read(), работающих одновременно, которые, на удивление, не взрываются, но в конечном итоге считывают слишком много данных и заставляют буфер расти больше, чем highWaterMark, потому что бухгалтерия вокруг этого предполагает, что один вызов _read() вызвал .push() и вернул управление обратно в цикл событий, это должно быть сделано. Но твое не было сделано. Нет другого способа сообщить, что ваш звонок _read() еще не сделан.

Существует несколько различных типов исправлений.

  1. Вы можете this.pause(), пока ваше _read() действительно не будет сделано, а затем this.resume(). Это решает проблему, но может иметь некоторые другие побочные эффекты при обработке потока.

  2. Вы можете просто читать по одному объекту за раз и, таким образом, по одному .push() на _read(). Как только вы вызовете .push() и вернете управление обратно, поток просто снова вызовет _read().

  3. Вы можете буферизовать объекты, которые вы собираете, а затем .push() их все в поток в одном синхронном цикле.

# 2 было бы проще всего реализовать, и в вашей реализации, где вы все равно получаете только один объект за раз (всегда задержка между доступными объектами), у него не было бы никаких недостатков в производительности.

Это реализация ниже (в простом Javascript) варианта № 3, который использует локальную буферизацию, но все три варианта исправления работают.

import { Readable, Writable } from 'stream';

const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

async function* asyncGenerator() {
    let i = 0;

    while (i++ < 10) {
        yield { data: i };
    }
}

class TestReadable extends Readable {

    constructor(delay) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
        this._tag = '[Readable]';
        this._generator = asyncGenerator();
    }

    async _read(size) {
        console.info(this._tag, `Starting read(${size})`);
        let readCnt = size;
        const buffer = [];
        while (readCnt > 0) {
            await sleep(this.delay);
            const { value, done } = await this._generator.next();
            if (done) {
                this.push(null);
                break;
            }

            buffer.push(value);
            --readCnt;
        }
        for (let value of buffer) {
            const bufferFull = !this.push(value);
            console.info(this._tag, `Pushed ${JSON.stringify(value)}, length = ${this._readableState.length}, full = ${bufferFull}`);
        }
        console.info(this._tag, `Ending read()`);
    }
}

class TestWritable extends Writable {

    constructor(delay) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
        this._tag = '[Writable]';
    }

    async _write(chunk, encoding, callback) {
        await sleep(this.delay);
        console.info(this._tag, `Received ${JSON.stringify(chunk)}`);
        callback();
    }
}

(async () => {
    const readable = new TestReadable(1000);
    const writable = new TestWritable(3000);

    readable.pipe(writable);
})();

Другие исправления/изменения в коде:

  1. Ваша bufferFull логика была обратной. this.push() возвращает true когда в буфере больше места, а не когда он заполнен. Поскольку мой измененный код обращает внимание на аргумент размера в _read(size), мой код больше не использует это значение.
  2. Я реализовал элементы управления, чтобы учитывать аргумент size, переданный _read(), и не читать и не отправлять больше этого.
  3. Когда done истинно в вызове await this._generator.next();, value нет, но вы все еще вводили это значение undefined, а затем подталкивали null. value когда done равно true — это то, что генератор возвращает в конце. Ваш генератор не имеет окончательного возвращаемого значения и, следовательно, не имеет окончательного значения, когда done === true.

Вот реализация варианта №2 (чтение одного объекта за раз):

async _read(size) {
    console.info(this._tag, `Starting read(${size})`);
    await sleep(this.delay);
    const { value, done } = await this._generator.next();
    if (done) {
        console.info(this._tag, 'End of read data');
        this.push(null);
    } else {
        this.push(value);
        console.info(this._tag, `Pushed ${JSON.stringify(value)}, length = ${this._readableState.length}`);
    }
    console.info(this._tag, `Ending read()`);
}

Вы можете подумать, что неэффективно просто отправлять один объект за вызов, но на самом деле это не так. Потому что, как только вы нажмете объект и в буфере появится больше места, поток немедленно запланирует проверку (для следующего тика цикла событий) для повторного вызова _read(). Итак, ваш цикл while внутри функции только что был заменен небольшой логикой прямо в вызывающей программе, которая просто снова вызовет _read(). Не большая разница.

И эта реализация, безусловно, является самой простой из трех решений, которые я описал.

Привет! Большое спасибо, что нашли время, чтобы углубиться в это, я нашел точно такие же вещи. Действительно, я использовал переменную bufferFull противоположным образом, а также размещение проверки done было неправильным (что привело к отправке undefined в буфер). Даже с этими исправлениями буфер по-прежнему заполняется за пределы водяного знака, как вы очень четко описали. Я думаю, что избавление от цикла while внутри _read — лучший вариант, Keep It Simple Stupid в конце концов имеет свои достоинства ;-)

revy 06.10.2022 08:52

Вариант тестирования № 2: кажется, что в начале буфер все еще переполнен, но после этого противодавление работает как положено. Я могу жить с этим, просто задаваясь вопросом, почему это происходит (см. вывод в моем обновленном ответе).

revy 06.10.2022 09:07

@revy - я не вижу, где он когда-либо был переполнен. Наибольшее значение длины, которое показывает ваш вывод, равно 2, что является highWaterMark - точно так, как должно быть.

jfriend00 06.10.2022 09:40

Я вижу, что он вначале заталкивает в буфер 3 объекта. Я думал, что highWaterMark = 2 означает максимум 2 объекта в буфере (до достижения порога).

revy 06.10.2022 09:57

@revy - Вы видите, что он отправляет 3 объекта, но после третьего нажатия readableLength остается только 2. Это потому, что первый объект уже передан в поток записи - он просто еще не полностью обработал его для вызова _write() ( это, вероятно, происходит на следующем тике, когда вы сразу видите журнал записи). Он вызвал _read() в третий раз, потому что буфер больше не был заполнен.

jfriend00 06.10.2022 10:26

Как предложил jfriend00, вот модифицированная версия без использования цикла while внутри _read. В начале все еще происходит переполнение читаемого буфера (не знаю почему), но после этого кажется, что противодавление обрабатывается правильно:

import { Readable, Writable } from 'stream';

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

async function* asyncGenerator() {
    let i = 0;

    while (i++ < 10) {
        yield { data: i };
    }
}

class TestReadable extends Readable {
    public delay: number;
    private _tag = '[Readable]';
    private _generator = asyncGenerator();

    constructor(delay: number) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
    }

    async _read(size: number) {
        await sleep(this.delay);
        const { value, done } = await this._generator.next();

        if (done) {
            return this.push(null);
        }

        const bufferFull = !this.push(value);
        console.info(this._tag, `Pushed ${JSON.stringify(value)}`, this.readableLength, bufferFull);
    }
}

class TestWritable extends Writable {
    public delay: number;
    private _tag = '[Writable]';

    constructor(delay: number) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
    }

    async _write(chunk: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
        await sleep(this.delay);
        console.info(this._tag, `Received ${JSON.stringify(chunk)}`);
        callback();
    }
}

(async() => {
    const readable = new TestReadable(1000);
    const writable = new TestWritable(3000);

    readable.pipe(writable);
})();
[Readable] Pushed {"data":1} 0 false
[Readable] Pushed {"data":2} 0 false
[Readable] Pushed {"data":3} 1 false
[Writable] Received {"data":1}
[Readable] Pushed {"data":4} 2 true
[Writable] Received {"data":2}
[Readable] Pushed {"data":5} 1 false
[Readable] Pushed {"data":6} 2 true
[Writable] Received {"data":3}
[Writable] Received {"data":4}
[Readable] Pushed {"data":7} 1 false
[Readable] Pushed {"data":8} 2 true
[Writable] Received {"data":5}
[Writable] Received {"data":6}
[Readable] Pushed {"data":9} 1 false
[Readable] Pushed {"data":10} 2 true
[Writable] Received {"data":7}
[Writable] Received {"data":8}
[Writable] Received {"data":9}
[Writable] Received {"data":10}

Process finished with exit code 0

Другие вопросы по теме