Поддерживает ли библиотека запросов Node.js асинхронно-итерируемый поток ответов?

Я новичок в библиотеках Node.js и пытаюсь понять, как использовать асинхронную итерацию в потоке ответов HTTP. Моя общая цель - прочитать большой поток ответов и обработать его по мере поступления фрагментов, в настоящее время с помощью функции генератора. Я не могу сохранить весь ответ в памяти для обработки.

Я использую библиотеку request для выполнения HTTP-запроса следующим образом.

const request = require("request");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  var response = request.get("https://pastebin.com/raw/x4Nn0Tby");
  for await (c of getChunks(response)) {
    console.info(c);
  }
}

Когда я запускаю doWork(), я получаю сообщение об ошибке, указывающее, что переменная stream в getChunks() не является асинхронно-итерируемой.

TypeError: поток не является асинхронным итерируемым

Это удивительно, так как я думал, что все читаемые потоки, как правило, асинхронно повторяемы, и что библиотека запросов возвращает поток, когда обратный вызов не предоставляется. Когда я заменяю request.get(...) на fs.createReadStream(...) какой-то локальный файл, все работает как положено.

Возможно, библиотека request не поддерживает это. Если да, то что мне нужно сделать для обработки потоков ответов HTTP с помощью асинхронной итерации?

Использование Node.js 11.13 и request 2.88.0.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
1 201
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Похоже, вам придется использовать другие альтернативы, как они упоминаются в документации модуля request, которую вы можете найти здесь. https://www.npmjs.com/package/request

request supports both streaming and callback interfaces natively. If you'd like 
request to return a Promise instead, you can use an alternative interface wrapper for 
request. These wrappers can be useful if you prefer to work with Promises, or if 
you'd like to use async/await in ES2017.

Several alternative interfaces are provided by the request team, including:

