Ошибка [ERR_STREAM_PREMATURE_CLOSE]: преждевременное закрытие в потоке Node Pipeline

Я использую функциональность stream.pipeline из Node для загрузки некоторых данных в S3. Основная идея, которую я реализую, заключается в извлечении файлов из запроса и их записи в S3. У меня есть один pipeline, который извлекает zip-файлы и успешно записывает их на S3. Однако я хочу, чтобы мой второй pipeline сделал тот же запрос, но разархивировал и записал разархивированные файлы на S3. Код конвейера выглядит следующим образом:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))

Функция s3Stream выглядит так:

function s3Stream(file) {
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}

Первый pipeline работает хорошо и в настоящее время активно работает в производстве. Однако при добавлении второго конвейера я получаю следующую ошибку:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)

Любая идея, что может быть причиной этого, или решения для решения этой проблемы, будет принята с благодарностью!

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
9
0
6 135
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

TL;DR

При использовании конвейера, который вы принимаете для полного использования читаемого потока, вы не хотите, чтобы что-либо останавливалось до окончания читаемого.

Глубокое погружение

После некоторого времени работы с этими махинациями вот еще немного полезной информации.

import stream from 'stream'

const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()

s1.on('end', () => console.info('end 1'))
s2.on('end', () => console.info('end 2'))
s3.on('end', () => console.info('end 3'))
s1.on('close', () => console.info('close 1'))
s2.on('close', () => console.info('close 2'))
s3.on('close', () => console.info('close 3'))

stream.pipeline(
    s1,
    s2,
    s3,
    async s => { for await (_ of s) { } },
    err => console.info('end', err)
)

теперь если я позвоню s2.end() он закроет всех родителей

end 2
close 2
end 3
close 3

pipeline is the equivalent of s3(s2(s1)))

но если я позвоню s2.destroy(), он напечатает и уничтожит все, это ваша проблема, здесь поток уничтожается до того, как он нормально завершится, либо ошибка, либо возврат/разрыв/выдает asyncGenerator/asyncFunction

close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
    at PassThrough.emit (events.js:327:22)
    at emitCloseNT (internal/streams/destroy.js:81:10)
    at processTicksAndRejections (internal/process/task_queues.js:83:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3

Нельзя пускать ни один из потоков без возможности отлавливать их ошибки

stream.pipeline() leaves dangling event listeners on the streams after theallback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

источник узла (14.4)

  const onclose = () => {
    if (readable && !readableEnded) {
      if (!isReadableEnded(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    callback.call(stream);
  };

Другие вопросы по теме