У меня есть два асинхронных процесса, один из которых производит данные, а другой — потребляет данные. Они работают с разной скоростью, поэтому моя идея заключалась в том, чтобы использовать потоки Node.js для автоматической обработки противодавления между одним потоком и другим. Я попытался организовать решение, расширяющее классы stream.Readable
и stream.Writable
, используя другую задержку в реализациях _read
и _write
для имитации разной скорости в реальном мире (в моем случае чтение быстрее, чем запись) и асинхронный генератор в качестве данных источник. Оба потока работают в объектном режиме и имеют highWaterMark = 2
, поэтому я ожидал, что поток для чтения адаптируется к скорости потока для записи после заполнения буфера, но этого не происходит. Как показано в выводе, читаемый поток продолжает отправлять данные, даже если достигается highWaterMark
.
Что я делаю не так?
import { Readable, Writable } from 'stream';
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
async function* asyncGenerator() {
let i = 0;
while (i++ < 10) {
yield { data: i };
}
}
class TestReadable extends Readable {
public delay: number;
private _tag = '[Readable]';
private _generator = asyncGenerator();
constructor(delay: number) {
super({
objectMode: true,
highWaterMark: 2,
});
this.delay = delay;
}
async _read(size: number) {
while (true) {
await sleep(this.delay);
const { value, done } = await this._generator.next();
const bufferFull = this.push(value);
console.info(this._tag, `Pushed ${JSON.stringify(value)}`, this.readableLength);
if (done) {
this.push(null);
break;
}
if (bufferFull) {
break;
}
}
}
}
class TestWritable extends Writable {
public delay: number;
private _tag = '[Writable]';
constructor(delay: number) {
super({
objectMode: true,
highWaterMark: 2,
});
this.delay = delay;
}
async _write(chunk: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
await sleep(this.delay);
console.info(this._tag, `Received ${JSON.stringify(chunk)}`);
callback();
}
}
(async() => {
const readable = new TestReadable(1000);
const writable = new TestWritable(3000);
readable.pipe(writable);
})();
[Readable] Pushed {"data":1} 0
[Readable] Pushed {"data":2} 0
[Readable] Pushed {"data":3} 1
[Writable] Received {"data":1}
[Readable] Pushed {"data":4} 2
[Readable] Pushed {"data":5} 3
[Readable] Pushed {"data":6} 4
[Writable] Received {"data":2}
[Readable] Pushed {"data":7} 3
[Readable] Pushed {"data":8} 4
[Readable] Pushed {"data":9} 5
[Writable] Received {"data":3}
[Readable] Pushed {"data":10} 6
[Readable] Pushed undefined 7
[Writable] Received {"data":4}
[Writable] Received {"data":5}
[Writable] Received {"data":6}
[Writable] Received {"data":7}
[Writable] Received {"data":8}
[Writable] Received {"data":9}
[Writable] Received {"data":10}
[Writable] Received undefined
Process finished with exit code 0
Я до сих пор не знаю, почему управление потоком не работает должным образом, но первый шаг — ваша переменная bufferFull
задом наперед. Должно быть const bufferFull = !this.push(value);
, потому что this.push(value)
возвращает false
, когда буфер заполнен. Одно это не решает проблему с читаемостью, превышающей highWaterMark
, поэтому здесь должно быть что-то еще.
Резюме
В реализации _read()
вы не можете выполнить эту последовательность событий без некоторых нежелательных последствий:
Asynchronous wait
stream.push(data)
Asynchronous wait
stream.push(data)
Как только вы возвращаете управление обратно в цикл событий после вызова stream.push(data)
, поток предполагает, что ваша операция _read()
выполнена, и немедленно вызовет ее снова, если буфер не заполнен. Вызывающий не может знать, что ваша первая _read()
операция все еще собирает больше данных и будет выполнять больше .push()
операций. Таков контракт/API для ._read()
.
Поэтому, как только вы вызовете stream.push()
и вернете управление обратно в цикл обработки событий, ваша обработка от этого вызова к _read()
должна быть выполнена.
Вы абсолютно можете сделать это в _read()
:
Asynchronous wait
stream.push(data)
Или даже это:
Asynchronous wait
stream.push(data)
stream.push(data)
stream.push(data)
Только не это (что вы и делали):
Asynchronous wait
stream.push(data)
Asynchronous wait
stream.push(data)
Длинное объяснение
Проблема вызвана пересечением того, как работают потоки, и тем, как написан ваш код. Подводя итог, проблема в том, что вы вызываете .push()
несколько раз после асинхронного ожидания каждого в одном и том же _read()
.
Это не то, как абонент _read()
ожидает, что все будет работать. Вам, безусловно, разрешено выполнять некоторые асинхронные операции, а затем вызывать .push()
один или несколько раз в одном и том же цикле событий. Но все становится запутанным, если вы вызываете .push()
, а затем возвращаете управление обратно в цикл событий, а затем, на каком-то более позднем такте цикла событий, снова вызываете .push()
из того же вызова _read()
. И это то, что делает ваш код.
Конструкция _read()
такова, что вы можете вызывать .push()
столько времени, сколько хотите, но как только вы вызываете .push()
и возвращаете управление обратно в цикл событий, поток видит, что в буфере потока есть место, и вы уже вызвал .push()
(так что вы должны сделать), поэтому он снова вызывает _read()
. Но ваша предыдущая _read()
еще не была сделана. Итак, теперь у вас есть несколько циклов _read()
, работающих одновременно, которые, на удивление, не взрываются, но в конечном итоге считывают слишком много данных и заставляют буфер расти больше, чем highWaterMark, потому что бухгалтерия вокруг этого предполагает, что один вызов _read()
вызвал .push()
и вернул управление обратно в цикл событий, это должно быть сделано. Но твое не было сделано. Нет другого способа сообщить, что ваш звонок _read()
еще не сделан.
Существует несколько различных типов исправлений.
Вы можете this.pause()
, пока ваше _read()
действительно не будет сделано, а затем this.resume()
. Это решает проблему, но может иметь некоторые другие побочные эффекты при обработке потока.
Вы можете просто читать по одному объекту за раз и, таким образом, по одному .push()
на _read()
. Как только вы вызовете .push()
и вернете управление обратно, поток просто снова вызовет _read()
.
Вы можете буферизовать объекты, которые вы собираете, а затем .push()
их все в поток в одном синхронном цикле.
# 2 было бы проще всего реализовать, и в вашей реализации, где вы все равно получаете только один объект за раз (всегда задержка между доступными объектами), у него не было бы никаких недостатков в производительности.
Это реализация ниже (в простом Javascript) варианта № 3, который использует локальную буферизацию, но все три варианта исправления работают.
import { Readable, Writable } from 'stream';
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
async function* asyncGenerator() {
let i = 0;
while (i++ < 10) {
yield { data: i };
}
}
class TestReadable extends Readable {
constructor(delay) {
super({
objectMode: true,
highWaterMark: 2,
});
this.delay = delay;
this._tag = '[Readable]';
this._generator = asyncGenerator();
}
async _read(size) {
console.info(this._tag, `Starting read(${size})`);
let readCnt = size;
const buffer = [];
while (readCnt > 0) {
await sleep(this.delay);
const { value, done } = await this._generator.next();
if (done) {
this.push(null);
break;
}
buffer.push(value);
--readCnt;
}
for (let value of buffer) {
const bufferFull = !this.push(value);
console.info(this._tag, `Pushed ${JSON.stringify(value)}, length = ${this._readableState.length}, full = ${bufferFull}`);
}
console.info(this._tag, `Ending read()`);
}
}
class TestWritable extends Writable {
constructor(delay) {
super({
objectMode: true,
highWaterMark: 2,
});
this.delay = delay;
this._tag = '[Writable]';
}
async _write(chunk, encoding, callback) {
await sleep(this.delay);
console.info(this._tag, `Received ${JSON.stringify(chunk)}`);
callback();
}
}
(async () => {
const readable = new TestReadable(1000);
const writable = new TestWritable(3000);
readable.pipe(writable);
})();
Другие исправления/изменения в коде:
bufferFull
логика была обратной. this.push()
возвращает true
когда в буфере больше места, а не когда он заполнен. Поскольку мой измененный код обращает внимание на аргумент размера в _read(size)
, мой код больше не использует это значение.size
, переданный _read()
, и не читать и не отправлять больше этого.done
истинно в вызове await this._generator.next();
, value
нет, но вы все еще вводили это значение undefined
, а затем подталкивали null
. value
когда done
равно true
— это то, что генератор возвращает в конце. Ваш генератор не имеет окончательного возвращаемого значения и, следовательно, не имеет окончательного значения, когда done === true
.Вот реализация варианта №2 (чтение одного объекта за раз):
async _read(size) {
console.info(this._tag, `Starting read(${size})`);
await sleep(this.delay);
const { value, done } = await this._generator.next();
if (done) {
console.info(this._tag, 'End of read data');
this.push(null);
} else {
this.push(value);
console.info(this._tag, `Pushed ${JSON.stringify(value)}, length = ${this._readableState.length}`);
}
console.info(this._tag, `Ending read()`);
}
Вы можете подумать, что неэффективно просто отправлять один объект за вызов, но на самом деле это не так. Потому что, как только вы нажмете объект и в буфере появится больше места, поток немедленно запланирует проверку (для следующего тика цикла событий) для повторного вызова _read()
. Итак, ваш цикл while
внутри функции только что был заменен небольшой логикой прямо в вызывающей программе, которая просто снова вызовет _read()
. Не большая разница.
И эта реализация, безусловно, является самой простой из трех решений, которые я описал.
Привет! Большое спасибо, что нашли время, чтобы углубиться в это, я нашел точно такие же вещи. Действительно, я использовал переменную bufferFull
противоположным образом, а также размещение проверки done
было неправильным (что привело к отправке undefined в буфер). Даже с этими исправлениями буфер по-прежнему заполняется за пределы водяного знака, как вы очень четко описали. Я думаю, что избавление от цикла while внутри _read
— лучший вариант, Keep It Simple Stupid
в конце концов имеет свои достоинства ;-)
Вариант тестирования № 2: кажется, что в начале буфер все еще переполнен, но после этого противодавление работает как положено. Я могу жить с этим, просто задаваясь вопросом, почему это происходит (см. вывод в моем обновленном ответе).
@revy - я не вижу, где он когда-либо был переполнен. Наибольшее значение длины, которое показывает ваш вывод, равно 2, что является highWaterMark - точно так, как должно быть.
Я вижу, что он вначале заталкивает в буфер 3 объекта. Я думал, что highWaterMark = 2
означает максимум 2 объекта в буфере (до достижения порога).
@revy - Вы видите, что он отправляет 3 объекта, но после третьего нажатия readableLength
остается только 2. Это потому, что первый объект уже передан в поток записи - он просто еще не полностью обработал его для вызова _write()
( это, вероятно, происходит на следующем тике, когда вы сразу видите журнал записи). Он вызвал _read()
в третий раз, потому что буфер больше не был заполнен.
Как предложил jfriend00
, вот модифицированная версия без использования цикла while внутри _read
. В начале все еще происходит переполнение читаемого буфера (не знаю почему), но после этого кажется, что противодавление обрабатывается правильно:
import { Readable, Writable } from 'stream';
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
async function* asyncGenerator() {
let i = 0;
while (i++ < 10) {
yield { data: i };
}
}
class TestReadable extends Readable {
public delay: number;
private _tag = '[Readable]';
private _generator = asyncGenerator();
constructor(delay: number) {
super({
objectMode: true,
highWaterMark: 2,
});
this.delay = delay;
}
async _read(size: number) {
await sleep(this.delay);
const { value, done } = await this._generator.next();
if (done) {
return this.push(null);
}
const bufferFull = !this.push(value);
console.info(this._tag, `Pushed ${JSON.stringify(value)}`, this.readableLength, bufferFull);
}
}
class TestWritable extends Writable {
public delay: number;
private _tag = '[Writable]';
constructor(delay: number) {
super({
objectMode: true,
highWaterMark: 2,
});
this.delay = delay;
}
async _write(chunk: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
await sleep(this.delay);
console.info(this._tag, `Received ${JSON.stringify(chunk)}`);
callback();
}
}
(async() => {
const readable = new TestReadable(1000);
const writable = new TestWritable(3000);
readable.pipe(writable);
})();
[Readable] Pushed {"data":1} 0 false
[Readable] Pushed {"data":2} 0 false
[Readable] Pushed {"data":3} 1 false
[Writable] Received {"data":1}
[Readable] Pushed {"data":4} 2 true
[Writable] Received {"data":2}
[Readable] Pushed {"data":5} 1 false
[Readable] Pushed {"data":6} 2 true
[Writable] Received {"data":3}
[Writable] Received {"data":4}
[Readable] Pushed {"data":7} 1 false
[Readable] Pushed {"data":8} 2 true
[Writable] Received {"data":5}
[Writable] Received {"data":6}
[Readable] Pushed {"data":9} 1 false
[Readable] Pushed {"data":10} 2 true
[Writable] Received {"data":7}
[Writable] Received {"data":8}
[Writable] Received {"data":9}
[Writable] Received {"data":10}
Process finished with exit code 0
Я бы посоветовал вам загрузить отладчик и проследить, что происходит, когда вы вызываете
this.push()
, чтобы вы могли точно увидеть, какую логику он применяет, что приводит к превышению максимальной отметки. Код потоков, который вызывается при вызовеthis.push()
, представляет собой этуreadAddChunk()
функцию. Код для этого здесь. Поскольку в этом коде много блоковif
, трудно просто прочитать код и увидеть, по какому пути он идет и почему. Легче просто проследить в отладчике.