Предположим, что целью является тестирование синхронного tcp-клиента с использованием асинхронного tcp-сервера на локальном хосте.
теперь рассмотрим, как клиент и сервер взаимодействуют в следующем тестовом примере:
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
class session : public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() { do_read(); }
private:
void do_read()
{
std::cout << "Server reads.\n";
auto self(shared_from_this());
socket_.async_read_some(
boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
do_write(length);
}
});
}
void do_write(std::size_t length)
{
std::cout << "Server writes: ";
for (auto i : data_) std::cout << i;
std::cout << "\n";
auto self(shared_from_this());
boost::asio::async_write(
socket_,
boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read();
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class EchoSever
{
public:
EchoSever(boost::asio::io_context& io_context, boost::asio::ip::address ipAddress, short port)
: acceptor_(io_context, tcp::endpoint(ipAddress, port))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::cout << "client connected.\n";
std::make_shared<session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main()
{
const auto ipAddress{boost::asio::ip::make_address("127.0.0.1")};
const short port{8080};
boost::asio::io_context io_context;
EchoSever echoServer(io_context, ipAddress, port);
auto waiter = std::async(std::launch::async, [&]()
{
io_context.run();
std::cout << "echoServer has finished\n";
});
// client code:
boost::asio::ip::tcp::socket socket(io_context);
boost::asio::ip::tcp::resolver resolver(io_context);
boost::asio::connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));
const std::string requestMessage{"test123"};
boost::asio::write(socket, boost::asio::buffer(requestMessage, requestMessage.size()));
std::cout << "client wrote request: " << requestMessage << "\n";
char reply[1024];
size_t reply_length = boost::asio::read(socket, boost::asio::buffer(reply, requestMessage.size()));
std::cout << "reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
io_context.stop();
waiter.wait();
}
Желаемое поведение будет выглядеть следующим образом:
Выполнение кода как на Godbolt, так и на моем ящике дало разные результаты при нескольких запусках, поэтому должна быть как минимум проблема синхронизации, которую я пытаюсь понять.
Насколько я понимаю, вызов io_context.run()
в отдельном потоке позволяет обработчикам завершения, связанным с экземпляром io_context
, работать в этом потоке. Пока эти обработчики завершения работают с отдельными объектами, синхронизация не требуется. Насколько я понимаю, в приведенном выше примере они работают с отдельными объектами сокетов, так что это нормально.
Когда клиент вызывает boost::asio::write
, становится ясно, что у сервера уже есть сокет, связанный с данной конечной точкой, иначе клиентский вызов boost::asio::connect
раньше бы не удался. После этого серверный вызов start()
, кажется, конкурирует с клиентом, вызывающим boost::asio::read
, т. е. похоже, что может случиться так, что io_context.stop();
будет достигнуто еще до того, как do_read
будет вызван. Это немного удивительно, так как я ожидал, что клиенты boost::asio::read
будут блокироваться до тех пор, пока не поступят данные. Насколько я понимаю, это произойдет даже в том случае, если обработчики завершения будут работать в одной нити.
Что не так на моей картинке?
Как должен выглядеть клиент, чтобы добиться желаемого поведения?
@JakobStark, я написал, что «дал разные результаты при нескольких прогонах», что действительно вводит в заблуждение и неточно. Я видел это на своем компьютере, когда клиент иногда писал в сокет и читал обратно сразу после этого, даже не запуская сервер. Я не наблюдал такого поведения при запуске кода на godbolt. Понятно, что std::cout не является потокобезопасным и вывод может состоять из чередующихся символов. Пример, к счастью, имеет гораздо больше проблем.
std::async
не так явно говорит о многопоточности, как мне бы хотелось. Я бы предпочел просто std::thread
.
Тем не менее, есть еще одна проблема, которая может вас сбить с толку: asio::read
не только блокирует, но если, как вы сделали, вы не ограничите буфер до точного количества байтов, которые должны быть получены, вызов будет блокироваться «на неопределенный срок», пока либо
B никогда не произойдет в вашем тесте. Здесь вы либо используете read_some
, либо сервер заранее закрывает сеансы.
И последнее, но не менее важное: есть ли гонки, в которых сервер может не слушать в то время, когда клиент connect
s? Я не думаю, что это так, потому что конструктор EchoServer
открывает акцептор и инициирует асинхронное принятие. Даже если служба еще не запущена, это означает, что ядро должно ставить в очередь входящие IP-пакеты для этого сокета.
В этом листинге было много улучшений (например, печать только полученных байтов вместо всего буфера, их цитирование, трассировка с отметками времени, идентификаторами потоков и эксклюзивным доступом к консоли, отчеты об ошибках и обработка (например, EOF), использование loopback()
вместо. .. синтаксический анализ строки «127.0.0.1», создание порта без знака, явное использование потока над std::async
, отсутствие ручного переопределения размеров буфера в asio::buffer()
вызовах, отмена слушателя вместо жесткой остановки контекста ввода-вывода, возможно, еще что-то, о чем я забыл) .
Жить на Колиру
#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
namespace {
static auto timestamp() {
auto now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
static auto start = now();
auto t = now();
return (t - start) / 1.ms;
}
static std::atomic_int tid_gen = 0;
thread_local int tid = tid_gen++;
std::mutex console_mx;
void trace(auto const&... args) {
std::lock_guard lk(console_mx);
((std::cout << "T:" << tid << std::right << std::setw(10) << timestamp() << "ms ") << ... << args)
<< std::endl;
}
} // namespace
class Session : public std::enable_shared_from_this<Session> {
public:
Session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() { do_read(); }
private:
void do_read() {
trace("Server reads.");
auto self = shared_from_this();
socket_.async_read_some(asio::buffer(data_), [this, self](error_code ec, size_t length) {
trace("Session read: ", ec.message(), " ", length);
if (!ec || (length && ec == asio::error::eof))
do_write(length);
});
}
void do_write(size_t length) {
trace("Server writes: ", quoted(std::string_view(data_.data(), length)));
auto self = shared_from_this();
async_write(socket_, asio::buffer(data_, length), [this, self](error_code ec, size_t length) {
trace("Session write: ", ec.message(), " ", length);
if (!ec)
do_read();
});
}
tcp::socket socket_;
std::array<char, 1024> data_;
};
class EchoServer {
public:
EchoServer(asio::any_io_executor ex, asio::ip::address ipAddress, short port)
: acceptor_(ex, tcp::endpoint(ipAddress, port)) {
do_accept();
}
void stop() {
post(acceptor_.get_executor(), [this] {
acceptor_.cancel(); /* or close() */
});
}
private:
void do_accept() {
acceptor_.async_accept( //
make_strand(acceptor_.get_executor()), //
[this](error_code ec, tcp::socket socket) {
if (ec)
trace("Accept: ", ec.message());
if (!ec) {
trace("Accepted ", socket.remote_endpoint());
std::make_shared<Session>(std::move(socket))->start();
do_accept();
}
});
}
tcp::acceptor acceptor_;
};
int main() {
std::cout << std::fixed << std::setprecision(5);
trace("Main start");
auto const ipAddress = asio::ip::address_v4::loopback();
uint16_t const port = 8080;
asio::io_context io_context;
EchoServer echoServer(io_context.get_executor(), ipAddress, port);
auto io_thread = std::thread([&]() {
io_context.run();
trace("Service has finished");
});
{
// client code:
tcp::socket socket(io_context);
tcp::resolver resolver(io_context);
connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));
{
std::string const request{"test123"};
write(socket, asio::buffer(request));
trace("Main client wrote request: ", quoted(request));
}
{
std::array<char, 1024> reply;
size_t n = socket.read_some(asio::buffer(reply));
trace("Main reply is: ", quoted(std::string_view(reply.data(), n)));
}
} // destructor closes connection
echoServer.stop();
trace("Main service stopped, waiting");
io_thread.join();
trace("Main bye");
}
Отпечатки, например.
T:0 0.00171ms Main start
T:0 0.36595ms Main client wrote request: "test123"
T:1 0.42401ms Accepted 127.0.0.1:48852
T:1 0.47220ms Server reads.
T:1 0.53624ms Session read: Success 7
T:1 0.55286ms Server writes: "test123"
T:1 0.60766ms Session write: Success 7
T:1 0.63068ms Server reads.
T:0 0.65522ms Main reply is: "test123"
T:0 0.70585ms Main service stopped, waiting
T:1 0.73496ms Session read: End of file 0
T:1 0.80807ms Accept: Operation canceled
T:1 0.82992ms Service has finished
T:0 0.87840ms Main bye
Или много работает локально:
Дайте мне знать, можете ли вы воспроизвести поведение, которое не описали полностью.
Информация, которую read
будет блокировать на неопределенный срок, была для меня критически важной недостающей частью. Решил не изменять два примера, из которых я получил код, но это не должно служить оправданием для других проблем, которые вы указали. То, как я сформулировал свой вопрос, в любом случае вводит в заблуждение, как я писал выше, извините за это и спасибо за ответ, такой высокий уровень детализации!
Рядом с boost::asio::ip::tcp::socket::read_some
есть boost::asio::read_until
, который ожидает последовательность динамического буфера boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/… — учитывая протокол с разделителем, это выглядит очень гибким для меня, не так ли? быть идиоматическим улучшением кода, который вы разместили на стороне клиента?
Это определенно будет хорошо работать для протокола с разделителями IFF, на который другая сторона отвечает правильно. Конечно, он все еще может быть заблокирован на неопределенный срок, если другая сторона не сможет отправить ожидаемый разделитель. Это то, о чем вы просите, так что, наверное, хорошо. В реальных приложениях я всегда использовал тайм-ауты
Не могли бы вы включить разные результаты, которые выводятся на godbolt и на вашей машине?