Я хочу создать веб-сокет, работающий в одном потоке, используя сопрограммы и boost::asio. Одна сопрограмма будет отвечать за запись (async_write), а другая — за чтение (async_read).
Если какая-либо сопрограмма получит исключение (на данный момент я предполагаю, что каждое исключение означает разрыв соединения), я попробую переподключиться.
Для записи мне нужен writeBuffer, который служит очередью для сообщений. Клиенты веб-сокета будут вызывать ws.Send(data), и вместо немедленной отправки он останется в буфере до следующего ws.Run() вызова.
Чтобы все работало так, как описано, мне нужен способ приостановить сопрограмму write, если writeBuffer пуста. Если я этого не сделаю, он будет вращаться вечно, ожидая заполнения буфера. Но я получаю сообщение об ошибке при попытке приостановить его с помощью std::suspend_always{}:
error C2665: 'boost::asio::detail::awaitable_frame_base<Executor>::await_transform': no overloaded function could convert all the argument types
Думаю, я не приостанавливаю сопрограмму с помощью asio::awaitable.
Мне действительно нужен этот прокси-буфер в качестве очереди для моих сообщений. Вероятно, я мог бы использовать что-то еще из boost - возможно, signals предоставить способ co_await и для них, но я боюсь, что мне придется задать еще 3 вопроса, чтобы понять это.
Вот мой код, сокращенный до минимума:
#include <iostream>
#include <coroutine>
#include <optional>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
struct CoroWebsocket {
CoroWebsocket(std::string host, std::string port)
: _host(std::move(host))
, _port(std::move(port)) {
asio::co_spawn(_ws.get_executor(), do_run(), asio::detached);
}
void Run() {
_ioc.run_for(50ms);
}
void Write(std::string data) {
// TODO: mutex
_writeBuffer.push_back(std::move(data));
}
std::optional<std::string> Read(){
// TODO: mutex
if (_readBuffer.empty())
return {};
const auto message = _readBuffer.back();
_readBuffer.pop_back();
return message;
}
private:
const std::string _host, _port;
using tcp = asio::ip::tcp;
std::vector<std::string> _writeBuffer; // Will be filled externally.
std::vector<std::string> _readBuffer;
boost::asio::io_context _ioc;
websocket::stream<tcp::socket> _ws{_ioc};
asio::awaitable<void> do_run() {
while(true) {
try {
co_await do_connect();
co_await asio::co_spawn(_ws.get_executor(), do_write() || do_read(), asio::use_awaitable); // If either ends, it must've been an exception. Reconnect.
} catch (const boost::system::system_error& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
asio::awaitable<void> do_connect() {
try {
while(true) {
co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), asio::use_awaitable);
_ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
}));
co_await _ws.async_handshake(_host + ':' + _port, "/", asio::use_awaitable);
_readBuffer.clear();
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
asio::awaitable<void> do_write() {
try {
while(true) {
while (_writeBuffer.empty()) {
co_await std::suspend_always{}; // I want to switch context but ERROR
}
for (const auto& message : _writeBuffer) {
co_await _ws.async_write(boost::asio::buffer(message), asio::use_awaitable);
}
_writeBuffer.clear();
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
asio::awaitable<void> do_read() {
try {
while(true) {
if (0 != co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable)) {
while (!_ws.is_message_done()) {
co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable);
}
// Signal new message.
}
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
};
В идеале я бы также хотел заменить io_context на thread_pool с 1 потоком, чтобы я действительно мог запускать эти сопрограммы отдельно. Однако, если я просто позволю им вращаться в отдельном потоке, если нет операций чтения или записи, я думаю, что поток будет сжигать циклы при простом переключении контекста, поскольку оба приостанавливаются. Чтобы решить эту проблему, я подумал о добавлении в смесь третьей сопрограммы, которая останавливала бы поток с помощью основного this_thread::sleep(50ms), если при последнем вращении не выполнялось чтение или запись.
@sehe сама сопрограмма - да. Но я думал, что thread_pool передаст выполнение чему-то другому - в идеале другой сопрограмме. Если бы не было rest, я думал, это будет пинг-понг между приостановленными сопрограммами, которые вечно ничего не делают. Я думал, что смогу co_spawn(_, a || b || c), и если a приостанавливает, b подхватывает его. Если b приостанавливает работу, то c подхватывает ее, а затем возвращается к a. Хорошо, что касается ЧЕГО: я хочу, чтобы веб-сокет работал в одном потоке, обслуживающем чтение и запись по мере их поступления. Если делать нечего - проверьте на работу без сжигания циклов.
@sehe Я переписал вопрос, пытаясь уточнить его. надеюсь, теперь это имеет смысл
«Если бы не было отдыха, я думал, что это был бы пинг-понг между приостановленными сопрограммами, которые вечно ничего не делают». - это просто не тот случай
Что делает thread_pool, если вся его работа приостановлена, то есть вся запланированная работа возвращается await_suspend() { return true; } (я думаю, это происходит, если нет новых данных для чтения)? Если я запланирую это detached, не будет ли оно просто крутиться вечно, отскакивая от await_ready() звонков?
Ничего. Все потоки спят, т.е. ядро их не планирует.





Я хочу, чтобы веб-сокет работал в одном потоке и обслуживал операции чтения и записи по мере их поступления. Если делать нечего - проверьте на работу без сжигания циклов
Вам повезло, именно в этом и заключается асинхронное завершение.
Если нечего делать, ничего не произойдет, и поток(и) в пуле фактически перейдут в спящий режим, а это означает, что другие процессы в системе могут получить ход. Разница в том, что вместо «голого сна» используется «умный сон», например: сон просыпается, как только появляется какое-либо соответствующее событие ввода-вывода. Это может быть любое событие, поддерживаемое службой Asio, например файлы, каналы, UNIX или интернет-сокеты, последовательный порт, сигналы асинхронного процесса¹.
Что касается кода, позвольте мне сначала отметить, что ожидание появления:
co_await asio::co_spawn(
_ws.get_executor(), do_write() || do_read(),
asio::use_awaitable);
это потенциально более дорогостоящий способ просто написать
co_await (do_write() || do_read()); // idiomatic full-duplex!
И
co_await std::suspend_always{}; // I want to switch context but ERROR
Это также всегда неоптимально, но если вы настаиваете, это может быть
co_await post(asio::deferred);
Я бы сделал так, чтобы цикл записи запускался только тогда, когда что-то стоит в очереди. Это сразу упрощает синхронизацию доступа к очереди с помощью цепочки.
В качестве альтернативы посмотрите на channel как замену очереди. Поток записи может асинхронно получать данные из канала. Это также дает вам контроль над емкостью очереди: https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/overview/channels.html
Каналы кажутся здесь лучшей идеей, потому что они также позволяют вам писать интерфейс Read естественно и оптимально.
Использование каналов кажется подходящим вариантом. Все сводится к такой сути:
const std::string _host, _port;
Stream _ws;
Channel _in{_ws.get_executor()}, _out{_ws.get_executor()};
Task do_run() {
while (true) {
try {
co_await do_connect();
co_await (do_write_loop() || do_read_loop());
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
Task do_write_loop() {
for (;;)
co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
}
Task do_read_loop() {
for (Message msg;; msg.clear()) {
auto buf = asio::dynamic_buffer(msg);
auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
co_await _in.async_send(ec, std::move(msg));
}
}
Добавление флага корректного завершения работы и задержки отключения в случае сбоя соединения:
Task do_run() {
for (; !_close_requested; co_await delay(200ms)) {
try {
co_await do_connect();
co_await (do_write_loop() || do_read_loop());
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/beast.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
using boost::system::error_code;
using tcp = asio::ip::tcp;
struct Server {
using Message = std::string;
using Task = asio::awaitable<void>;
using Channel = asio::deferred_t::as_default_on_t< //
asio::experimental::concurrent_channel<void(error_code, Message)>>;
using Stream = asio::deferred_t::as_default_on_t<websocket::stream<tcp::socket>>;
using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;
using Opts = websocket::stream_base;
Server(asio::any_io_executor ex, std::string host, std::string port)
: _host(std::move(host))
, _port(std::move(port))
, _ws(ex) {
co_spawn(ex, do_run(), asio::detached);
}
bool Write(std::string data) { return _out.try_send(error_code{}, std::move(data)); }
std::optional<std::string> Read() {
Message ret;
if (_in.try_receive([&](error_code ec, Message msg_) {
if (ec)
throw boost::system::system_error(ec);
ret = std::move(msg_);
}))
return ret;
return std::nullopt;
}
void close() {
_close_requested = true;
post(_ws.get_executor(), [this] {
if (error_code ignore; _ws.is_open())
_ws.close({}, ignore);
});
}
private:
const std::string _host, _port;
Stream _ws;
Channel _in{_ws.get_executor(), 10}, _out{_ws.get_executor(), 10};
std::atomic_bool _close_requested{false};
Task delay(auto duration_or_timepoint) {
auto ex = co_await asio::this_coro::executor;
co_await asio::steady_timer(ex, duration_or_timepoint).async_wait(asio::deferred);
}
Task do_run() {
for (; !_close_requested; co_await delay(200ms)) {
try {
co_await do_connect();
co_await (do_write_loop() || do_read_loop());
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
}
Task do_write_loop() {
for (;;)
co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
}
Task do_read_loop() {
for (Message msg;; msg.clear()) {
auto buf = asio::dynamic_buffer(msg);
auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
co_await _in.async_send(ec, std::move(msg));
if (ec)
break;
}
}
asio::awaitable<void> do_connect() {
auto ex = co_await asio::this_coro::executor;
if (error_code ignore; _ws.is_open()) {
_ws.close({}, ignore);
_in.reset();
_out.reset();
}
auto eps = co_await Resolver(ex).async_resolve(_host, _port);
co_await async_connect(beast::get_lowest_layer(_ws), eps);
_ws.set_option(Opts::decorator([](websocket::request_type& req) {
req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
}));
co_await _ws.async_handshake(_host + ':' + _port, "/");
}
};
int main() {
asio::thread_pool ioc;
Server s(make_strand(ioc), "localhost", "8989");
using std::this_thread::sleep_for;
for (auto msg : {"foo", "bar", "qux"}) {
s.Write(msg);
sleep_for(1s);
while (auto response = s.Read())
std::cout << "Received response " << quoted(*response) << std::endl;
}
s.close();
ioc.join();
}
С живой демонстрацией против:
websocketd --port 8989 -- \
bash -c 'tee log | while read line; do echo "Responding to ($line)"; done'
¹ и некоторые особенности платформы, например порты завершения в Windows.
Если работы нет, сопрограмма вообще не просыпается. Что тут «отдыхать»? Фактически, выполнение
this_thread::sleepXXXприведет к блокировке потока ввода-вывода, и вы больше не сможете отвечать на входящие сообщения. Можете ли вы описать ЧТО вы хотите сделать, а не КАК?