Ускорить синхронизацию Asio между TCP-клиентом и сервером

Предположим, что целью является тестирование синхронного tcp-клиента с использованием асинхронного tcp-сервера на локальном хосте.

теперь рассмотрим, как клиент и сервер взаимодействуют в следующем тестовом примере:

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

class session : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}
    void start() { do_read(); }

private:
    void do_read()
    {
        std::cout << "Server reads.\n";
        auto self(shared_from_this());
        socket_.async_read_some(
            boost::asio::buffer(data_, max_length),
            [this, self](boost::system::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    do_write(length);
                }
            });
    }

    void do_write(std::size_t length)
    {
        std::cout << "Server writes: ";
        for (auto i : data_) std::cout << i;
        std::cout << "\n";
        auto self(shared_from_this());
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(data_, length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                    do_read();
                }
            });
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class EchoSever
{
public:
  EchoSever(boost::asio::io_context& io_context, boost::asio::ip::address ipAddress, short port)
  : acceptor_(io_context, tcp::endpoint(ipAddress, port))
  {
      do_accept();
  }

private:
    void do_accept()
    {
        acceptor_.async_accept(
        [this](boost::system::error_code ec, tcp::socket socket)
        {
            if (!ec)
            {
                std::cout << "client connected.\n";
                std::make_shared<session>(std::move(socket))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
};

int main()
{
    const auto ipAddress{boost::asio::ip::make_address("127.0.0.1")};
    const short port{8080};

    boost::asio::io_context io_context;
    EchoSever echoServer(io_context, ipAddress, port);

    auto waiter = std::async(std::launch::async, [&]()
    {
        io_context.run();
        std::cout << "echoServer has finished\n";
    });

    // client code:
    boost::asio::ip::tcp::socket socket(io_context);
    boost::asio::ip::tcp::resolver resolver(io_context);
    boost::asio::connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));

    const std::string requestMessage{"test123"};
    boost::asio::write(socket, boost::asio::buffer(requestMessage, requestMessage.size()));
    std::cout << "client wrote request: " << requestMessage << "\n";

    char reply[1024];
    size_t reply_length = boost::asio::read(socket, boost::asio::buffer(reply, requestMessage.size()));
    std::cout << "reply is: ";
    std::cout.write(reply, reply_length);
    std::cout << "\n";

    io_context.stop();
    waiter.wait();
}

Желаемое поведение будет выглядеть следующим образом:

  1. Экземпляр сервера асинхронно ожидает подключения клиентов.
  2. Клиент подключается и записывает requestMessage, в то время как сервер асинхронно читает и записывает обратно.
  3. Клиент блокируется до тех пор, пока не придет ответ.

Выполнение кода как на Godbolt, так и на моем ящике дало разные результаты при нескольких запусках, поэтому должна быть как минимум проблема синхронизации, которую я пытаюсь понять.

Насколько я понимаю, вызов io_context.run() в отдельном потоке позволяет обработчикам завершения, связанным с экземпляром io_context, работать в этом потоке. Пока эти обработчики завершения работают с отдельными объектами, синхронизация не требуется. Насколько я понимаю, в приведенном выше примере они работают с отдельными объектами сокетов, так что это нормально.

Когда клиент вызывает boost::asio::write, становится ясно, что у сервера уже есть сокет, связанный с данной конечной точкой, иначе клиентский вызов boost::asio::connect раньше бы не удался. После этого серверный вызов start(), кажется, конкурирует с клиентом, вызывающим boost::asio::read, т. е. похоже, что может случиться так, что io_context.stop(); будет достигнуто еще до того, как do_read будет вызван. Это немного удивительно, так как я ожидал, что клиенты boost::asio::read будут блокироваться до тех пор, пока не поступят данные. Насколько я понимаю, это произойдет даже в том случае, если обработчики завершения будут работать в одной нити.

Что не так на моей картинке?

Как должен выглядеть клиент, чтобы добиться желаемого поведения?

Не могли бы вы включить разные результаты, которые выводятся на godbolt и на вашей машине?

Jakob Stark 14.04.2023 14:48

@JakobStark, я написал, что «дал разные результаты при нескольких прогонах», что действительно вводит в заблуждение и неточно. Я видел это на своем компьютере, когда клиент иногда писал в сокет и читал обратно сразу после этого, даже не запуская сервер. Я не наблюдал такого поведения при запуске кода на godbolt. Понятно, что std::cout не является потокобезопасным и вывод может состоять из чередующихся символов. Пример, к счастью, имеет гораздо больше проблем.

Daniel F 15.04.2023 23:24
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
2
115
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

std::async не так явно говорит о многопоточности, как мне бы хотелось. Я бы предпочел просто std::thread.

Тем не менее, есть еще одна проблема, которая может вас сбить с толку: asio::read не только блокирует, но если, как вы сделали, вы не ограничите буфер до точного количества байтов, которые должны быть получены, вызов будет блокироваться «на неопределенный срок», пока либо

  • (А) буфер заполнен
  • (B) произошла ошибка (например, сервер прерывает соединение)

B никогда не произойдет в вашем тесте. Здесь вы либо используете read_some, либо сервер заранее закрывает сеансы.

И последнее, но не менее важное: есть ли гонки, в которых сервер может не слушать в то время, когда клиент connects? Я не думаю, что это так, потому что конструктор EchoServer открывает акцептор и инициирует асинхронное принятие. Даже если служба еще не запущена, это означает, что ядро ​​должно ставить в очередь входящие IP-пакеты для этого сокета.

Проверенное объявление

В этом листинге было много улучшений (например, печать только полученных байтов вместо всего буфера, их цитирование, трассировка с отметками времени, идентификаторами потоков и эксклюзивным доступом к консоли, отчеты об ошибках и обработка (например, EOF), использование loopback() вместо. .. синтаксический анализ строки «127.0.0.1», создание порта без знака, явное использование потока над std::async, отсутствие ручного переопределения размеров буфера в asio::buffer() вызовах, отмена слушателя вместо жесткой остановки контекста ввода-вывода, возможно, еще что-то, о чем я забыл) .

Жить на Колиру

#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>

namespace asio = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;

namespace {
    static auto timestamp() {
        auto now = std::chrono::high_resolution_clock::now;
        using namespace std::chrono_literals;
        static auto start = now();
        auto        t     = now();

        return (t - start) / 1.ms;
    }

    static std::atomic_int tid_gen = 0;
    thread_local int       tid     = tid_gen++;
    std::mutex             console_mx;

    void trace(auto const&... args) {
        std::lock_guard lk(console_mx);
        ((std::cout << "T:" << tid << std::right << std::setw(10) << timestamp() << "ms ") << ... << args)
            << std::endl;
    }
} // namespace

class Session : public std::enable_shared_from_this<Session> {
  public:
    Session(tcp::socket socket) : socket_(std::move(socket)) {}
    void start() { do_read(); }

  private:
    void do_read() {
        trace("Server reads.");
        auto self = shared_from_this();
        socket_.async_read_some(asio::buffer(data_), [this, self](error_code ec, size_t length) {
            trace("Session read: ", ec.message(), " ", length);
            if (!ec || (length && ec == asio::error::eof))
                do_write(length);
        });
    }

    void do_write(size_t length) {
        trace("Server writes: ", quoted(std::string_view(data_.data(), length)));
        auto self = shared_from_this();
        async_write(socket_, asio::buffer(data_, length), [this, self](error_code ec, size_t length) {
            trace("Session write: ", ec.message(), " ", length);
            if (!ec)
                do_read();
        });
    }

    tcp::socket socket_;
    std::array<char, 1024> data_;
};

class EchoServer {
  public:
    EchoServer(asio::any_io_executor ex, asio::ip::address ipAddress, short port)
        : acceptor_(ex, tcp::endpoint(ipAddress, port)) {
        do_accept();
    }

    void stop() {
        post(acceptor_.get_executor(), [this] {
            acceptor_.cancel(); /* or close() */
        });
    }

  private:
    void do_accept() {
        acceptor_.async_accept(                    //
            make_strand(acceptor_.get_executor()), //
            [this](error_code ec, tcp::socket socket) {
                if (ec)
                    trace("Accept: ", ec.message());
                if (!ec) {
                    trace("Accepted ", socket.remote_endpoint());
                    std::make_shared<Session>(std::move(socket))->start();

                    do_accept();
                }
            });
    }

    tcp::acceptor acceptor_;
};

int main() {
    std::cout << std::fixed << std::setprecision(5);

    trace("Main start");
    auto const     ipAddress = asio::ip::address_v4::loopback();
    uint16_t const port      = 8080;

    asio::io_context io_context;
    EchoServer       echoServer(io_context.get_executor(), ipAddress, port);

    auto io_thread = std::thread([&]() {
        io_context.run();
        trace("Service has finished");
    });

    {
        // client code:
        tcp::socket socket(io_context);

        tcp::resolver resolver(io_context);
        connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));

        {
            std::string const request{"test123"};
            write(socket, asio::buffer(request));
            trace("Main client wrote request: ", quoted(request));
        }

        {
            std::array<char, 1024> reply;
            size_t n = socket.read_some(asio::buffer(reply));
            trace("Main reply is: ", quoted(std::string_view(reply.data(), n)));
        }
    } // destructor closes connection

    echoServer.stop();
    trace("Main service stopped, waiting");
    io_thread.join();
    trace("Main bye");
}

