NodeJS Streams — завершает поток чтения в конвейере, но продолжает обрабатывать уже прочитанные фрагменты

У меня есть следующая проблема:

Я использую потоки NodeJS с конвейером для чтения большого файла, выполнения некоторых преобразований, а затем записи его в доступный для записи поток. Сложность в том, что я хотел иметь возможность остановить чтение файла, если выполняется определенное условие, но при этом закончить обработку уже прочитанных фрагментов.

В следующем примере я читаю текстовый файл, мой поток преобразования (secondStream) преобразует текст в верхний регистр и отправляет в следующий поток. Но если он находит текст, это означает, что он должен прекратить чтение из текстового файла, что, как я полагаю, означает, что поток чтения должен прекратить чтение фрагментов.

Я пробовал несколько решений, но не буду говорить, что я здесь немного запутался. До сих пор я получил следующий код для работы. Однако при использовании firstStream.destroy(); конвейер выдает ошибку.

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close

Я смог «избежать» этой ошибки, перехватив и проигнорировав ее в конвейере, но, честно говоря, это не кажется мне безопасным или правильным.

  const { Transform, Writable, Readable } = require("node:stream");
  const { pipeline } = require("node:stream/promises");
  const fs = require("node:fs");

  let shouldStop = false;
  const firstStream = fs.createReadStream("./lg.txt");

  const secondStream = new Transform({
    transform(chunk, encoding, callback) {
      const foundText = chunk.toString().search("CHAPTER 9") !== -1;

      if (foundText) {
        shouldStop = true;
      }

      const transformed = chunk.toString().toUpperCase();
      callback(null, transformed);
    },
  });

  const lastStream = process.stdout;

  firstStream.on("data", () => {
    if (shouldStop) {
      console.info("should pause");
      firstStream.destroy();
    }
  });

  await pipeline(firstStream, secondStream, lastStream).catch(
    (err) => undefined
  ); // Feels wrong to me

Есть ли лучший способ сделать это? Я что-то пропустил?

Заранее спасибо друзья!

Если вам нужен лучший контроль, возможно, вам не следует использовать конвейер, а взять на себя этот процесс вручную (чтение из потока и запись в другой поток). pipeline() предназначен для ситуации, когда вы хотите передать весь поток.

jfriend00 15.05.2023 03:47

К вашему сведению, ваш поток преобразования может просто съесть оставшуюся часть потока после того, как вы достигнете точки остановки, поэтому ничего после этого не передается по линии.

jfriend00 15.05.2023 03:48

@jfriend00 Привет, спасибо за помощь. Есть ли способ сделать это? Я слышал, что pipe может вызвать утечку памяти, может быть, используя чистые события?

MattV 15.05.2023 15:52
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
1
3
76
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В вашем потоке преобразования вы можете просто «съесть» или «пропустить» любые данные после того, как вы нашли целевой текст. Таким образом, вы можете сохранить всю остальную pipeline() логику. Вместо немедленного завершения он просто прочитает до конца входного потока, но пропустит все данные после целевого текста. Это позволяет потокам нормально завершаться.

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        if (shouldStop) {
            // eat any remaining data
            callback(null, "");
        } else {
            const text = chunk.toString();
            const foundText = text.search("CHAPTER 9") !== -1;
            if (foundText) {
                // set flag to eat remaining data
                shouldStop = true;
            }
            callback(null, text.toUpperCase());
        }
    },
});

Функция pipeline() также поддерживает контроллер прерывания, который является поддерживаемым средством прерывания конвейера, при этом все еще очищая все надлежащим образом. Когда вы прерываете, pipeline() заканчивается отклоненным обещанием, но вы можете проверить, был ли отказ вызван вашим прерыванием или нет, и если да, вы можете получить сообщение о прерывании.

В вашем коде это можно реализовать так:

const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

const firstStream = fs.createReadStream("./lg.txt");

const ac = new AbortController();
const signal = ac.signal;

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        const text = chunk.toString();
        const foundText = text.search("CHAPTER 9") !== -1;

        callback(null, text.toUpperCase());
        if (foundText) {
            ac.abort(new Error("reading terminated, match found"));
        }

    },
});

const lastStream = process.stdout;

pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
    console.info("\nall done without match");
}).catch((err) => {
    if (err.code === "ABORT_ERR") {
        console.info(`\n${signal.reason.message}`);
    } else {
        console.info(err);
    }
});

Примечание. В другой теме ваш код уязвим для строки поиска, попадающей за границу фрагмента и, следовательно, не обнаруживаемой. Обычный способ избежать этой проблемы — сохранить последние N символов каждого фрагмента и добавить его к следующему фрагменту перед запуском поиска соответствия, где N — длина строки поиска — 1. Это гарантирует, что вы не пропустите поиск. строка, которая охватывает фрагменты. Вам нужно будет настроить свой вывод, чтобы он также не включал предварительно добавленный текст. Поскольку это не было сутью вашего вопроса здесь, я не добавлял эту логику и оставлю это вам, но это необходимо для надежного сопоставления.

Спасибо за усилия, единственная проблема для меня в том, что он все еще продолжает читать поток, он просто игнорирует фрагмент. Я хотел сократить время и смоделировать конец потока чтения

MattV 16.05.2023 19:19

@MattV - я добавил в свой ответ еще один метод, который позволяет вам прервать pipeline(), когда вы нашли совпадающий текст. Это останавливает дальнейшее чтение файла и немедленно завершает операцию pipeline(). Он использует AbortController, который является поддерживаемой функцией функции pipeline().

jfriend00 16.05.2023 23:56

Спасибо, я верю, что это очень поможет. И спасибо за дополнительную заботу о куске

MattV 17.05.2023 00:19

Другие вопросы по теме