Я работаю с ReadableStreams в коде JS/браузера и столкнулся со следующей проблемой. При использовании ReadableStreamBYOBReader
для управления размером чтения потока, если поток закрывается во время запроса на чтение без данных в очереди, чтение никогда не возвращается. Смотрите первый фрагмент ниже.
Последнее чтение возвращается, как и ожидалось, если вместо этого использовать ReadableStreamDefaultReader
для чтения потока. См. второй фрагмент ниже (реализации потока во фрагментах идентичны).
Учитывая, что это происходит одинаково как в Chrome, так и в FF, такое поведение кажется ожидаемым. Но если да, то как закрыть поток, считываемый в режиме BYOB, если вы не знаете, сколько данных останется до выполнения каждого запроса на чтение? Вызов ошибки на контроллере или отмена потока вызывают возврат, но и то и другое уродливо.
Я прочитал некоторые спецификации whatwg , в них прямо не говорится, что должно произойти, когда поток закрыт и в BYOD нет фрагментов close steps (no chunk)
, но там сказано, что поток должен закрываться в режиме по умолчанию.
Если поток становится закрытым, то обещание выполняется с оставшиеся элементы в потоке, которых может быть меньше, чем первоначально запрошенная сумма. Если не дано, то обещание разрешается когда хотя бы один элемент доступен.
Приведенные ниже фрагменты работают в Chrome и FF, но не в Safari, поскольку у Safari еще нет полноценного API-интерфейса. Также не обращайте внимания на то, что byobRequest
не используется и чтение потока не оптимизировано. Моей целью было упростить логику.
// Hangs if you try to read and the stream closes (when readBlocks > streamBlocks)
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;
const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);
readAllBYOB(reader, buffer).then(([blocks, done]) => {
reader.releaseLock();
let byteLen = 0;
for(const block of blocks) {
byteLen += block.byteLength;
}
console.info("all done, bytes read:", byteLen, done);
});
function makeStream(loops) {
let totalBytesOutput = 0;
console.info("creating stream size:", loops * blockSize);
return new ReadableStream({
type: "bytes",
async start(controller) {
console.info(
`stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
try {
const data = new TextEncoder().encode("s".repeat(blockSize));
totalBytesOutput += data.byteLength;
console.info("stream start- enqueuing, total:", data.byteLength, totalBytesOutput);
controller.enqueue(data);
} catch (err) {
console.error("stream start- error, closing", err);
controller.error(err);
}
},
async pull(controller) {
// ignoring actual byobReuest object
console.info(
`stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
try {
// Pretend we don't know when data runs out until the request is made.
// In BYOD mode, the read never returns. Unless you do one of the following:
// 1. Enqueueing data before calling close (but we don't have any to enqueue)
// 2. Call controller.error(), but that's ugly
// 3. Call stream.cancel(), which also seems wrong
if (totalBytesOutput >= blockSize * loops) {
console.info("stream pull- closing");
controller.close();
return;
}
const data = new TextEncoder().encode("p".repeat(blockSize));
totalBytesOutput += data.byteLength;
console.info("stream pull- enqueuing, total:", data.byteLength, totalBytesOutput);
controller.enqueue(data);
} catch (err) {
console.error("stream pull- error, closing", err);
controller.error(err);
}
},
});
}
async function readAllBYOB(reader, output) {
let targetBytes = output.byteLength;
let readBytes = 0;
let blocks = [];
let streamDone = false;
console.info('readAllBYOB- start: ', targetBytes);
while (readBytes < targetBytes) {
console.info('readAllBYOB- try reading:', output.byteLength);
// This does not return on the final read, even when stream is closed
let { done, value } = await reader.read(output);
console.info('readAllBYOB- read, done:', value?.byteLength, done);
streamDone = done;
if (value) {
blocks.push(value);
readBytes += value.byteLength;
}
if (done || !value) {
break;
}
if (readBytes < targetBytes) {
output = new Uint8Array(targetBytes - readBytes);
}
}
console.info(
'readAllBYOB- blocks, remainingBytes, done:',
blocks.length,
targetBytes - readBytes,
streamDone
);
return [blocks, streamDone];
}
// Works as expected
const streamBlocks = 3;
const blockSize = 1024;
const stream = makeStream(streamBlocks);
const reader = stream.getReader();
readAll(reader).then(([blocks, done]) => {
reader.releaseLock();
let byteLen = 0;
for(const block of blocks) {
byteLen += block.byteLength;
}
console.info("all done, bytes read:", byteLen, done);
});
function makeStream(loops) {
let totalBytesOutput = 0;
console.info("creating stream size:", loops * blockSize);
return new ReadableStream({
type: "bytes",
async start(controller) {
console.info(
`stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
try {
const data = new TextEncoder().encode("s".repeat(blockSize));
totalBytesOutput += data.byteLength;
console.info("stream start- enqueuing, total:", data.byteLength, totalBytesOutput);
controller.enqueue(data);
} catch (err) {
console.error("stream start- error, closing", err);
controller.error(err);
}
},
async pull(controller) {
// ignoring actual byobReuest object
console.info(
`stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
try {
// Pretend we don't know when data runs out until the request is made.
// In BYOD mode, the read never returns. Unless you do one of the following:
// 1. Enqueueing data before calling close (but we don't have any to enqueue)
// 2. Call controller.error(), but that's ugly
// 3. Call stream.cancel(), which also seems wrong
if (totalBytesOutput >= blockSize * loops) {
console.info("stream pull- closing");
controller.close();
return;
}
const data = new TextEncoder().encode("p".repeat(blockSize));
totalBytesOutput += data.byteLength;
console.info("stream pull- enqueuing, total:", data.byteLength, totalBytesOutput);
controller.enqueue(data);
} catch (err) {
console.error("stream pull- error, closing", err);
controller.error(err);
}
},
});
}
async function readAll(reader) {
let readBytes = 0;
let blocks = [];
let streamDone = false;
console.info('readAll- start');
while (true) {
console.info('readAll- try reading');
// This always returns as expected
let { done, value } = await reader.read();
console.info('readAll- read, done:', value?.byteLength, done);
streamDone = done;
if (value) {
blocks.push(value);
readBytes += value.byteLength;
}
if (done || !value) {
break;
}
}
console.info(
'readAll- blocks, done:',
blocks.length,
streamDone
);
return [blocks, streamDone];
}
Кажется, вы должны выполнить запрос BYOB. Для этого вы можете вызвать метод ответить() контроллера .byobRequest, передав 0
как bytesRead.
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;
const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);
readAllBYOB(reader, buffer).then(([blocks, done]) => {
reader.releaseLock();
let byteLen = 0;
for(const block of blocks) {
byteLen += block.byteLength;
}
console.info("all done, bytes read:", byteLen, done);
});
function makeStream(loops) {
let totalBytesOutput = 0;
console.info("creating stream size:", loops * blockSize);
return new ReadableStream({
type: "bytes",
async start(controller) {
console.info(
`stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
try {
const data = new TextEncoder().encode("s".repeat(blockSize));
totalBytesOutput += data.byteLength;
console.info("stream start- enqueuing, total:", data.byteLength, totalBytesOutput);
controller.enqueue(data);
} catch (err) {
console.error("stream start- error, closing", err);
controller.error(err);
}
},
async pull(controller) {
// ignoring actual byobReuest object
console.info(
`stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
try {
// Pretend we don't know when data runs out until the request is made.
if (totalBytesOutput >= blockSize * loops) {
console.info("stream pull- closing");
controller.close();
controller.byobRequest?.respond(0);
return;
}
const data = new TextEncoder().encode("p".repeat(blockSize));
totalBytesOutput += data.byteLength;
console.info("stream pull- enqueuing, total:", data.byteLength, totalBytesOutput);
controller.enqueue(data);
} catch (err) {
console.error("stream pull- error, closing", err);
controller.error(err);
}
},
});
}
async function readAllBYOB(reader, output) {
let targetBytes = output.byteLength;
let readBytes = 0;
let blocks = [];
let streamDone = false;
console.info('readAllBYOB- start: ', targetBytes);
while (readBytes < targetBytes) {
console.info('readAllBYOB- try reading:', output.byteLength);
// This does not return on the final read, even when stream is closed
let { done, value } = await reader.read(output);
console.info('readAllBYOB- read, done:', value?.byteLength, done);
streamDone = done;
if (value) {
blocks.push(value);
readBytes += value.byteLength;
}
if (done || !value) {
break;
}
if (readBytes < targetBytes) {
output = new Uint8Array(targetBytes - readBytes);
}
}
console.info(
'readAllBYOB- blocks, remainingBytes, done:',
blocks.length,
targetBytes - readBytes,
streamDone
);
return [blocks, streamDone];
}
Спасибо, это действительно работает. Хотя я уверен, что есть причина, это кажется плохим выбором с точки зрения разработки API потоковой передачи. Это означает, что при создании ReadableStream вы не можете игнорировать запросы BYOD, даже если вы не планируете ничего записывать в представление. Или вы должны всегда ставить данные в очередь перед закрытием потока и быть уверенными, что перед закрытием не происходит никаких ожиданий. Неряшливый.