Отпечатки, например.

T:0   0.00171ms Main start
T:0   0.36595ms Main client wrote request: "test123"
T:1   0.42401ms Accepted 127.0.0.1:48852
T:1   0.47220ms Server reads.
T:1   0.53624ms Session read: Success 7
T:1   0.55286ms Server writes: "test123"
T:1   0.60766ms Session write: Success 7
T:1   0.63068ms Server reads.
T:0   0.65522ms Main reply is: "test123"
T:0   0.70585ms Main service stopped, waiting
T:1   0.73496ms Session read: End of file 0
T:1   0.80807ms Accept: Operation canceled
T:1   0.82992ms Service has finished
T:0   0.87840ms Main bye

Или много работает локально:

Дайте мне знать, можете ли вы воспроизвести поведение, которое не описали полностью.

Информация, которую read будет блокировать на неопределенный срок, была для меня критически важной недостающей частью. Решил не изменять два примера, из которых я получил код, но это не должно служить оправданием для других проблем, которые вы указали. То, как я сформулировал свой вопрос, в любом случае вводит в заблуждение, как я писал выше, извините за это и спасибо за ответ, такой высокий уровень детализации!

Daniel F 15.04.2023 23:32

Рядом с boost::asio::ip::tcp::socket::read_some есть boost::asio::read_until, который ожидает последовательность динамического буфера boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/… — учитывая протокол с разделителем, это выглядит очень гибким для меня, не так ли? быть идиоматическим улучшением кода, который вы разместили на стороне клиента?

Daniel F 15.04.2023 23:38

Это определенно будет хорошо работать для протокола с разделителями IFF, на который другая сторона отвечает правильно. Конечно, он все еще может быть заблокирован на неопределенный срок, если другая сторона не сможет отправить ожидаемый разделитель. Это то, о чем вы просите, так что, наверное, хорошо. В реальных приложениях я всегда использовал тайм-ауты

sehe 15.04.2023 23:47

Другие вопросы по теме