Потоковая передача результатов в amphp

Моя цель — создать конвейер, итератор которого мог бы содержать десятки тысяч элементов. Поскольку обработка может занять много времени, я хочу возвращать результаты клиенту по мере их получения.

Это не веб-сервер, это связь между двумя системами, которыми я управляю.

Я думаю, мне нужно создать будущее для каждого элемента с замыканием, записывающим в поток. Кажется... большим. Я все еще работаю над этим, и на самом деле Pipeline::tap() может справиться с этой задачей.

Однако моя самая большая проблема — как установить поток. Казалось, что я смогу вернуть Response клиенту с WriteableStream для тела и продолжать писать в поток, пока все элементы не будут обработаны.

Однако даже на сервере Response принимает в качестве тела только ReadableStream. Кажется, чтобы получить WriteableStream, мне нужно использовать сокеты, но при этом я теряю все полезные абстракции Request и Response. Я думаю, что при использовании сокетов я не связан всеми требованиями http, поэтому я мог бы создавать свои собственные абстракции, но я бы предпочел этого не делать.

http-server построен на сокетах, поэтому кажется, что это возможно.

Любой пример потоковой передачи результатов конвейера обратно клиенту будет очень кстати!

Вот код, который у меня есть на данный момент. Поток передается из обработчика запроса в тщетной надежде, что его можно будет вернуть с помощью Response до завершения конвейера. в этой итерации моего кода используется tap(), что, вероятно, не предназначено для его использования:

    $pipeline = Pipeline::fromIterable($bigList)
        ->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
        ->unordered()
        ->map(fn (ListItem $item) => $this->getResultForItem($item))
        ->filter(fn (?Result $r) => $r instanceof Result);

    // if there's a stream, don't do anything here that will block until the pipeline is finished.
    if (!is_null($stream)) {
        $streamStarted = false;

        $pipeline->tap(function (Result $r) use ($stream, &$streamStarted) {

            $stream->write($streamStarted ? ',' : '[');
            $streamStarted = true;
            $stream->write(json_encode($r));

        });

        // something here to send the `end()` message when the pipeline has been completed for all items
        //$future = async()

    } else {
        foreach($pipeline as $profile) {
            $indexed[$profile->dsid] = $profile;
        }
    }

Если бы я мог добавить WriteableStream к ответу, я думаю, я был бы в хорошей форме.

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Symfony Station Communiqué - 7 июля 2023 г
Symfony Station Communiqué - 7 июля 2023 г
Это коммюнике первоначально появилось на Symfony Station .
Оживление вашего приложения Laravel: Понимание режима обслуживания
Оживление вашего приложения Laravel: Понимание режима обслуживания
Здравствуйте, разработчики! В сегодняшней статье мы рассмотрим важный аспект управления приложениями, который часто упускается из виду в суете...
Установка и настройка Nginx и PHP на Ubuntu-сервере
Установка и настройка Nginx и PHP на Ubuntu-сервере
В этот раз я сделаю руководство по установке и настройке nginx и php на Ubuntu OS.
Коллекции в Laravel более простым способом
Коллекции в Laravel более простым способом
Привет, читатели, сегодня мы узнаем о коллекциях. В Laravel коллекции - это способ манипулировать массивами и играть с массивами данных. Благодаря...
Как установить PHP на Mac
Как установить PHP на Mac
PHP - это популярный язык программирования, который используется для разработки веб-приложений. Если вы используете Mac и хотите разрабатывать...
0
0
68
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Похоже, что нет способа открыть поток и писать в него, как я представлял выше. Вместо этого можно создать ReadableStream, который будет выполнять всю работу автоматически.

$pipeline = Pipeline::fromIterable($bigList)
    ->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
    ->unordered()
    ->map(fn (ListItem $item) => $this->getResultForItem($item))
    ->filter(fn (?Result $r) => $r instanceof Result);
    ->map(fn(Result $r) => json_encode($r));

$stream = new ReadableIterableStream($pipeline->getIterator());

При этом каждый объект Result будет записываться в поток по мере его появления. ReadableStream возвращается в ответе http и обновляется всякий раз, когда конвейер завершается для элемента в $bigList.

Затем клиент делает что-то вроде:

while($text = $response->getBody()->read()) {
    $response[] = json_decode($text);
}

Другие вопросы по теме