Сопрограммы C++20 читают/записывают веб-сокет

Я хочу создать веб-сокет, работающий в одном потоке, используя сопрограммы и boost::asio. Одна сопрограмма будет отвечать за запись (async_write), а другая — за чтение (async_read).

Если какая-либо сопрограмма получит исключение (на данный момент я предполагаю, что каждое исключение означает разрыв соединения), я попробую переподключиться.

Для записи мне нужен writeBuffer, который служит очередью для сообщений. Клиенты веб-сокета будут вызывать ws.Send(data), и вместо немедленной отправки он останется в буфере до следующего ws.Run() вызова.

Чтобы все работало так, как описано, мне нужен способ приостановить сопрограмму write, если writeBuffer пуста. Если я этого не сделаю, он будет вращаться вечно, ожидая заполнения буфера. Но я получаю сообщение об ошибке при попытке приостановить его с помощью std::suspend_always{}:

error C2665: 'boost::asio::detail::awaitable_frame_base<Executor>::await_transform': no overloaded function could convert all the argument types

Думаю, я не приостанавливаю сопрограмму с помощью asio::awaitable. Мне действительно нужен этот прокси-буфер в качестве очереди для моих сообщений. Вероятно, я мог бы использовать что-то еще из boost - возможно, signals предоставить способ co_await и для них, но я боюсь, что мне придется задать еще 3 вопроса, чтобы понять это.

Вот мой код, сокращенный до минимума:

#include <iostream>
#include <coroutine>
#include <optional>

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;

struct CoroWebsocket {

    CoroWebsocket(std::string host, std::string port)
    : _host(std::move(host))
    , _port(std::move(port)) {
        asio::co_spawn(_ws.get_executor(), do_run(), asio::detached);
    }

    void Run() {
        _ioc.run_for(50ms);
    }

    void Write(std::string data) {
        // TODO: mutex
        _writeBuffer.push_back(std::move(data));
    }

    std::optional<std::string> Read(){ 
        // TODO: mutex
        if (_readBuffer.empty())
            return {};
        const auto message = _readBuffer.back();
        _readBuffer.pop_back();
        return message;
    }

private:
    const std::string _host, _port;
    using tcp = asio::ip::tcp;
    std::vector<std::string>       _writeBuffer; // Will be filled externally.
    std::vector<std::string>       _readBuffer;
    boost::asio::io_context        _ioc;
    websocket::stream<tcp::socket> _ws{_ioc};

