Почему чтение не возвращается в режим BYOB, когда поток закрыт

Я работаю с ReadableStreams в коде JS/браузера и столкнулся со следующей проблемой. При использовании ReadableStreamBYOBReader для управления размером чтения потока, если поток закрывается во время запроса на чтение без данных в очереди, чтение никогда не возвращается. Смотрите первый фрагмент ниже.

Последнее чтение возвращается, как и ожидалось, если вместо этого использовать ReadableStreamDefaultReader для чтения потока. См. второй фрагмент ниже (реализации потока во фрагментах идентичны).

Учитывая, что это происходит одинаково как в Chrome, так и в FF, такое поведение кажется ожидаемым. Но если да, то как закрыть поток, считываемый в режиме BYOB, если вы не знаете, сколько данных останется до выполнения каждого запроса на чтение? Вызов ошибки на контроллере или отмена потока вызывают возврат, но и то и другое уродливо.

Я прочитал некоторые спецификации whatwg , в них прямо не говорится, что должно произойти, когда поток закрыт и в BYOD нет фрагментов close steps (no chunk), но там сказано, что поток должен закрываться в режиме по умолчанию.

Если поток становится закрытым, то обещание выполняется с оставшиеся элементы в потоке, которых может быть меньше, чем первоначально запрошенная сумма. Если не дано, то обещание разрешается когда хотя бы один элемент доступен.

Приведенные ниже фрагменты работают в Chrome и FF, но не в Safari, поскольку у Safari еще нет полноценного API-интерфейса. Также не обращайте внимания на то, что byobRequest не используется и чтение потока не оптимизировано. Моей целью было упростить логику.

// Hangs if you try to read and the stream closes (when readBlocks > streamBlocks)
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then(([blocks, done]) => {
  reader.releaseLock();
  let byteLen = 0;
  for(const block of blocks) {
     byteLen += block.byteLength;
  }
  console.info("all done, bytes read:", byteLen, done);
});

