Я использую функциональность stream.pipeline из Node для загрузки некоторых данных в S3. Основная идея, которую я реализую, заключается в извлечении файлов из запроса и их записи в S3. У меня есть один pipeline, который извлекает zip-файлы и успешно записывает их на S3. Однако я хочу, чтобы мой второй pipeline сделал тот же запрос, но разархивировал и записал разархивированные файлы на S3. Код конвейера выглядит следующим образом:
pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))
Функция s3Stream выглядит так:
function s3Stream(file) {
const pass = new stream.PassThrough()
s3Store.upload(file, pass)
return pass
}
Первый pipeline работает хорошо и в настоящее время активно работает в производстве. Однако при добавлении второго конвейера я получаю следующую ошибку:
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
Любая идея, что может быть причиной этого, или решения для решения этой проблемы, будет принята с благодарностью!





При использовании конвейера, который вы принимаете для полного использования читаемого потока, вы не хотите, чтобы что-либо останавливалось до окончания читаемого.
После некоторого времени работы с этими махинациями вот еще немного полезной информации.
import stream from 'stream'
const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()
s1.on('end', () => console.info('end 1'))
s2.on('end', () => console.info('end 2'))
s3.on('end', () => console.info('end 3'))
s1.on('close', () => console.info('close 1'))
s2.on('close', () => console.info('close 2'))
s3.on('close', () => console.info('close 3'))
stream.pipeline(
s1,
s2,
s3,
async s => { for await (_ of s) { } },
err => console.info('end', err)
)
теперь если я позвоню s2.end() он закроет всех родителей
end 2
close 2
end 3
close 3
pipeline is the equivalent of s3(s2(s1)))
но если я позвоню s2.destroy(), он напечатает и уничтожит все, это ваша проблема, здесь поток уничтожается до того, как он нормально завершится, либо ошибка, либо возврат/разрыв/выдает asyncGenerator/asyncFunction
close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
at PassThrough.emit (events.js:327:22)
at emitCloseNT (internal/streams/destroy.js:81:10)
at processTicksAndRejections (internal/process/task_queues.js:83:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3
Нельзя пускать ни один из потоков без возможности отлавливать их ошибки
stream.pipeline() leaves dangling event listeners on the streams after theallback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
const onclose = () => {
if (readable && !readableEnded) {
if (!isReadableEnded(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
callback.call(stream);
};