    asio::awaitable<void> do_run() {
        while(true) {
            try {
                co_await do_connect();
                co_await asio::co_spawn(_ws.get_executor(), do_write() || do_read(), asio::use_awaitable); // If either ends, it must've been an exception. Reconnect.
            } catch (const boost::system::system_error& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
        }
    }

    asio::awaitable<void> do_connect() {
        try {
            while(true) {
                co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), asio::use_awaitable);
                _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
                }));

                co_await _ws.async_handshake(_host + ':' + _port, "/", asio::use_awaitable);
                _readBuffer.clear();
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }

    asio::awaitable<void> do_write() {
        try {
            while(true) {
                while (_writeBuffer.empty()) {
                    co_await std::suspend_always{}; // I want to switch context but ERROR
                }

                for (const auto& message : _writeBuffer) {
                    co_await _ws.async_write(boost::asio::buffer(message), asio::use_awaitable);
                }
                _writeBuffer.clear();
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }

    asio::awaitable<void> do_read() {
        try {
            while(true) {
                if (0 != co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable)) {
                    while (!_ws.is_message_done()) {
                        co_await _ws.async_read_some(boost::asio::buffer(_readBuffer), asio::use_awaitable);
                    }
                    // Signal new message.
                }
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    
    }
};

В идеале я бы также хотел заменить io_context на thread_pool с 1 потоком, чтобы я действительно мог запускать эти сопрограммы отдельно. Однако, если я просто позволю им вращаться в отдельном потоке, если нет операций чтения или записи, я думаю, что поток будет сжигать циклы при простом переключении контекста, поскольку оба приостанавливаются. Чтобы решить эту проблему, я подумал о добавлении в смесь третьей сопрограммы, которая останавливала бы поток с помощью основного this_thread::sleep(50ms), если при последнем вращении не выполнялось чтение или запись.

Если работы нет, сопрограмма вообще не просыпается. Что тут «отдыхать»? Фактически, выполнение this_thread::sleepXXX приведет к блокировке потока ввода-вывода, и вы больше не сможете отвечать на входящие сообщения. Можете ли вы описать ЧТО вы хотите сделать, а не КАК?

sehe 07.03.2024 13:27

@sehe сама сопрограмма - да. Но я думал, что thread_pool передаст выполнение чему-то другому - в идеале другой сопрограмме. Если бы не было rest, я думал, это будет пинг-понг между приостановленными сопрограммами, которые вечно ничего не делают. Я думал, что смогу co_spawn(_, a || b || c), и если a приостанавливает, b подхватывает его. Если b приостанавливает работу, то c подхватывает ее, а затем возвращается к a. Хорошо, что касается ЧЕГО: я хочу, чтобы веб-сокет работал в одном потоке, обслуживающем чтение и запись по мере их поступления. Если делать нечего - проверьте на работу без сжигания циклов.

Some Dinosaur 07.03.2024 13:36

@sehe Я переписал вопрос, пытаясь уточнить его. надеюсь, теперь это имеет смысл

Some Dinosaur 07.03.2024 19:00

«Если бы не было отдыха, я думал, что это был бы пинг-понг между приостановленными сопрограммами, которые вечно ничего не делают». - это просто не тот случай

sehe 07.03.2024 19:32

Что делает thread_pool, если вся его работа приостановлена, то есть вся запланированная работа возвращается await_suspend() { return true; } (я думаю, это происходит, если нет новых данных для чтения)? Если я запланирую это detached, не будет ли оно просто крутиться вечно, отскакивая от await_ready() звонков?

Some Dinosaur 07.03.2024 19:50

Ничего. Все потоки спят, т.е. ядро ​​их не планирует.

sehe 07.03.2024 20:31
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
6
294
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я хочу, чтобы веб-сокет работал в одном потоке и обслуживал операции чтения и записи по мере их поступления. Если делать нечего - проверьте на работу без сжигания циклов

Вам повезло, именно в этом и заключается асинхронное завершение.

Если нечего делать, ничего не произойдет, и поток(и) в пуле фактически перейдут в спящий режим, а это означает, что другие процессы в системе могут получить ход. Разница в том, что вместо «голого сна» используется «умный сон», например: сон просыпается, как только появляется какое-либо соответствующее событие ввода-вывода. Это может быть любое событие, поддерживаемое службой Asio, например файлы, каналы, UNIX или интернет-сокеты, последовательный порт, сигналы асинхронного процесса¹.

Что касается кода, позвольте мне сначала отметить, что ожидание появления:

    co_await asio::co_spawn(
        _ws.get_executor(), do_write() || do_read(),
        asio::use_awaitable);

это потенциально более дорогостоящий способ просто написать

    co_await (do_write() || do_read()); // idiomatic full-duplex!

И

    co_await std::suspend_always{}; // I want to switch context but ERROR

Это также всегда неоптимально, но если вы настаиваете, это может быть

    co_await post(asio::deferred);

Как подойти

Я бы сделал так, чтобы цикл записи запускался только тогда, когда что-то стоит в очереди. Это сразу упрощает синхронизацию доступа к очереди с помощью цепочки.

В качестве альтернативы посмотрите на channel как замену очереди. Поток записи может асинхронно получать данные из канала. Это также дает вам контроль над емкостью очереди: https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/overview/channels.html

Каналы кажутся здесь лучшей идеей, потому что они также позволяют вам писать интерфейс Read естественно и оптимально.

Демо

Использование каналов кажется подходящим вариантом. Все сводится к такой сути:

const std::string _host, _port;
Stream            _ws;
Channel           _in{_ws.get_executor()}, _out{_ws.get_executor()};

Task do_run() {
    while (true) {
        try {
            co_await do_connect();
            co_await (do_write_loop() || do_read_loop());
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }
}

Task do_write_loop() {
    for (;;)
        co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
}

Task do_read_loop() {
    for (Message msg;; msg.clear()) {
        auto buf = asio::dynamic_buffer(msg);

        auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
        co_await _in.async_send(ec, std::move(msg));
    }
}

Добавление флага корректного завершения работы и задержки отключения в случае сбоя соединения:

Task do_run() {
    for (; !_close_requested; co_await delay(200ms)) {
        try {
            co_await do_connect();
            co_await (do_write_loop() || do_read_loop());
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error: " << se.code().message() << std::endl;
        }
    }
}

Полный список

Прямой эфир на Колиру

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/beast.hpp>

namespace asio      = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
using boost::system::error_code;
using tcp = asio::ip::tcp;

struct Server {
    using Message  = std::string;
    using Task     = asio::awaitable<void>;
    using Channel  = asio::deferred_t::as_default_on_t< //
        asio::experimental::concurrent_channel<void(error_code, Message)>>;
    using Stream   = asio::deferred_t::as_default_on_t<websocket::stream<tcp::socket>>;
    using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;
    using Opts     = websocket::stream_base;

    Server(asio::any_io_executor ex, std::string host, std::string port)
        : _host(std::move(host))
        , _port(std::move(port))
        , _ws(ex) {
        co_spawn(ex, do_run(), asio::detached);
    }

    bool Write(std::string data) { return _out.try_send(error_code{}, std::move(data)); }

    std::optional<std::string> Read() {
        Message ret;
        if (_in.try_receive([&](error_code ec, Message msg_) {
                if (ec)
                    throw boost::system::system_error(ec);
                ret = std::move(msg_);
            }))
            return ret;
        return std::nullopt;
    }

    void close() {
        _close_requested = true;
        post(_ws.get_executor(), [this] {
            if (error_code ignore; _ws.is_open())
                _ws.close({}, ignore);
        });
    }

  private:
    const std::string _host, _port;
    Stream            _ws;
    Channel           _in{_ws.get_executor(), 10}, _out{_ws.get_executor(), 10};
    std::atomic_bool  _close_requested{false};

    Task delay(auto duration_or_timepoint) {
        auto ex = co_await asio::this_coro::executor;
        co_await asio::steady_timer(ex, duration_or_timepoint).async_wait(asio::deferred);
    }

    Task do_run() {
        for (; !_close_requested; co_await delay(200ms)) {
            try {
                co_await do_connect();
                co_await (do_write_loop() || do_read_loop());
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
        }
    }

    Task do_write_loop() {
        for (;;)
            co_await _ws.async_write(asio::buffer(co_await _out.async_receive()));
    }

    Task do_read_loop() {
        for (Message msg;; msg.clear()) {
            auto buf = asio::dynamic_buffer(msg);

            auto [ec, bytes] = co_await _ws.async_read(buf, as_tuple(asio::deferred));
            co_await _in.async_send(ec, std::move(msg));

            if (ec)
                break;
        }
    }

    asio::awaitable<void> do_connect() {
        auto ex = co_await asio::this_coro::executor;
        if (error_code ignore; _ws.is_open()) {
            _ws.close({}, ignore);
            _in.reset();
            _out.reset();
        }

        auto eps = co_await Resolver(ex).async_resolve(_host, _port);
        co_await async_connect(beast::get_lowest_layer(_ws), eps);

        _ws.set_option(Opts::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        co_await _ws.async_handshake(_host + ':' + _port, "/");
    }
};

int main() {
    asio::thread_pool ioc;
    Server s(make_strand(ioc), "localhost", "8989");

    using std::this_thread::sleep_for;

    for (auto msg : {"foo", "bar", "qux"}) {
        s.Write(msg);
        sleep_for(1s);

        while (auto response = s.Read())
            std::cout << "Received response " << quoted(*response) << std::endl;
    }

    s.close();
    ioc.join();
}

С живой демонстрацией против:

 websocketd --port 8989 -- \
      bash -c 'tee log | while read line; do echo "Responding to ($line)"; done'

¹ и некоторые особенности платформы, например порты завершения в Windows.

Другие вопросы по теме