У меня есть следующая проблема:
Я использую потоки NodeJS с конвейером для чтения большого файла, выполнения некоторых преобразований, а затем записи его в доступный для записи поток. Сложность в том, что я хотел иметь возможность остановить чтение файла, если выполняется определенное условие, но при этом закончить обработку уже прочитанных фрагментов.
В следующем примере я читаю текстовый файл, мой поток преобразования (secondStream) преобразует текст в верхний регистр и отправляет в следующий поток. Но если он находит текст, это означает, что он должен прекратить чтение из текстового файла, что, как я полагаю, означает, что поток чтения должен прекратить чтение фрагментов.
Я пробовал несколько решений, но не буду говорить, что я здесь немного запутался. До сих пор я получил следующий код для работы. Однако при использовании firstStream.destroy();
конвейер выдает ошибку.
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
Я смог «избежать» этой ошибки, перехватив и проигнорировав ее в конвейере, но, честно говоря, это не кажется мне безопасным или правильным.
const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");
let shouldStop = false;
const firstStream = fs.createReadStream("./lg.txt");
const secondStream = new Transform({
transform(chunk, encoding, callback) {
const foundText = chunk.toString().search("CHAPTER 9") !== -1;
if (foundText) {
shouldStop = true;
}
const transformed = chunk.toString().toUpperCase();
callback(null, transformed);
},
});
const lastStream = process.stdout;
firstStream.on("data", () => {
if (shouldStop) {
console.info("should pause");
firstStream.destroy();
}
});
await pipeline(firstStream, secondStream, lastStream).catch(
(err) => undefined
); // Feels wrong to me
Есть ли лучший способ сделать это? Я что-то пропустил?
Заранее спасибо друзья!
К вашему сведению, ваш поток преобразования может просто съесть оставшуюся часть потока после того, как вы достигнете точки остановки, поэтому ничего после этого не передается по линии.
@jfriend00 Привет, спасибо за помощь. Есть ли способ сделать это? Я слышал, что pipe
может вызвать утечку памяти, может быть, используя чистые события?
В вашем потоке преобразования вы можете просто «съесть» или «пропустить» любые данные после того, как вы нашли целевой текст. Таким образом, вы можете сохранить всю остальную pipeline()
логику. Вместо немедленного завершения он просто прочитает до конца входного потока, но пропустит все данные после целевого текста. Это позволяет потокам нормально завершаться.
const secondStream = new Transform({
transform(chunk, encoding, callback) {
if (shouldStop) {
// eat any remaining data
callback(null, "");
} else {
const text = chunk.toString();
const foundText = text.search("CHAPTER 9") !== -1;
if (foundText) {
// set flag to eat remaining data
shouldStop = true;
}
callback(null, text.toUpperCase());
}
},
});
Функция pipeline()
также поддерживает контроллер прерывания, который является поддерживаемым средством прерывания конвейера, при этом все еще очищая все надлежащим образом. Когда вы прерываете, pipeline()
заканчивается отклоненным обещанием, но вы можете проверить, был ли отказ вызван вашим прерыванием или нет, и если да, вы можете получить сообщение о прерывании.
В вашем коде это можно реализовать так:
const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");
const firstStream = fs.createReadStream("./lg.txt");
const ac = new AbortController();
const signal = ac.signal;
const secondStream = new Transform({
transform(chunk, encoding, callback) {
const text = chunk.toString();
const foundText = text.search("CHAPTER 9") !== -1;
callback(null, text.toUpperCase());
if (foundText) {
ac.abort(new Error("reading terminated, match found"));
}
},
});
const lastStream = process.stdout;
pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
console.info("\nall done without match");
}).catch((err) => {
if (err.code === "ABORT_ERR") {
console.info(`\n${signal.reason.message}`);
} else {
console.info(err);
}
});
Примечание. В другой теме ваш код уязвим для строки поиска, попадающей за границу фрагмента и, следовательно, не обнаруживаемой. Обычный способ избежать этой проблемы — сохранить последние N символов каждого фрагмента и добавить его к следующему фрагменту перед запуском поиска соответствия, где N — длина строки поиска — 1. Это гарантирует, что вы не пропустите поиск. строка, которая охватывает фрагменты. Вам нужно будет настроить свой вывод, чтобы он также не включал предварительно добавленный текст. Поскольку это не было сутью вашего вопроса здесь, я не добавлял эту логику и оставлю это вам, но это необходимо для надежного сопоставления.
Спасибо за усилия, единственная проблема для меня в том, что он все еще продолжает читать поток, он просто игнорирует фрагмент. Я хотел сократить время и смоделировать конец потока чтения
@MattV - я добавил в свой ответ еще один метод, который позволяет вам прервать pipeline()
, когда вы нашли совпадающий текст. Это останавливает дальнейшее чтение файла и немедленно завершает операцию pipeline()
. Он использует AbortController
, который является поддерживаемой функцией функции pipeline()
.
Спасибо, я верю, что это очень поможет. И спасибо за дополнительную заботу о куске
Если вам нужен лучший контроль, возможно, вам не следует использовать конвейер, а взять на себя этот процесс вручную (чтение из потока и запись в другой поток).
pipeline()
предназначен для ситуации, когда вы хотите передать весь поток.