Пример увеличения сервера asio datetime и использование std::async не работает

Я пытаюсь изучить boost asio (boost 1.84, C++20, Ubuntu 23.04) и немного адаптировал следующий пример дневного сервера: https://www.boost.org/doc/libs/1_84_0/ doc/html/boost_asio/tutorial/tutdaytime3/src.html

Я оставляю make_daytime_string() спать на 5 секунд, чтобы имитировать тяжелую работу, которую затем передаю на аутсорсинг через std::async. Затем я публикую write_async в конце std::async, чтобы write_async выполнялся позже в том же потоке(ах), что и другие обработчики завершения.

У меня есть 2 вопроса по поводу процедуры:

1.) Нужен ли вообще пост или разрешено вызывать async_write непосредственно из std::async? Есть ли здесь плюсы/минусы?

2.) Есть проблема с моим вариантом, а именно то, что dtor tcp_connection больше не вызывается и клиент больше не получает boost::asio::error::eof. Данные передаются клиенту корректно (25 байт). Только когда я завершаю работу сервера, клиент закрывается. Я подозреваю, что делаю что-то не так с файломshared_ptr?

Вот адаптированный код:

    #include <ctime>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <boost/asio.hpp>

    using boost::asio::ip::tcp;

    std::future< void > futureSink;

    std::string make_daytime_string()
    {
        using namespace std;
        time_t now = time( 0 );
        std::this_thread::sleep_for( std::chrono::milliseconds( 5000 ) );
        return ctime( &now );
    }

    class tcp_connection : public std::enable_shared_from_this< tcp_connection >
    {
    public:
        typedef std::shared_ptr< tcp_connection > pointer;

        static pointer create(boost::asio::io_context& io_context)
        {
            return pointer( new tcp_connection( io_context ) );
        }

        tcp::socket& socket()
        {
            return socket_;
        }

        void start()
        {
            auto self = shared_from_this();

            futureSink = std::async( std::launch::async,
                [ self, this ]()
                {
                    message_ = make_daytime_string();

                    boost::asio::post( socket_.get_executor(),
                        [ self ]()
                        {
                            boost::asio::async_write( self->socket_,
                                                      boost::asio::buffer( self->message_ ),
                                                      std::bind( &tcp_connection::handle_write, self,
                                                                 boost::asio::placeholders::error,
                                                                 boost::asio::placeholders::bytes_transferred ) );
                        });
                });
        }
    private:
        tcp_connection( boost::asio::io_context& io_context ) : socket_( io_context )
        {}

        void handle_write(const boost::system::error_code& /*error*/, size_t /*bytes_transferred*/)
        {}

        tcp::socket socket_;
        std::string message_;
    };

    class tcp_server
    {
    public:
        tcp_server( boost::asio::io_context& io_context ) : io_context_( io_context ),
            acceptor_( io_context, tcp::endpoint( tcp::v4(), 13 ) )
        {
            start_accept();
        }

    private:
        void start_accept()
        {
            tcp_connection::pointer new_connection = tcp_connection::create( io_context_ );

            acceptor_.async_accept( new_connection->socket(),
                                    std::bind( &tcp_server::handle_accept, this, new_connection,
                                               boost::asio::placeholders::error ) );
        }

        void handle_accept( tcp_connection::pointer new_connection, const boost::system::error_code& error )
        {
            if ( !error )
                new_connection->start();

            start_accept();
        }

        boost::asio::io_context& io_context_;
        tcp::acceptor acceptor_;
    };

    int main()
    {
        try
        {
            boost::asio::io_context io_context;
            tcp_server server( io_context );
            io_context.run();
        }
        catch ( std::exception& e )
        {
            std::cerr << e.what() << std::endl;
        }

        return 0;
    }

Редактировать 07.04.2023 16:08

Я нашел ответ на вопрос 2. Здесь захваченный Shared_ptr не очищается, потому что я переместил std::future в глобальную переменную (выстрелил и забыл). Решение состоит в том, чтобы переместитьshared_ptr следующим образом:

void start()
{
    auto self = shared_from_this();

    futureSink = std::async( std::launch::async,
                            [ self, this ]() mutable
                            {
                                message_ = make_daytime_string();

                                boost::asio::post( socket_.get_executor(),
                                                  [ self = std::move( self ) ]()
                                                  {
                                                      boost::asio::async_write( self->socket_,
                                                                               boost::asio::buffer( self->message_ ),
                                                                               std::bind( &tcp_connection::handle_write, self,
                                                                                         boost::asio::placeholders::error,
                                                                                         boost::asio::placeholders::bytes_transferred ) );
                                                  });
                            });
} 
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
77
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
  1. Да, пост нужен. Где бы std::async ни запускала лямбду, она по определению не находится в служебном потоке (который является неявной нитью).

  2. Действительно, это так. Если вы употребите futureSink, вы снова увидите запуск деструктора: http://coliru.stacked-crooked.com/a/47d92eea20083e0f


Где исправить?

Прежде всего, std::async не является асинхронным. Это строго менее надежно std::thread (в лучшем случае).

Ваше будущее — глобальное, и оно не может хорошо работать с одновременными соединениями.

