Моя цель — создать конвейер, итератор которого мог бы содержать десятки тысяч элементов. Поскольку обработка может занять много времени, я хочу возвращать результаты клиенту по мере их получения.
Это не веб-сервер, это связь между двумя системами, которыми я управляю.
Я думаю, мне нужно создать будущее для каждого элемента с замыканием, записывающим в поток. Кажется... большим. Я все еще работаю над этим, и на самом деле Pipeline::tap() может справиться с этой задачей.
Однако моя самая большая проблема — как установить поток. Казалось, что я смогу вернуть Response клиенту с WriteableStream для тела и продолжать писать в поток, пока все элементы не будут обработаны.
Однако даже на сервере Response принимает в качестве тела только ReadableStream. Кажется, чтобы получить WriteableStream, мне нужно использовать сокеты, но при этом я теряю все полезные абстракции Request и Response. Я думаю, что при использовании сокетов я не связан всеми требованиями http, поэтому я мог бы создавать свои собственные абстракции, но я бы предпочел этого не делать.
http-server построен на сокетах, поэтому кажется, что это возможно.
Любой пример потоковой передачи результатов конвейера обратно клиенту будет очень кстати!
Вот код, который у меня есть на данный момент. Поток передается из обработчика запроса в тщетной надежде, что его можно будет вернуть с помощью Response до завершения конвейера. в этой итерации моего кода используется tap(), что, вероятно, не предназначено для его использования:
$pipeline = Pipeline::fromIterable($bigList)
->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
->unordered()
->map(fn (ListItem $item) => $this->getResultForItem($item))
->filter(fn (?Result $r) => $r instanceof Result);
// if there's a stream, don't do anything here that will block until the pipeline is finished.
if (!is_null($stream)) {
$streamStarted = false;
$pipeline->tap(function (Result $r) use ($stream, &$streamStarted) {
$stream->write($streamStarted ? ',' : '[');
$streamStarted = true;
$stream->write(json_encode($r));
});
// something here to send the `end()` message when the pipeline has been completed for all items
//$future = async()
} else {
foreach($pipeline as $profile) {
$indexed[$profile->dsid] = $profile;
}
}
Если бы я мог добавить WriteableStream к ответу, я думаю, я был бы в хорошей форме.






Похоже, что нет способа открыть поток и писать в него, как я представлял выше. Вместо этого можно создать ReadableStream, который будет выполнять всю работу автоматически.
$pipeline = Pipeline::fromIterable($bigList)
->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
->unordered()
->map(fn (ListItem $item) => $this->getResultForItem($item))
->filter(fn (?Result $r) => $r instanceof Result);
->map(fn(Result $r) => json_encode($r));
$stream = new ReadableIterableStream($pipeline->getIterator());
При этом каждый объект Result будет записываться в поток по мере его появления. ReadableStream возвращается в ответе http и обновляется всякий раз, когда конвейер завершается для элемента в $bigList.
Затем клиент делает что-то вроде:
while($text = $response->getBody()->read()) {
$response[] = json_decode($text);
}