Противодавление не сбрасывается при потоковой передаче на S3 с узлом

Я использую библиотеку aws-sdk-js-v3 для потоковой передачи нескольких наборов данных на один и тот же объект в s3 с использованием одного и того же потока.

import dotenv from 'dotenv'
import {S3} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage';
import {PassThrough} from 'stream';
import {randomBytes} from 'crypto'
import { env } from 'process';

export async function finishS3Stream(upload,stream){
  stream.end();
  await upload.done();
}

export async function writeToStream(stream, data){
  // If the stream returns false it represents that the amount of data in the stream has passed the
  // highWaterMark threshold and we should wait for a drain event from the stream before continuing to add more data
  return new Promise((resolve) => {
    if (!stream.write(data)) {
      console.info("drain needed")
      stream.once('drain', resolve);
    } else {
      resolve();
    }
  });
}

export function createS3Stream(key,bucket) {
  const client = new S3()

  const stream = new PassThrough();

  stream.on('error', (e) => console.info(e))

  const upload = new Upload({
    params: {
      Bucket: bucket,
      Key: key,
      Body: stream,
    },
    client,
  });

  return {
    stream,
    upload,
  };
}

async function main(){
  dotenv.config()
  const bucket = env.BUCKET
  const key = env.KEY

  console.info("creating stream")
  const {stream,upload} = createS3Stream(key,bucket)

  const data1 = randomBytes(5242880)
  console.info('writing data1 to stream')
  await writeToStream(stream,data1)

  const data2 = randomBytes(5242880)
  console.info('writing data2 to stream')
  await writeToStream(stream,data2)

  console.info('closing stream')
  await finishS3Stream(upload,stream)
}

main()

По какой-то причине основная функция не ожидает, пока поток будет слит, как только он достигнет верхней отметки, и приложение выйдет с кодом выхода 0.

Выход

creating stream
writing data1 to stream
drain needed

Как я могу заставить приложение ждать, пока поток будет слит, и почему оно не ждет, пока обещание будет разрешено, и почему оно завершается перед записью следующего набора данных?

Попробуйте переместить stream.once('drain', resolve) выше оператора if.

Svenskunganka 07.02.2023 02:52

@Svenskunganka не повезло, та же проблема

BigL 07.02.2023 02:54
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
2
65
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

он не сливается, потому что вы ожидаете метод writeToStream (он ждет слива), и стим не сливается, пока вы не вызовете finishS3Stream, который вызовет метод done или класс Upload (он только начинает читать стим там )?

Ответ принят как подходящий

Для тех, кто изо всех сил пытается использовать документацию aws для записи нескольких файлов или нескольких наборов данных в один и тот же поток, вот решение. Вам нужно асинхронно вызвать загрузку, чтобы s3Client начал читать поток. Затем, когда вы его закроете, вам нужно дождаться разрешения функции загрузки.

export async function finishS3Stream(upload,stream){
  stream.end();
  // wait for the upload promise to resolve
  await upload;
}

export async function writeToStream(stream, data){
  // If the stream returns false it represents that the amount of data in the stream has passed the
  // highWaterMark threshold and we should wait for a drain event from the stream before continuing to add more data
  return new Promise((resolve) => {
    if (!stream.write(data)) {
      console.info("drain needed")
      stream.once('drain', resolve);
    } else {
      resolve();
    }
  });
}

export function createS3Stream(key,bucket) {
  const client = new S3()

  const stream = new PassThrough();

  stream.on('error', (e) => console.info(e))

  // Call done to start reading the string and return a Promise.
  const upload = new Upload({
    params: {
      Bucket: bucket,
      Key: key,
      Body: stream,
    },
    client,
  }).done();

  return {
    stream,
    upload,
  };
}

async function main(){
  dotenv.config()
  const bucket = env.BUCKET
  const key = env.KEY

  console.info("creating stream")
  const {stream,upload} = createS3Stream(key,bucket)

  const data1 = randomBytes(5242880)
  console.info('writing data1 to stream')
  await writeToStream(stream,data1)

  const data2 = randomBytes(5242880)
  console.info('writing data2 to stream')
  await writeToStream(stream,data2)

  console.info('closing stream')
  await finishS3Stream(upload,stream)
}

main()

Другие вопросы по теме