Я новичок в библиотеках Node.js и пытаюсь понять, как использовать асинхронную итерацию в потоке ответов HTTP. Моя общая цель - прочитать большой поток ответов и обработать его по мере поступления фрагментов, в настоящее время с помощью функции генератора. Я не могу сохранить весь ответ в памяти для обработки.
Я использую библиотеку request
для выполнения HTTP-запроса следующим образом.
const request = require("request");
// contrived chunk-by-chunk stream processing
async function* getChunks(stream) {
for await (const chunk of stream) {
yield chunk[0];
}
}
async function doWork() {
var response = request.get("https://pastebin.com/raw/x4Nn0Tby");
for await (c of getChunks(response)) {
console.info(c);
}
}
Когда я запускаю doWork()
, я получаю сообщение об ошибке, указывающее, что переменная stream
в getChunks()
не является асинхронно-итерируемой.
TypeError: поток не является асинхронным итерируемым
Это удивительно, так как я думал, что все читаемые потоки, как правило, асинхронно повторяемы, и что библиотека запросов возвращает поток, когда обратный вызов не предоставляется. Когда я заменяю request.get(...)
на fs.createReadStream(...)
какой-то локальный файл, все работает как положено.
Возможно, библиотека request
не поддерживает это. Если да, то что мне нужно сделать для обработки потоков ответов HTTP с помощью асинхронной итерации?
Использование Node.js 11.13 и request
2.88.0.
Похоже, вам придется использовать другие альтернативы, как они упоминаются в документации модуля request
, которую вы можете найти здесь.
https://www.npmjs.com/package/request
request supports both streaming and callback interfaces natively. If you'd like
request to return a Promise instead, you can use an alternative interface wrapper for
request. These wrappers can be useful if you prefer to work with Promises, or if
you'd like to use async/await in ES2017.
Several alternative interfaces are provided by the request team, including:
request-promise (uses Bluebird Promises)
request-promise-native (uses native Promises)
request-promise-any (uses any-promise Promises)`
мой ответ, основанный на следующем вопросе:
Я думаю, вы можете создать async await
собственный метод, который это сделает.
async function doMyWork() {
try {
const response = await myOwnRequest(url);
} catch (e) {
console.info ('the error', e);
}
}
function myOwnRequest(url) {
return new Promise(function (resolve, reject) {
const resp = request.get(url);
if (resp) {
resolve();
} else {
reject();
}
});
}
@Steve Guildi Пожалуйста, посмотрите, поможет ли вам мой обновленный ответ.
@SteveGuidi был вышеприведенным (обновленным) ответом, помогите? дайте мне знать, если вы хотите, чтобы я обновил что-нибудь
В myOwnRequest есть небольшая ошибка — resolve()
нужно взять возвращаемый объект, а именно resp
. В противном случае обещание ведет себя так, как ожидалось. К сожалению, результат request.get(...)
вообще не является асинхронно-итерируемым, независимо от того, возвращается ли он через Promise или напрямую. Вместо этого я использовал библиотеку axios
, которая возвращает асинхронно-итерируемые потоки (см. мой ответ для примера).
Я еще немного поэкспериментировал с библиотеками request
и request-promise-native
и не думаю, что это возможно в текущей реализации. Результирующий поток вообще не выглядит асинхронно-итерируемым. Кроме того, правильная реализация должна await
возвращать ответ перед обработкой потока (как предложено Ответ @JBone). Но если вы вызываете await request.get(...)
, вы получаете все содержимое ответа, что нежелательно для больших ответов.
const r = require("request");
const rpn = require("request-promise-native");
// contrived chunk-by-chunk stream processing
async function* getChunks(stream) {
for await (const chunk of stream) {
yield chunk[0];
}
}
async function doWork() {
const url = "https://pastebin.com/raw/x4Nn0Tby";
const response = r.get(url); // returns a non-async-iterable object.
const response2 = await rp.get(url); // returns the contents of url
for await (c of getChunks(response)) { // yields response not async-iterable error.
console.info(c);
}
}
Мое решение этой проблемы состояло в том, чтобы заменить использование request
и request-promise-native
библиотекой axios
. Библиотеки функционально похожи, но axios
позволяет указать, что запрос должен разрешаться в поток; как и ожидалось, поток является асинхронно-итерируемым.
const axios = require("axios");
async function doWork() {
var response = await axios.request({
method: "GET",
url: "https://pastebin.com/raw/x4Nn0Tby",
responseType: "stream",
});
for await (c of getChunks(response.data)) { // async-iteration over response works as expected.
console.info(c);
}
}
Какую версию аксиом вы используете? С 0.19.0 я получаю response is not async iterable
. Я не смог найти никаких документов, указывающих на то, что axios поддерживает это. Есть идеи?
@mattpr: я использовал 0.18.0, когда писал пример кода выше. Насколько я помню, вам нужно установить тип ответа на stream
, чтобы это работало правильно.
Простой ответ: нет. Возможно, вы захотите использовать обертку на основе обещаний вокруг request
, например просьба-обещание, которая затем также работает с async
/await
.
Подробности: Обратите внимание, что request
был устарел его создатель и, следовательно, будет прекращен. Это означает, что рано или поздно вам, скорее всего, придется переключиться на другое решение, такое как аксиомы, суперагент или иголка, и это лишь некоторые из них.
Конечно, вы должны оценить эти модули и выяснить, какой из них лучше всего соответствует вашим потребностям, но я лично рекомендую начать с axios
, поскольку в прошлом у меня был очень хороший опыт работы с ним, однако, YMMV.
Параметр потока axios не работал у меня, используя пример кода в приведенном выше ответе на axios 0.19.0. Может быть проблема между стулом и клавиатурой, но в любом случае... вот альтернативный подход с использованием request
.
В итоге я адаптировал потоковую передачу запросов к асинхронному генератору (конечно, с промежуточным буфером). Это позволяет использовать интерфейс «потокового» типа, в котором чтение и запись данных могут чередоваться... это не гарантирует низкого потребления памяти. каналы запросов («пуши») к нашему Writable так быстро, как это возможно, и у нас нет возможности приостановить это или перевести его в интерфейс типа «pull» (насколько я знаю). Поэтому, если мы считываем данные из буфера медленнее, чем данные записываются: буфер станет очень большим, а использование памяти будет высоким.
Поэтому, если очень важно снизить использование памяти, и вы анализируете большие файлы из http-источников... тогда, возможно, выполняете некоторый мониторинг/отчетность по размеру буфера во время "потоковой передачи", чтобы увидеть, быстрее или медленнее вы потребляете код, чем поток так что вы знаете, станет ли буфер огромным или останется маленьким. Конечно, если вы тестируете очень медленный http-сервер... тогда все ставки сняты.
Возможно, это можно решить, установив фиксированный размер буфера и создав блок _write
до тех пор, пока не произойдет дополнительное чтение (освобождение места в буфере)... т.е. поэтому запрос должен ждать, чтобы записать больше данных в канал. Однако запрос может буферизоваться внутри... так что это не поможет с потреблением памяти, если данные все равно накапливаются в конце запросов. Надо бы проверить.
Образец кода:
const request = require('request'),
Writable = require('stream').Writable,
EventEmitter = require('events');
module.exports = function (url, MAX_BYTES=1024) {
var response = new ResponseBuffer(MAX_BYTES);
request
.get(url)
.on('error', function(err) { throw err; })
.pipe(response)
.on('error', function(err) { throw err; });
return response.reader();
};
class ResponseBuffer extends Writable {
constructor (MAX_BYTES=1024) {
super();
this.buffer = '';
this.open = true;
this.done = null; // callback to call when done reading.
this.MAX_BYTES = MAX_BYTES;
this.events = new EventEmitter();
}
_write(chunk, enc, next) {
this.buffer += chunk;
this.events.emit('data');
next();
}
_final(done) {
this.open = false; // signal to reader to return after buffer empty.
return done();
}
async * reader () {
while (true) {
if (this.buffer.length == 0) {
// buffer empty and Writable !open. return.
if (!this.open) { return; }
else { // buffer empty. wait for data.
await new Promise(resolve => this.events.once('data', resolve));
}
}
let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
yield this.buffer.slice(0, read_bytes);
this.buffer = this.buffer.slice(read_bytes);
}
}
}
Затем используйте его так:
const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
chunk;
for await (chunk of httpGen) {
// do something with chunk.
}
Альтернативный подход (если вас особенно беспокоит использование памяти) состоит в том, чтобы просто загрузить на диск (потоковая передача в средство записи файлов), а затем постепенно читать с диска (вы можете выполнить асинхронную итерацию fs.createReadStream(...)
)
Спасибо за предложение. Я не уверен, что мне нужен
Promise
, возвращаемый изrequest.get()
, так как мне действительно просто нужно работать с результирующим потоком (который, как я предполагал, поддерживается ожиданием изначально, какfs.createReadStream()
). Если возвращаетсяPromise
(через request-promise-native), то вызовawait request.get(...)
преобразует весь поток ответов в строку.