function makeStream(loops) {
  let totalBytesOutput = 0;
  console.info("creating stream size:", loops * blockSize);

  return new ReadableStream({
    type: "bytes",

    async start(controller) {
      console.info(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        const data = new TextEncoder().encode("s".repeat(blockSize));
        totalBytesOutput += data.byteLength;
        console.info("stream start- enqueuing, total:", data.byteLength, totalBytesOutput);
        controller.enqueue(data);
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // ignoring actual byobReuest object
      console.info(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Pretend we don't know when data runs out until the request is made.
        // In BYOD mode, the read never returns. Unless you do one of the following:
        //  1. Enqueueing data before calling close (but we don't have any to enqueue)
        //  2. Call controller.error(), but that's ugly
        //  3. Call stream.cancel(), which also seems wrong
        if (totalBytesOutput >= blockSize * loops) {
          console.info("stream pull- closing");
          controller.close();
          return;
        }

        const data = new TextEncoder().encode("p".repeat(blockSize));
        totalBytesOutput += data.byteLength;
        console.info("stream pull- enqueuing, total:", data.byteLength, totalBytesOutput);
        controller.enqueue(data);
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  });
}

async function readAllBYOB(reader, output) {
  let targetBytes = output.byteLength;
  let readBytes = 0;
  let blocks = [];
  let streamDone = false;
  console.info('readAllBYOB- start: ', targetBytes);

  while (readBytes < targetBytes) {
    console.info('readAllBYOB- try reading:', output.byteLength);

    // This does not return on the final read, even when stream is closed
    let { done, value } = await reader.read(output);
    console.info('readAllBYOB- read, done:', value?.byteLength, done);

    streamDone = done;
    if (value) {
      blocks.push(value);
      readBytes += value.byteLength;
    }

    if (done || !value) {
      break;
    }
    if (readBytes < targetBytes) {
      output = new Uint8Array(targetBytes - readBytes);
    }
  }

  console.info(
    'readAllBYOB- blocks, remainingBytes, done:',
    blocks.length,
    targetBytes - readBytes,
    streamDone
  );

  return [blocks, streamDone];
}

// Works as expected
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader();

readAll(reader).then(([blocks, done]) => {
  reader.releaseLock();
  let byteLen = 0;
  for(const block of blocks) {
     byteLen += block.byteLength;
  }
  console.info("all done, bytes read:", byteLen, done);
});

function makeStream(loops) {
  let totalBytesOutput = 0;
  console.info("creating stream size:", loops * blockSize);

  return new ReadableStream({
    type: "bytes",

    async start(controller) {
      console.info(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        const data = new TextEncoder().encode("s".repeat(blockSize));
        totalBytesOutput += data.byteLength;
        console.info("stream start- enqueuing, total:", data.byteLength, totalBytesOutput);
        controller.enqueue(data);
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // ignoring actual byobReuest object
      console.info(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Pretend we don't know when data runs out until the request is made.
        // In BYOD mode, the read never returns. Unless you do one of the following:
        //  1. Enqueueing data before calling close (but we don't have any to enqueue)
        //  2. Call controller.error(), but that's ugly
        //  3. Call stream.cancel(), which also seems wrong
        if (totalBytesOutput >= blockSize * loops) {
          console.info("stream pull- closing");
          controller.close();
          return;
        }

        const data = new TextEncoder().encode("p".repeat(blockSize));
        totalBytesOutput += data.byteLength;
        console.info("stream pull- enqueuing, total:", data.byteLength, totalBytesOutput);
        controller.enqueue(data);
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  });
}

async function readAll(reader) {
  let readBytes = 0;
  let blocks = [];
  let streamDone = false;
  console.info('readAll- start');

  while (true) {
    console.info('readAll- try reading');

    // This always returns as expected
    let { done, value } = await reader.read();
    console.info('readAll- read, done:', value?.byteLength, done);

    streamDone = done;
    if (value) {
      blocks.push(value);
      readBytes += value.byteLength;
    }

    if (done || !value) {
      break;
    }
  }

  console.info(
    'readAll- blocks, done:',
    blocks.length,
    streamDone
  );

  return [blocks, streamDone];
}
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
Улучшение производительности загрузки с помощью Google Tag Manager и атрибута Defer
В настоящее время производительность загрузки веб-сайта имеет решающее значение не только для удобства пользователей, но и для ранжирования в...
Безумие обратных вызовов в javascript [JS]
Безумие обратных вызовов в javascript [JS]
Здравствуйте! Юный падаван 🚀. Присоединяйся ко мне, чтобы разобраться в одной из самых запутанных концепций, когда вы начинаете изучать мир...
Система управления парковками с использованием HTML, CSS и JavaScript
Система управления парковками с использованием HTML, CSS и JavaScript
Веб-сайт по управлению парковками был создан с использованием HTML, CSS и JavaScript. Это простой сайт, ничего вычурного. Основная цель -...
JavaScript Вопросы с множественным выбором и ответы
JavaScript Вопросы с множественным выбором и ответы
Если вы ищете платформу, которая предоставляет вам бесплатный тест JavaScript MCQ (Multiple Choice Questions With Answers) для оценки ваших знаний,...
3
0
62
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Кажется, вы должны выполнить запрос BYOB. Для этого вы можете вызвать метод ответить() контроллера .byobRequest, передав 0 как bytesRead.

const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then(([blocks, done]) => {
  reader.releaseLock();
  let byteLen = 0;
  for(const block of blocks) {
     byteLen += block.byteLength;
  }
  console.info("all done, bytes read:", byteLen, done);
});

function makeStream(loops) {
  let totalBytesOutput = 0;
  console.info("creating stream size:", loops * blockSize);

  return new ReadableStream({
    type: "bytes",

    async start(controller) {
      console.info(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        const data = new TextEncoder().encode("s".repeat(blockSize));
        totalBytesOutput += data.byteLength;
        console.info("stream start- enqueuing, total:", data.byteLength, totalBytesOutput);
        controller.enqueue(data);
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // ignoring actual byobReuest object
      console.info(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Pretend we don't know when data runs out until the request is made.
        if (totalBytesOutput >= blockSize * loops) {
          console.info("stream pull- closing");
          controller.close();
          controller.byobRequest?.respond(0);
          return;
        }

        const data = new TextEncoder().encode("p".repeat(blockSize));
        totalBytesOutput += data.byteLength;
        console.info("stream pull- enqueuing, total:", data.byteLength, totalBytesOutput);
        controller.enqueue(data);
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  });
}

async function readAllBYOB(reader, output) {
  let targetBytes = output.byteLength;
  let readBytes = 0;
  let blocks = [];
  let streamDone = false;
  console.info('readAllBYOB- start: ', targetBytes);

  while (readBytes < targetBytes) {
    console.info('readAllBYOB- try reading:', output.byteLength);

    // This does not return on the final read, even when stream is closed
    let { done, value } = await reader.read(output);
    console.info('readAllBYOB- read, done:', value?.byteLength, done);

    streamDone = done;
    if (value) {
      blocks.push(value);
      readBytes += value.byteLength;
    }

    if (done || !value) {
      break;
    }
    if (readBytes < targetBytes) {
      output = new Uint8Array(targetBytes - readBytes);
    }
  }

  console.info(
    'readAllBYOB- blocks, remainingBytes, done:',
    blocks.length,
    targetBytes - readBytes,
    streamDone
  );

  return [blocks, streamDone];
}

Спасибо, это действительно работает. Хотя я уверен, что есть причина, это кажется плохим выбором с точки зрения разработки API потоковой передачи. Это означает, что при создании ReadableStream вы не можете игнорировать запросы BYOD, даже если вы не планируете ничего записывать в представление. Или вы должны всегда ставить данные в очередь перед закрытием потока и быть уверенными, что перед закрытием не происходит никаких ожиданий. Неряшливый.

schickb 30.07.2024 17:41

Другие вопросы по теме