У меня есть программа, основанная на boost::asio и звере с поддержкой сопрограмм.
Программа подключается к удаленному серверу через Beast Websocket, через который отправляет запросы, которые инициируются ее внутренней логикой.
Так что в основном запросы довольно частые и тяжелые.
Я использую co_spawn(strand_, sendRequest(data), asio::detached). sendRequest — это сопрограмма, которая отправляет данные на удаленный сервер следующим образом:
ws_->async_write(strand_, asio::buffer(data), asio::use_awaitable)
строка_ в двух местах одна и та же, а ws_ также инициализируется строкой:
class Adapter {
Adapter(asio::strand<boost::asio::io_context::executor_type>&strand)
, strand_(strand)
, ssl_context_(asio::ssl::context::tlsv12_client)
, ws_(new websocket::stream<beast::ssl_stream<beast::tcp_stream>>(strand, ssl_context_)) {}
void handleRequest(std::string& data) {
// do some checks and have a new_data
co_spawn(strand_, sendRequest(new_data), boost::asio::detached);
}
asio::awaitable<void> sendRequest(const std::string& data) {
// do some conversion to have a new_data
co_await ws_->async_write(strand_, asio::buffer(new_data), asio::use_awaitable);
co_return;
}
protected:
asio::strand<boost::asio::io_context::executor_type>&strand_;
asio::ssl::context ssl_context_;
std::unique_ptr<websocket::stream<beast::ssl_stream<beast::tcp_stream>>> ws_;
}
Однако handleReques может сработать очень быстро.
но у меня крашился, если запросы слишком частые. Произошло это в soft_mutex.hpp в звере:
try_lock(T const*)
{
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
if (id_ != 0)
return false;
id_ = T::id;
return true;
}
если бы я заменил co_spawn(strand_, sendRequest(data), asio::detached) пустой сопрограммой вот так:
co_spawn(strand_, []() -> asio::awaitable<void> {co_return;}, asio::detached)
все в порядке. Так что это определенно происходит в ws_->async_write в sendRequest.
Я попробовал с помощью asio::post обернуть co_spawn, все равно не работает.
Я делаю это на Ubuntu 22.04 с Boost 1.85.
(Кстати, если я использую ws_->write, это работает)





Вы инициируете несколько записей. Несмотря на то, что инициации происходят на цепочке, у вас нет никаких мер, гарантирующих, что любые предыдущие операции записи были завершены раньше.
Невозможно показать, как можно исправить свой код, поскольку ничего из этого не показано. В общем, решение будет включать в себя очередь и цикл записи, который отправляет сообщения из очереди до тех пор, пока очередь не станет пустой.
Вы можете увидеть множество моих примеров ASIO, которые содержат std::deque<...>, который я обычно называю outbox_ или что-то подобное.
Код в вопросе неполный. Вот автономный набросок того, как может выглядеть правильное решение. Обратите внимание, что вам необходимо поддерживать время жизни a до тех пор, пока не будут завершены все (отсоединенные) операции с его использованием.
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <deque>
namespace beast = boost::beast;
namespace asio = boost::asio;
namespace websocket = boost::beast::websocket;
namespace ssl = boost::asio::ssl;
struct Adapter {
using Message = std::string;
explicit Adapter(asio::any_io_executor ex) : ws_{ex, ssl_context_} {}
void handleRequest(std::string msg) {
// do some checks and have a message
co_spawn(ws_.get_executor(), sendRequest(std::move(msg)), boost::asio::detached);
}
protected:
asio::awaitable<void> sendRequest(std::string message) {
queue_.push_back(std::move(message));
if (queue_.size() == 1)
co_await writeLoop(); // only one write loop can be active
}
asio::awaitable<void> writeLoop() {
while (!queue_.empty()) {
co_await ws_.async_write(asio::buffer(queue_.front()), asio::deferred);
queue_.pop_front();
}
}
private:
ssl::context ssl_context_{ssl::context::tlsv12_client};
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
std::deque<std::string> queue_;
};
int main() {
asio::thread_pool ioc;
Adapter a{make_strand(ioc)};
ioc.join();
}
Лучше всего публиковать самодостаточный код. В любом случае, я опубликовал то, что имел в виду относительно вашей Adaptor реализации. Обратите внимание, что нам не нужно жестко запрограммировать тип исполнителя.
большое спасибо. @sehe. Я использую что-то вроде этого, чтобы решить проблему с упомянутой вами очередью: ``` asio::awaitable<void> sendRequest(std::string& message) {queue_.push_back(data); если (queue_.size() > 1) { co_return;} OnWrite(); } OnWrite() { const auto& data =queue.front(); ws_->async_write(asio::buffer(data), [this](boost::system::error_code ec, std::size_t bytes_transferred) { if (ec) { // печатаем, но продолжаем } очереди.pop_front() ; если (!queue_.empty()) { OnWrite() } }); } } ``` кажется, работает. Спасибо за вашу помощь!
Приятно слышать. Добро пожаловать в StackOverflow. Пожалуйста, не забудьте проголосовать
спасибо за ваше сообщение, я обновил вопрос кодами.