Есть ли способ сделать StreamExt::next неблокирующим (сбоить быстро), если поток пуст (необходимо дождаться следующего элемента)?

В настоящее время я делаю что-то вроде этого

use tokio::time::timeout;

while let Ok(option_element) = timeout(Duration::from_nanos(1), stream.next()).await {
...
}

Для слива элементов, уже находящихся в буфере rx потока. Я не хочу ждать следующего элемента, который не был получен.

Я думаю, что тайм-аут замедлит цикл while.

Мне интересно, есть ли лучший способ сделать это без использования тайм-аута? Возможно вот так https://github.com/async-rs/async-std/issues/579 но для потоков в futures/tokio.

Выполняете ли вы другую работу в цикле while? Я как бы предполагаю, что вы не хотите ждать стрима. Если это так, кажется более разумным просто сделать два фьючерса с блоками async (один для цикла и один для другой работы) и просто соединить их, вместо того, чтобы иметь более сложный «неблокирующий» цикл.

Frxstrem 21.12.2020 12:08

Я манипулирую TCP-сокетом пользовательского пространства (не асинхронный/ожидающий стиль) в цикле while. Я делаю это, потому что мне нужно своего рода обратное давление, чтобы данные из восходящего потока не взорвали tx-буфер TCP-сокета.

haolun 21.12.2020 12:12

Тогда это больше похоже на настоящую проблему, я думаю. На самом деле вам не разрешено иметь правильный (не асинхронный) блокирующий код внутри асинхронного кода, вместо этого вы должны использовать асинхронные потоки TCP (например, из tokio или async-std) при работе с асинхронным кодом.

Frxstrem 21.12.2020 12:15

Сокет TCP не блокирует петлю. Он просто отправляет данные в канал. Восходящий поток — TcpStream от tokio. Сокет TCP обращен к файлу туннеля (VPN). Я прерываю цикл while, когда буфер tx сокета TCP заполнен.

haolun 21.12.2020 12:19

Цель этого цикла while — максимально заполнить tx-буфер TCP-сокета и не блокировать внешний основной цикл, который считывает ip-пакеты из файла туннеля. Если другая сторона туннеля принимает TCP-пакеты медленно, она может оказать давление на восходящий поток, чтобы заставить его производить данные медленнее.

haolun 21.12.2020 12:29
Почему Python в конце концов умрет.
Почему Python в конце концов умрет.
Последние 20 лет были действительно хорошими для Python. Он прошел путь от "просто языка сценариев" до основного языка, используемого для написания...
1
5
1 062
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Прямой ответ на ваш вопрос — использовать метод FutureExt::now_or_never из ящика фьючерсов, как в stream.next().now_or_never().

Однако важно избегать написания цикла занятости, который ожидает несколько вещей, вызывая now_or_never для каждой вещи в цикле. Это плохо, потому что блокирует поток, и вы должны предпочесть другое решение, такое как tokio::select! ждать нескольких вещей. Для особого случая, когда вы постоянно проверяете, должна ли задача завершиться, см. вместо этого этот другой вопрос.

С другой стороны, пример, когда использование now_or_never совершенно нормально, — это когда вы хотите очистить очередь для доступных сейчас элементов, чтобы вы могли каким-то образом обрабатывать их в пакетном режиме. Это нормально, потому что цикл now_or_never перестанет вращаться, как только он опустошит очередь.

Помните, что если поток пуст, то now_or_never завершится успешно, потому что next() в этом случае немедленно возвращает None.

Спасибо! Это решило мою проблему. Я обнаружил, что futures::select_biased! также может решить эту проблему, но FutureExt::now_or_never для этого подходит гораздо лучше.

haolun 21.12.2020 13:45

Другие вопросы по теме