Поскольку на самом деле вы просто хотите объединить работу в необслуживаемом потоке (чтобы не блокировать ввод-вывод), я бы просто /сделал это/.

Прямой эфир на Колиру

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using asio::ip::tcp;

static asio::thread_pool pool(4); // how about some control over our threads

static std::string simulate_work() {
    std::this_thread::sleep_for(5s);
    return "Work simulation done";
}

class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
  public:
    using pointer = std::shared_ptr<tcp_connection>;

    static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }

    tcp::socket& socket() { return socket_; }

    void start() {
        std::cerr << __PRETTY_FUNCTION__ << std::endl;
        auto self = shared_from_this();

        post(pool, [self, this]() {
            auto m = simulate_work();
            post(socket_.get_executor(),
                 [self, m = std::move(m)]() mutable { self->on_work_completed(std::move(m)); });
        });
    }

    ~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }

  private:
    tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}

    void on_work_completed(std::string message) { // already on executor
        message_ = std::move(message);
        async_write(socket_, asio::buffer(message_),
                    bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
    }

    void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
        std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
    }

    tcp::socket socket_;
    std::string message_;
};

class tcp_server {
  public:
    tcp_server(asio::io_context& io_context)
        : io_context_(io_context)
        , acceptor_(io_context, {tcp::v4(), 1313}) {
        start_accept();
    }

  private:
    void start_accept() {
        tcp_connection::pointer new_connection = tcp_connection::create(io_context_);

        acceptor_.async_accept(new_connection->socket(),
                               bind(&tcp_server::handle_accept, this, new_connection, _1));
    }

    void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
        if (!error)
            new_connection->start();

        start_accept();
    }

    asio::io_context& io_context_;
    tcp::acceptor     acceptor_;
};

int main() {
    try {
        boost::asio::io_context io_context;

        tcp_server server(io_context);
        io_context.run();
        pool.join();
    } catch (std::exception const& e) {
        std::cerr << e.what() << std::endl;
    }
}

БОНУС/ФАНСИЯ

Если вы хотите немного пофантазировать, вы можете добавить функцию инициации в стиле Asio async_simulate_work:

Прямой эфир на Колиру

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using asio::ip::tcp;

static std::string simulate_work() {
    std::this_thread::sleep_for(5s);
    return "Work simulation done";
}

template <typename Token> auto async_simulate_work(Token&& token) {
    return asio::async_initiate<Token, void(std::string)>( //
        [](auto handler) {
            std::thread(                             //
                [h = std::move(handler)]() mutable { //
                    std::move(h)(simulate_work());
                })
                .detach();
        },
        token);
}

class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
  public:
    using pointer = std::shared_ptr<tcp_connection>;

    static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }

    tcp::socket& socket() { return socket_; }

    void start() {
        std::cerr << __PRETTY_FUNCTION__ << std::endl;
        auto self = shared_from_this();

        async_simulate_work(bind_executor(socket_.get_executor(), [self, this](std::string message) {
            message_ = std::move(message);
            async_write(socket_, asio::buffer(message_),
                        bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
        }));
    }

    ~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }

  private:
    tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}

    void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
        assert(socket_.get_executor().target<asio::io_context::executor_type>()->running_in_this_thread());
        std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
    }

    tcp::socket socket_;
    std::string message_;
};

class tcp_server {
  public:
    tcp_server(asio::io_context& io_context)
        : io_context_(io_context)
        , acceptor_(io_context, {tcp::v4(), 1313}) {
        start_accept();
    }

  private:
    void start_accept() {
        tcp_connection::pointer new_connection = tcp_connection::create(io_context_);

        acceptor_.async_accept(new_connection->socket(),
                               bind(&tcp_server::handle_accept, this, new_connection, _1));
    }

    void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
        if (!error)
            new_connection->start();

        start_accept();
    }

    asio::io_context& io_context_;
    tcp::acceptor     acceptor_;
};

int main() {
    try {
        boost::asio::io_context io_context;

        tcp_server server(io_context);
        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << e.what() << std::endl;
    }
}

Еще одна локальная демонстрация:

Существует ряд других запахов кода (могу ли я обнаружить подсказки из конкретной книги Packt?). Я бы упростил и модернизировал следующим образом: coliru.stacked-crooked.com/a/e43e7e17e5965399. Примерно половина исходного размера кода

sehe 07.04.2024 18:46

Хорошо, async_write МОЖЕТ выполняться ТОЛЬКО в потоках обслуживания или его можно выполнять откуда угодно?

SoulfreezerXP 08.04.2024 08:35

Если я правильно понимаю пример 1, то "работа" теперь выполняется в собственном пуле потоков и не влияет на io_context.run()? Должен ли/может ли io_context.run() работать в собственном пуле потоков?

SoulfreezerXP 08.04.2024 08:46

У меня также есть тематически актуальный вопрос, касающийся конвейерной обработки HTTP. Могу ли я здесь также выполнять параллельно, сохраняя порядок ответа (очередь ответов с фьючерсами)? stackoverflow.com/questions/78314437/…

SoulfreezerXP 15.04.2024 07:20

Другие вопросы по теме