request-promise (uses Bluebird Promises)
request-promise-native (uses native Promises)
request-promise-any (uses any-promise Promises)`

мой ответ, основанный на следующем вопросе:

Я думаю, вы можете создать async await собственный метод, который это сделает.

async function doMyWork() {
try {
 const response = await myOwnRequest(url); 
 } catch (e) {
   console.info ('the error', e);
 }  
}

function myOwnRequest(url) {
  return new Promise(function (resolve, reject) {
   const resp = request.get(url);
   if (resp) {
    resolve();
   } else {
     reject();
   }
});
}

Спасибо за предложение. Я не уверен, что мне нужен Promise, возвращаемый из request.get(), так как мне действительно просто нужно работать с результирующим потоком (который, как я предполагал, поддерживается ожиданием изначально, как fs.createReadStream()). Если возвращается Promise (через request-promise-native), то вызов await request.get(...) преобразует весь поток ответов в строку.

Steve Guidi 09.04.2019 16:02

@Steve Guildi Пожалуйста, посмотрите, поможет ли вам мой обновленный ответ.

JBone 09.04.2019 17:10

@SteveGuidi был вышеприведенным (обновленным) ответом, помогите? дайте мне знать, если вы хотите, чтобы я обновил что-нибудь

JBone 09.04.2019 18:05

В myOwnRequest есть небольшая ошибка — resolve() нужно взять возвращаемый объект, а именно resp. В противном случае обещание ведет себя так, как ожидалось. К сожалению, результат request.get(...) вообще не является асинхронно-итерируемым, независимо от того, возвращается ли он через Promise или напрямую. Вместо этого я использовал библиотеку axios, которая возвращает асинхронно-итерируемые потоки (см. мой ответ для примера).

Steve Guidi 10.04.2019 19:33
Ответ принят как подходящий

Я еще немного поэкспериментировал с библиотеками request и request-promise-native и не думаю, что это возможно в текущей реализации. Результирующий поток вообще не выглядит асинхронно-итерируемым. Кроме того, правильная реализация должна await возвращать ответ перед обработкой потока (как предложено Ответ @JBone). Но если вы вызываете await request.get(...), вы получаете все содержимое ответа, что нежелательно для больших ответов.

const r = require("request");
const rpn = require("request-promise-native");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  const url = "https://pastebin.com/raw/x4Nn0Tby";
  const response = r.get(url);         // returns a non-async-iterable object.
  const response2 = await rp.get(url); // returns the contents of url

  for await (c of getChunks(response)) {  // yields response not async-iterable error.
    console.info(c);
  }
}

Мое решение этой проблемы состояло в том, чтобы заменить использование request и request-promise-native библиотекой axios. Библиотеки функционально похожи, но axios позволяет указать, что запрос должен разрешаться в поток; как и ожидалось, поток является асинхронно-итерируемым.

const axios = require("axios");

async function doWork() {
  var response = await axios.request({
    method: "GET",
    url: "https://pastebin.com/raw/x4Nn0Tby",
    responseType: "stream",
  });

  for await (c of getChunks(response.data)) {  // async-iteration over response works as expected.
    console.info(c);
  }
}

Какую версию аксиом вы используете? С 0.19.0 я получаю response is not async iterable. Я не смог найти никаких документов, указывающих на то, что axios поддерживает это. Есть идеи?

mattpr 25.08.2019 17:22

@mattpr: я использовал 0.18.0, когда писал пример кода выше. Насколько я помню, вам нужно установить тип ответа на stream, чтобы это работало правильно.

Steve Guidi 25.08.2019 19:22

Простой ответ: нет. Возможно, вы захотите использовать обертку на основе обещаний вокруг request, например просьба-обещание, которая затем также работает с async/await.

Подробности: Обратите внимание, что request был устарел его создатель и, следовательно, будет прекращен. Это означает, что рано или поздно вам, скорее всего, придется переключиться на другое решение, такое как аксиомы, суперагент или иголка, и это лишь некоторые из них.

Конечно, вы должны оценить эти модули и выяснить, какой из них лучше всего соответствует вашим потребностям, но я лично рекомендую начать с axios, поскольку в прошлом у меня был очень хороший опыт работы с ним, однако, YMMV.

Параметр потока axios не работал у меня, используя пример кода в приведенном выше ответе на axios 0.19.0. Может быть проблема между стулом и клавиатурой, но в любом случае... вот альтернативный подход с использованием request.

В итоге я адаптировал потоковую передачу запросов к асинхронному генератору (конечно, с промежуточным буфером). Это позволяет использовать интерфейс «потокового» типа, в котором чтение и запись данных могут чередоваться... это не гарантирует низкого потребления памяти. каналы запросов («пуши») к нашему Writable так быстро, как это возможно, и у нас нет возможности приостановить это или перевести его в интерфейс типа «pull» (насколько я знаю). Поэтому, если мы считываем данные из буфера медленнее, чем данные записываются: буфер станет очень большим, а использование памяти будет высоким.

Поэтому, если очень важно снизить использование памяти, и вы анализируете большие файлы из http-источников... тогда, возможно, выполняете некоторый мониторинг/отчетность по размеру буфера во время "потоковой передачи", чтобы увидеть, быстрее или медленнее вы потребляете код, чем поток так что вы знаете, станет ли буфер огромным или останется маленьким. Конечно, если вы тестируете очень медленный http-сервер... тогда все ставки сняты.

Возможно, это можно решить, установив фиксированный размер буфера и создав блок _write до тех пор, пока не произойдет дополнительное чтение (освобождение места в буфере)... т.е. поэтому запрос должен ждать, чтобы записать больше данных в канал. Однако запрос может буферизоваться внутри... так что это не поможет с потреблением памяти, если данные все равно накапливаются в конце запросов. Надо бы проверить.

Образец кода:

const request = require('request'),
    Writable = require('stream').Writable,
    EventEmitter = require('events');

module.exports = function (url, MAX_BYTES=1024) {
    var response = new ResponseBuffer(MAX_BYTES);

    request
        .get(url)
        .on('error', function(err) { throw err; })
        .pipe(response)
        .on('error', function(err) { throw err; });

    return response.reader();
};

class ResponseBuffer extends Writable {
    constructor (MAX_BYTES=1024) {
        super();
        this.buffer = '';
        this.open = true;
        this.done = null;  // callback to call when done reading.
        this.MAX_BYTES = MAX_BYTES;
        this.events = new EventEmitter();
    }
    _write(chunk, enc, next) {
        this.buffer += chunk;
        this.events.emit('data');
        next();
    }
    _final(done) {
        this.open = false; // signal to reader to return after buffer empty.
        return done();
    }
    async * reader () {
        while (true) {
            if (this.buffer.length == 0) {
                // buffer empty and Writable !open. return.
                if (!this.open) { return; }
                else { // buffer empty.  wait for data.
                    await new Promise(resolve => this.events.once('data', resolve));
                }
            }
            let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
            yield this.buffer.slice(0, read_bytes);
            this.buffer = this.buffer.slice(read_bytes);
        }
    }
}

Затем используйте его так:


const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
    chunk;
for await (chunk of httpGen) {
    // do something with chunk.
}

Альтернативный подход (если вас особенно беспокоит использование памяти) состоит в том, чтобы просто загрузить на диск (потоковая передача в средство записи файлов), а затем постепенно читать с диска (вы можете выполнить асинхронную итерацию fs.createReadStream(...))

Другие вопросы по теме