Попытка подключения с таймаутом как составная операция с использованием ASIO

Я написал класс, который пытается установить соединение с TCP-сервером с настраиваемым таймаутом и количеством попыток. Это вызываемый объект, который возвращает std::future для результата. Проблемы с моей первоначальной реализацией:

  • объект должен быть постоянным до тех пор, пока либо не будет установлено соединение, либо пока не закончатся попытки, либо пока не возникнет ошибка Stop case. Поэтому я должен хранить его в своем классе, чего я надеюсь избежать.
  • Скомпонованные операции asio предоставляют средства для настройки потока управления при возврате: CompletionToken может быть простым обратным вызовом, может использоваться future или сопрограмма. В моем случае я привязал пользователя к будущему.

Это моя первоначальная реализация попытки подключения с настраиваемым таймаутом и количеством попыток:

    template<typename Connection>
    class connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        template<typename Endpoint>
        using require_endpoint = typename std::enable_if<std::is_same<Endpoint, endpoint_type>::value>::type;

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        explicit connection_attempt(Connection &connection)
                : connection_(connection)
        {}

        template<typename Callable>
        explicit connection_attempt(Connection &connection,
                                    Callable &&stopOnError)
                : connection_(connection),
                  stopOnError_(std::forward<Callable>(stopOnError))
        {}

        template<typename Endpoint,
                typename Duration,
                typename = require_endpoint<Endpoint>>
        std::future<bool> operator()(Endpoint &&endpoint,
                                     size_t attempts,
                                     Duration &&timeout = default_timeout())
        {
            connectionResult_ = {};
            asyncConnect(std::forward<Endpoint>(endpoint),
                         attempts,
                         std::forward<Duration>(timeout));
            return connectionResult_.get_future();
        }

        // default attempts = infinite_attempts
        template<typename Endpoint,
                typename Duration,
                typename = require_endpoint<Endpoint>>
        std::future<bool> operator()(Endpoint endpoint,
                                     Duration &&timeout = default_timeout())
        {
            connectionResult_ = {};
            asyncConnect(std::forward<Endpoint>(endpoint),
                         infinite_attempts(),
                         std::forward<Duration>(timeout));
            return connectionResult_.get_future();
        }

    private:
        connection_type &connection_;
        asio::steady_timer timer_
                {connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};

        std::function<bool(const asio::error_code &)> stopOnError_;
        std::promise<bool> connectionResult_;

        // cancels the connection on timeout!
        template<typename Duration>
        void startTimer(const Duration &timeout)
        {
            timer_.expires_after(timeout); // it will automatically cancel a pending timer

            timer_.async_wait(
                    [this, timeout](const asio::error_code &errorCode)
                    {
                        // will occur on connection error before timeout
                        if (errorCode == asio::error::operation_aborted)
                            return;

                        // TODO: handle timer errors? What are the possible errors?
                        assert(!errorCode && "unexpected timer error!");

                        // stop current connection attempt
                        connection_.cancel();
                    });
        }

        void stopTimer()
        {
            timer_.cancel();
        }

        /**
         * Will be trying to connect until:<br>
         * - has run out of attempts
         * - has been required to stop by stopOnError callback (if it was set)
         * @param endpoint
         * @param attempts
         */
        template<typename Duration>
        void asyncConnect(endpoint_type endpoint,
                          size_t attempts,
                          Duration &&timeout)
        {
            startTimer(timeout);

            connection_.async_connect(endpoint, [this,
                    endpoint,
                    attempts,
                    timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
            {
                if (!errorCode)
                {
                    stopTimer();
                    connectionResult_.set_value(true);
                    return;
                }
                
                const auto attemptsLeft = attempts == infinite_attempts() ?
                                          infinite_attempts() :
                                          attempts - 1;

                if ((stopOnError_ &&
                     stopOnError_(errorCode == asio::error::operation_aborted ?
                                  // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                                  // this should only be resulted from the timer calling cancel()
                                  asio::error::timed_out :
                                  errorCode)) ||
                    !attemptsLeft)
                {
                    stopTimer();
                    connectionResult_.set_value(false);
                    return;
                }

                asyncConnect(endpoint,
                             attemptsLeft,
                             timeout);
            });
        }
    };

    // this should be an asynchornous function with a custom CompletionToken
    template<typename Connection,
            typename Callable>
    auto make_connection_attempt(Connection &connection,
                                 Callable &&stopOnError) -> connection_attempt<Connection>
    {
        return connection_attempt<Connection>(connection,
                                              std::forward<Callable>(stopOnError));
    }

Однако я хочу быть последовательным при использовании ASIO, и поток управления Универсальная модель для асинхронных операций: при возврате должен быть настраиваемым. Я использовал пример для отправки нескольких сообщений с интервалами с использованием составной операции с промежуточным обработчиком с сохранением состояния. Обработчик рекурсивно передает себя как обработчик для каждой следующей асинхронной операции: async_wait и async_write. Эти вызовы всегда выполняются по очереди: один всегда вызывается, когда другой возвращается. Однако в моем случае async_wait и async_connect вызываются одновременно:

// initiation method, called first
void operator()(args...)
{
    // not valid!
    timer.async_wait(std::move(*this)); // from now on this is invalid
    connection.async_connect(endpoint, std::move(*this)); can't move this twice
}

Это код для класса, который я пытаюсь реализовать как инициирующий и промежуточный обработчик:

template<typename Connection, typename CompletionToken>
    class composed_connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        enum class state
        {
            pending,
            connected,
            timeout
        };

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        // TODO: executor type
        using executor_type = asio::associated_executor_t<CompletionToken,
                typename connection_type::executor_type>;

        executor_type get_executor() const noexcept
        {
            // TODO: get completion handler executor
            return connection_.get_executor();
        }

        // TODO: allocator type
        using allocator_type = typename asio::associated_allocator_t<CompletionToken,
                std::allocator<void>>;

        allocator_type get_allocator() const noexcept
        {
            // TODO: get completion handler allocator
            return allocator_type();
        }

        // TODO: constructor to initialize state, pass timeout value?
        explicit composed_connection_attempt(connection_type &connection)
                : connection_(connection)
        {}

        template<typename Callable>
        composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
                : connection_(connection),
                  stopOnError_(std::forward<Callable>(stopOnError))
        {}

        // operator for initiation
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        size_t attempts,
                        Duration timeout = default_timeout())
        {
            // Start timer: how to pass this
            // Attempt connection
        }

        // intermediate completion handler
        // this may be invoked without an error both by the timer and a connection
        void operator()(const asio::error_code &errorCode)
        {
            if (!errorCode)
            {

            }
        }


    private:
        Connection &connection_;
        asio::steady_timer timer_{this->get_executor()};
        std::atomic<state> state_{state::pending};
        std::function<bool(const asio::error_code &)> stopOnError_;
        std::function<void(const asio::error_code &)> completionHandler_;
    };

Итак, проблемы, которые я пытаюсь решить:

  1. Как разделить владение промежуточным обработчиком с отслеживанием состояния как с таймером, так и с соединением (сокетом)? Может быть, мне нужно использовать вложенные классы (основной класс для инициации и вложенный для событий таймера и сокета)?
  2. Как определить, какой из асинхронных вызовов привел к вызову void operator()(const asio::error_code&)? Никакая ошибка не может быть результатом успешного подключения или тайм-аута. Обе асинхронные операции также могут возвращать asio::error::operation_aborted при отмене: попытка подключения отменяется по истечении времени ожидания, таймер отменяется в случае успеха или при ошибке подключения.

По второму вопросу: вы можете привязать параметр состояния к обработчику завершения.

sehe 05.04.2021 18:17

@sehe Я реализовал класс так, как мне было нужно. Я отправлю это как ответ позже.

Sergey Kolesnik 05.04.2021 18:29
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
2
69
3

Ответы 3

Итак, для второго вопроса я предложил отличительный аргумент (иногда я использую пустую «структуру состояния», например State::Init{} или State::Timeout{}, чтобы помочь в разрешении перегрузки, а также в самодокументировании).

Что касается первого вопроса, я уверен, что с тех пор вы могли столкнуться с std::enable_shared_from_this.

Вот мой взгляд на «Универсальную модель». Я использовал spawn для удобства экспозиции.

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
                              F&& stopOn, Token&& token,
                              int attempts = -1,
                              Timer::duration delay = 3s)
{
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    asio::spawn(
        object.get_executor(),
        [=, &object](asio::yield_context yc) mutable {
            using mylib::result_code;
            auto ex = get_associated_executor(yc);
            error_code ec;

            while (attempts--) {
                Timer t(ex, delay);
                t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });

                object.async_connect(ep, yc[ec]);

                if (!ec)
                    return completion(result_code::ok, true);

                if (ec == asio::error::operation_aborted) {
                    ec = result_code::timeout;
                }

                if (stopOn && stopOn(ec))
                    return completion(ec, false);

                object.close();
            }

            return completion(result_code::attempts_exceeded, false);
        });

    return result.get();
}

Ключевые моменты, на которые следует обратить внимание:

  • протокол async_result<> предоставит вам обработчик завершения, который «творит чудеса», требуемый вызывающей стороной (use_future, yield_context и т. д.)
  • у вас должна быть возможность обойтись без совместного использования ссылки, поскольку таймер может «просто» иметь необработанный указатель: время жизни таймера полностью принадлежит содержащейся составной операции.

Полная демонстрация: обратные вызовы, сопрограммы и фьючерсы

Я добавил перечисление mylib::result_code, чтобы иметь возможность возвращать полную информацию об ошибке:

Жить на Wandbox

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>

#ifdef STANDALONE_ASIO
    using std::error_category;
    using std::error_code;
    using std::error_condition;
    using std::system_error;
#else
    namespace asio = boost::asio;
    using boost::system::error_category;
    using boost::system::error_code;
    using boost::system::error_condition;
    using boost::system::system_error;
#endif

using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;

namespace mylib { // threw in the kitchen sink for error codes
    enum class result_code {
        ok                = 0,
        timeout           = 1,
        attempts_exceeded = 2,
    };

    auto const& get_result_category() {
        struct impl : error_category {
            const char* name() const noexcept override { return "result_code"; }
            std::string message(int ev) const override {
                switch (static_cast<result_code>(ev)) {
                case result_code::ok: return "success";
                case result_code::attempts_exceeded:
                    return "the maximum number of attempts was exceeded";
                case result_code::timeout:
                    return "the operation did not complete in time";
                default: return "unknown error";
                }
            }
            error_condition
            default_error_condition(int ev) const noexcept override {
                return error_condition{ev, *this};
            }
            bool equivalent(int ev, error_condition const& condition)
                const noexcept override {
                return condition.value() == ev && &condition.category() == this;
            }
            bool equivalent(error_code const& error,
                            int ev) const noexcept override {
                return error.value() == ev && &error.category() == this;
            }
        } const static instance;
        return instance;
    }

    error_code make_error_code(result_code se) {
        return error_code{
            static_cast<std::underlying_type<result_code>::type>(se),
            get_result_category()};
    }
} // namespace mylib

template <>
struct boost::system::is_error_code_enum<mylib::result_code>
    : std::true_type {};

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
                              F&& stopOn, Token&& token,
                              int attempts = -1,
                              Timer::duration delay = 3s)
{
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    asio::spawn(
        object.get_executor(),
        [=, &object](asio::yield_context yc) mutable {
            using mylib::result_code;
            auto ex = get_associated_executor(yc);
            error_code ec;

            while (attempts--) {
                Timer t(ex, delay);
                t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });

                object.async_connect(ep, yc[ec]);

                if (!ec)
                    return completion(result_code::ok, true);

                if (ec == asio::error::operation_aborted) {
                    ec = result_code::timeout;
                }

                if (stopOn && stopOn(ec))
                    return completion(ec, false);

                object.close();
            }

            return completion(result_code::attempts_exceeded, false);
        });

    return result.get();
}

static auto non_recoverable = [](error_code ec) {
    std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
    // TODO Be specific about intermittent/recoverable conditions
    return false;
};

#include <set>
int main(int argc, char** argv) {
    assert(argc>1);
    static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
                                  8989};

    std::set<std::string_view> const options{argv+1, argv+argc};

    std::cout << std::boolalpha;

    if (options.contains("future")) {
        std::cout
            << "-----------------------\n"
            << " FUTURE DEMO\n"
            << "-----------------------" << std::endl;
        asio::thread_pool ctx;

        try {
            tcp::socket s(ctx);

            std::future<bool> ok = async_connection_attempt(
                    s, ep, non_recoverable, asio::use_future, 5, 800ms);

            std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
        } catch (system_error const& se) {
            std::cout << "Future: " << se.code().message() << "\n";
        }

        ctx.join();
    }

    if (options.contains("coroutine")) {
        std::cout
            << "-----------------------\n"
            << " COROUTINE DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;

        asio::spawn(ctx,
            [work = make_work_guard(ctx)](asio::yield_context yc) {
                auto ex = get_associated_executor(yc);
                tcp::socket s(ex);

                error_code ec;
                if (async_connection_attempt(s, ep, non_recoverable,
                                             yc[ec], 5, 800ms)) {
                    std::cout << "Connected in coro\n";
                } else {
                    std::cout << "NOT Connected in coro: " << ec.message() << "\n";
                }
            });

        ctx.run();
    }

    if (options.contains("callback")) {
        std::cout
            << "-----------------------\n"
            << " CALLBACK DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        tcp::socket s(ctx);

        async_connection_attempt(
            s, ep, non_recoverable,
            [](error_code ec, bool ok) {
                std::cout << "Callback: " << ok << ", "
                          << ec.message() << "\n";
            },
            5, 800ms);

        ctx.run();
    }
}

Пример вывода находится в онлайн-компиляторе или сравните несколько тестов на моей машине:

Ваш ответ пролил свет на некоторые вопросы, связанные с asio, которые я имел в виду, но не поднимал. Я также не использовал сопрограммы в своем решении, так что это действительно очень поучительно.

Sergey Kolesnik 06.04.2021 12:03

Ух ты. Я только что создал то же самое без использования spawn (используя тип операции, который использует аргументы структуры State, как я уже упоминал). Я должен сказать о сложности, если этот вид библиотечных материалов меня продолжает удивлять. Тем не менее, мне удалось избежать накладных расходов, связанных с shared_from_this, и, конечно же, все демонстрации по-прежнему проходят, так что я вполне доволен. Если хотите, я могу опубликовать альтернативный ответ.

sehe 06.04.2021 18:54

Вы, конечно, можете. Это компенсирует отсутствие документации и примеров, которые очень трудно понять. Буду также признателен, если вы прокомментируете мой ответ. Я реализовал класс, который сам вызывается асинхронно без необходимости в async_initiate.

Sergey Kolesnik 06.04.2021 19:13

Честно говоря, мне так и не удалось разобраться в async_initiate - кажется, на 100% отсутствует документация по этому поводу, хотя я интуитивно знаю, что он должен быть очень близок к моей текущей реализации. Я сначала отправлю это, а затем прочту ваш ответ.

sehe 06.04.2021 20:36

из моих экспериментов я обнаружил, что async_initiate - это просто «удобная» оболочка для облегчения вызова asio::async_result<CompletionToken, Signature>::initiate(). Объект Initiation - это вызываемый объект, который принимает выведенный CompletionHandler и вариативный пакет аргументов, необходимых для его вызова. Отсюда лямбда в примерах [](auto &&hanlder, args...).

Sergey Kolesnik 06.04.2021 23:46

Да, но asio::async_result<CompletionToken, Signature>::initiate() так же кажется недокументированным. Я, вероятно, когда-нибудь обновлюсь по нему, но сейчас кажется, что маршрут async_result<> просто более переносим и в любом случае эквивалентен :)

sehe 06.04.2021 23:49

Несомненно, это так, и это легче понять. Я просто изучаю способ связать составные операции с помощью пользовательского связанного токена завершения. Думаю выложу отдельный вопрос по теме

Sergey Kolesnik 06.04.2021 23:51

Позвольте нам продолжить обсуждение в чате.

sehe 06.04.2021 23:56

Вот как я это реализовал. Код с тестами можно найти здесь на github

template<typename Connection, typename CompletionHandler>
    class composed_connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        // TODO: clarify the type!
        using completion_handler_t = CompletionHandler;

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        using executor_type = asio::associated_executor_t<
                typename std::decay<CompletionHandler>::type,
                typename connection_type::executor_type>;

        executor_type get_executor() const noexcept
        {
            // TODO: get completion handler executor
            return pImpl_->get_executor();
        }

        // TODO: allocator type
        using allocator_type = typename asio::associated_allocator_t<CompletionHandler,
                std::allocator<void>>;

        allocator_type get_allocator() const noexcept
        {
            // TODO: get completion handler allocator
            return pImpl_->get_allocator();
        }

        // TODO: constructor to initialize state, pass timeout value?
        template<typename CompletionHandlerT>
        explicit composed_connection_attempt(connection_type &connection,
                                             CompletionHandlerT &&completionHandler)
                : pImpl_(std::make_shared<impl>(connection,
                                                std::forward<CompletionHandlerT>(completionHandler)))
        {}


        template<typename CompletionHandlerT,
                typename Callable>
        explicit composed_connection_attempt(connection_type &connection,
                                             CompletionHandlerT &&completionHandler,
                                             Callable &&stopOnError)
                : pImpl_(std::make_shared<impl>(connection,
                                                std::forward<CompletionHandlerT>(completionHandler),
                                                std::forward<Callable>(stopOnError)))
        {}

        /**
         * Initiation operator. Initiates composed connection procedure.
         * @tparam Endpoint type of endpoint
         * @tparam Duration type of timeout
         * @param endpoint endpoint to be used for connection
         * @param attempts number of attempts
         * @param timeout value to be used as a timeout between attempts
         */
        // TODO: require endpoint type
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        size_t attempts,
                        Duration &&timeout = default_timeout())
        {
            pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
            pImpl_->attempts_ = attempts;
            pImpl_->timeout_ = std::forward<Duration>(timeout);

            asyncConnect();
        }

        /**
         * Initiation operator. Initiates composed connection procedure. Connection attempts default to infinite.
         * @tparam Endpoint type of endpoint
         * @tparam Duration type of timeout
         * @param endpoint endpoint to be used for connection
         * @param timeout value to be used as a timeout between attempts
         */
        // TODO: require endpoint type
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        Duration &&timeout = default_timeout())
        {
            pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
            pImpl_->timeout_ = std::forward<Duration>(timeout);

            asyncConnect();
        }

        /**
         * Intermediate completion handler. Will be trying to connect until:<br>
         * - has connected<br>
         * - has run out of attempts<br>
         * - user-provided callback #impl::stopOnError_ interrupts execution when a specific connection error has occurred<br>
         * <br>Will be invoked only on connection events:<br>
         * - success<br>
         * - connection timeout or operation_cancelled in case if timer has expired<br>
         * - connection errors<br>
         * @param errorCode error code resulted from async_connect
         */
        void operator()(const asio::error_code &errorCode)
        {
            if (!errorCode)
            {
                stopTimer();
                pImpl_->completionHandler_(errorCode);
                return;
            }

            const auto attemptsLeft = pImpl_->attempts_ == infinite_attempts() ?
                                      infinite_attempts() :
                                      pImpl_->attempts_ - 1;

            if ((pImpl_->stopOnError_ &&
                 pImpl_->stopOnError_(errorCode == asio::error::operation_aborted ?
                                      // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                                      // this should only be resulted from the timer calling cancel()
                                      asio::error::timed_out :
                                      errorCode)) ||
                !attemptsLeft)
            {
                stopTimer();
                pImpl_->completionHandler_(errorCode == asio::error::operation_aborted ?
                                           asio::error::timed_out :
                                           errorCode);
                return;
            }

            pImpl_->attempts_ = attemptsLeft;
            asyncConnect();
        }

    private:

        struct impl
        {
            template<typename CompletionHandlerT>
            impl(connection_type &connection,
                 CompletionHandlerT &&completionHandler)
                    : connection_(connection),
                      completionHandler_(std::forward<CompletionHandlerT>(completionHandler))
            {}

            template<typename CompletionHandlerT, typename Callable>
            impl(connection_type &connection,
                 CompletionHandlerT &&completionHandler,
                 Callable &&stopOnError)
                    : connection_(connection),
                      completionHandler_(std::forward<CompletionHandlerT>(completionHandler)),
                      stopOnError_(std::forward<Callable>(stopOnError))
            {}

            executor_type get_executor() const noexcept
            {
                return asio::get_associated_executor(completionHandler_,
                                                     connection_.get_executor());
            }

            allocator_type get_allocator() const noexcept
            {
                // TODO: get completion handler allocator
                return allocator_type();
            }

            connection_type &connection_;
            completion_handler_t completionHandler_;
            std::function<bool(const asio::error_code &)> stopOnError_;

            // this should be default constructable or should I pass it in the constructor?
            endpoint_type endpoint_;

            // TODO: make timer initialization from get_executor()
            asio::steady_timer timer_{connection_.get_executor()}; // this does not compile! -> {get_executor()};
            asio::steady_timer::duration timeout_ = default_timeout();
            size_t attempts_ = infinite_attempts();
        };

        // TODO: make unique?
        std::shared_ptr<impl> pImpl_;

        // cancels the connection on timeout!
        void startTimer()
        {
            pImpl_->timer_.expires_after(pImpl_->timeout_); // it will automatically cancel a pending timer
            pImpl_->timer_.async_wait(
                    [pImpl = pImpl_](const asio::error_code &errorCode)
                    {
                        // will occur on connection error before timeout
                        if (errorCode == asio::error::operation_aborted)
                            return;

                        // TODO: handle timer errors? What are the possible errors?
                        assert(!errorCode && "unexpected timer error!");

                        // stop attempts
                        pImpl->connection_.cancel();
                    });
        }

        void stopTimer()
        {
            pImpl_->timer_.cancel();
        }

        /**
         * Will be trying to connect until:<br>
         * - has run out of attempts
         * - has been required to stop by stopOnError callback (if it was set)
         * @param endpoint
         * @param attempts
         */
        void asyncConnect()
        {
            startTimer();
            pImpl_->connection_.async_connect(pImpl_->endpoint_, std::move(*this));
        }
    };

    template<typename Connection,
            typename CompletionHandler,
            typename Callable>
    auto make_composed_connection_attempt(Connection &connection,
                                          CompletionHandler &&completionHandler,
                                          Callable &&stopOnError) ->
    composed_connection_attempt<Connection, CompletionHandler>
    {
        return composed_connection_attempt<Connection, CompletionHandler>(connection,
                                                                          std::forward<CompletionHandler>(
                                                                                  completionHandler),
                                                                          std::forward<Callable>(stopOnError));
    }

    template<typename Connection,
            typename Endpoint,
            typename Duration,
            typename CompletionToken,
            typename Callable>
    auto async_connection_attempt(Connection &connection,
                                  Endpoint &&endpoint,
                                  size_t attempts,
                                  Duration &&timeout,
                                  CompletionToken &&completionToken,
                                  Callable &&stopOnError)
    {

        using result_t = asio::async_result<std::decay_t<CompletionToken>,
                void(asio::error_code)>;
        using completion_t = typename result_t::completion_handler_type;

        completion_t completion{std::forward<CompletionToken>(completionToken)};
        result_t result{completion};

        auto composedConnectionAttempt = make_composed_connection_attempt(connection,
                                                                          std::forward<completion_t>(completion),
                                                                          std::forward<Callable>(stopOnError));
        composedConnectionAttempt(std::forward<Endpoint>(endpoint),
                                  attempts,
                                  std::forward<Duration>(timeout));

        return result.get();
    }

Наконец-то дошли до этого:

Wow. I've just created the same without using spawn (using an operation type that uses the State struct arguments as I mentioned). I must say the complexity if this kind of library-implementor-stuff keeps surprising me. I managed to avoid the overhead of shared_from_this though, and of course all demos still pass, so I'm pretty content. If you want I can post as an alternative answer. – sehe yesterday

Функция инициации

Функция инициации примерно такая же, за исключением того, что она больше не использует spawn (это означает, что пользователю не нужно соглашаться на Boost Coroutine и Boost Context).

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    using Op = mylib::connection_attempt_op<std::decay_t<F>, Completion>;
    // make an owning self, to be passed along a single async call chain
    auto self = std::make_unique<Op>(object, ep, std::forward<F>(stopOn), completion, attempts, delay);
    (*self)(self);

    return result.get();
}

Теперь вы сразу заметите, что я использовал контейнер с уникальным владельцем (unique_ptr). Я попытался избежать динамического распределения, создав класс операции семантики значений, который инкапсулировал обработчик в режиме «только перемещение».

Однако операция также владеет объектом таймера, который должен быть стабильным по ссылкам для обратных вызовов. Так что переезд - это не вариант. Конечно, у нас все еще может быть тип операции с подвижным значением, такой как содержал, всего лишь один unique_ptr для _timer, но это те же накладные расходы и менее общий характер.

  • если мы добавим еще один объект ввода-вывода в состояние операции, нам потребуется более динамическое распределение

  • перемещение unique_ptr строго дешевле, чем объект состояния, кратный размеру

  • перемещение объекта, на который указывает this, внутри функций-членов подвержено ошибкам очень. Например, это вызовет неопределенное поведение:

    bind_executor(_ex, std::bind(std::move(*this), ...))
    

    Это потому, что _ex на самом деле является this->_ex, но оценка не упорядочена, поэтому this->_ex может быть оценен после перемещения.

    Мы не должны этого хотеть.

  • если мы реализуем другие асинхронные операции, мы можем использовать тот же шаблон.

Класс операции

Вы узнаете это по исходному коду. Я решил использовать свое собственное предложение для выбора перегрузок operator() путем отправки по типу «состояния» маркера:

struct Init {};
struct Attempt {};
struct Response {};

Чтобы помочь в привязке к себе, мы также передаем owning unique_ptr в качестве аргумента self:

using Self = std::unique_ptr<connection_attempt_op>;
struct Binder {
    Self _self;
    template <typename... Args>
    decltype(auto) operator()(Args&&... args) {
        return (*_self)(_self, std::forward<Args>(args)...);
    }
};

Из-за ограничений в std::bind мы не можем передавать фактические параметры rvalue, но

  • если мы будем осторожны, чтобы цепочка вызовов была строго последовательной, мы всегда можем перейти от self ровно один раз

  • из-за косвенного указания unique_ptr по-прежнему безопасно использовать this из тела метода после перемещения self

  • Теперь мы можем зафиксировать стабильное значение this в нашем обработчике завершения для _timer.async_wait! Пока мы гарантируем, что обработчик завершения не переживет время существования self, нам не нужно делить владение здесь.

    shared_ptr dependence averted!

Имея это в виду, я думаю, что в полной реализации есть несколько сюрпризов:

namespace mylib { // implementation details

    template <typename F, typename Completion> struct connection_attempt_op {
        tcp::socket&    _object;
        tcp::endpoint   _ep;
        F               _stopOn;
        Completion      _handler;
        int             _attempts;
        Timer::duration _delay;

        using executor_type =
            asio::strand<std::decay_t<decltype(_object.get_executor())>>;
        executor_type          _ex;
        std::unique_ptr<Timer> _timer;

        executor_type const& get_executor() { return _ex; }

        explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
                                       F stopOn, Completion handler,
                                       int attempts, Timer::duration delay)
            : _object(object),
              _ep(ep),
              _stopOn(std::move(stopOn)),
              _handler(std::move(handler)),
              _attempts(attempts),
              _delay(delay),
              _ex(object.get_executor()) {}

        struct Init {};
        struct Attempt {};
        struct Response {};

        using Self = std::unique_ptr<connection_attempt_op>;
        struct Binder {
            Self _self;
            template <typename... Args>
            decltype(auto) operator()(Args&&... args) {
                return (*_self)(_self, std::forward<Args>(args)...);
            }
        };

        void operator()(Self& self, Init = {}) {
            // This is the only invocation perhaps not yet on the strand, so
            // dispatch
            asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
        }

        void operator()(Self& self, Attempt) {
            if (_attempts--) {
                _timer = std::make_unique<Timer>(_ex, _delay);
                _timer->async_wait([this](error_code ec) {
                    if (!ec) _object.cancel();
                });
                _object.async_connect(
                    _ep,
                    asio::bind_executor(
                        _ex, // _object may not already have been on strand
                        std::bind(Binder{std::move(self)}, Response{},
                                  std::placeholders::_1)));
            } else {
                _handler(mylib::result_code::attempts_exceeded, false);
            }
        }

        void operator()(Self& self, Response, error_code ec) {
            if (!ec) {
                _timer.reset();
                return _handler(result_code::ok, true);
            }

            if (ec == asio::error::operation_aborted) {
                ec = result_code::timeout;
            }

            if (_stopOn && _stopOn(ec))
                return _handler(ec, false);

            _timer.reset();
            _object.close();

            operator()(self, Attempt{});
        }
    };
}

Do note the executor binding; the comment in the Init{} overload as well as with the bind_executor are relevant here.

The strand is essential to maintaining the lifetime guarantees that we needed w.r.t. the async_wait operation. In particular we need the handler ordering follow this

ДЕМО ВРЕМЯ

Остальной код на 100% идентичен другому ответу, поэтому давайте представим его без дальнейших комментариев:

Жить на Wandbox

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
//#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>

#ifdef STANDALONE_ASIO
    using std::error_category;
    using std::error_code;
    using std::error_condition;
    using std::system_error;
#else
    namespace asio = boost::asio;
    using boost::system::error_category;
    using boost::system::error_code;
    using boost::system::error_condition;
    using boost::system::system_error;
#endif

using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;

namespace mylib { // threw in the kitchen sink for error codes
    enum class result_code {
        ok                = 0,
        timeout           = 1,
        attempts_exceeded = 2,
        not_implemented   = 3,
    };

    auto const& get_result_category() {
        struct impl : error_category {
            const char* name() const noexcept override { return "result_code"; }
            std::string message(int ev) const override {
                switch (static_cast<result_code>(ev)) {
                case result_code::ok: return "success";
                case result_code::attempts_exceeded:
                    return "the maximum number of attempts was exceeded";
                case result_code::timeout:
                    return "the operation did not complete in time";
                case result_code::not_implemented:
                    return "feature not implemented";
                default: return "unknown error";
                }
            }
            error_condition
            default_error_condition(int ev) const noexcept override {
                return error_condition{ev, *this};
            }
            bool equivalent(int ev, error_condition const& condition)
                const noexcept override {
                return condition.value() == ev && &condition.category() == this;
            }
            bool equivalent(error_code const& error,
                            int ev) const noexcept override {
                return error.value() == ev && &error.category() == this;
            }
        } const static instance;
        return instance;
    }

    error_code make_error_code(result_code se) {
        return error_code{
            static_cast<std::underlying_type<result_code>::type>(se),
            get_result_category()};
    }

} // namespace mylib

template <>
struct boost::system::is_error_code_enum<mylib::result_code>
    : std::true_type {};

namespace mylib { // implementation details

    template <typename F, typename Completion> struct connection_attempt_op {
        tcp::socket&    _object;
        tcp::endpoint   _ep;
        F               _stopOn;
        Completion      _handler;
        int             _attempts;
        Timer::duration _delay;

        using executor_type =
            asio::strand<std::decay_t<decltype(_object.get_executor())>>;
        executor_type          _ex;
        std::unique_ptr<Timer> _timer;

        executor_type const& get_executor() { return _ex; }

        explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
                                       F stopOn, Completion handler,
                                       int attempts, Timer::duration delay)
            : _object(object),
              _ep(ep),
              _stopOn(std::move(stopOn)),
              _handler(std::move(handler)),
              _attempts(attempts),
              _delay(delay),
              _ex(object.get_executor()) {}

        struct Init {};
        struct Attempt {};
        struct Response {};

        using Self = std::unique_ptr<connection_attempt_op>;
        struct Binder {
            Self _self;
            template <typename... Args>
            decltype(auto) operator()(Args&&... args) {
                return (*_self)(_self, std::forward<Args>(args)...);
            }
        };

        void operator()(Self& self, Init = {}) {
            // This is the only invocation perhaps not yet on the strand, so
            // dispatch
            asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
        }

        void operator()(Self& self, Attempt) {
            if (_attempts--) {
                _timer = std::make_unique<Timer>(_ex, _delay);
                _timer->async_wait([this](error_code ec) {
                    if (!ec) _object.cancel();
                });
                _object.async_connect(
                    _ep,
                    asio::bind_executor(
                        _ex, // _object may not already have been on strand
                        std::bind(Binder{std::move(self)}, Response{},
                                  std::placeholders::_1)));
            } else {
                _handler(mylib::result_code::attempts_exceeded, false);
            }
        }

        void operator()(Self& self, Response, error_code ec) {
            if (!ec) {
                _timer.reset();
                return _handler(result_code::ok, true);
            }

            if (ec == asio::error::operation_aborted) {
                ec = result_code::timeout;
            }

            if (_stopOn && _stopOn(ec))
                return _handler(ec, false);

            _timer.reset();
            _object.close();

            operator()(self, Attempt{});
        }
    };
}

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    using Op = mylib::connection_attempt_op<std::decay_t<F>, Completion>;
    // make an owning self, to be passed along a single async call chain
    auto self = std::make_unique<Op>(object, ep, std::forward<F>(stopOn), completion, attempts, delay);
    (*self)(self);

    return result.get();
}

static auto non_recoverable = [](error_code ec) {
    std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
    // TODO Be specific about intermittent/recoverable conditions
    return false;
};

#include <set>
int main(int argc, char** argv) {
    assert(argc>1);
    static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
                                  8989};

    std::set<std::string_view> const options{argv+1, argv+argc};

    std::cout << std::boolalpha;

    if (options.contains("future")) {
        std::cout
            << "-----------------------\n"
            << " FUTURE DEMO\n"
            << "-----------------------" << std::endl;
        asio::thread_pool ctx;

        try {
            tcp::socket s(ctx);

            std::future<bool> ok = async_connection_attempt(
                    s, ep, non_recoverable, asio::use_future, 5, 800ms);

            std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
        } catch (system_error const& se) {
            std::cout << "Future: " << se.code().message() << "\n";
        }

        ctx.join();
    }

    if (options.contains("coroutine")) {
        std::cout
            << "-----------------------\n"
            << " COROUTINE DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;

        asio::spawn(ctx,
            [work = make_work_guard(ctx)](asio::yield_context yc) {
                auto ex = get_associated_executor(yc);
                tcp::socket s(ex);

                error_code ec;
                if (async_connection_attempt(s, ep, non_recoverable,
                                             yc[ec], 5, 800ms)) {
                    std::cout << "Connected in coro\n";
                } else {
                    std::cout << "NOT Connected in coro: " << ec.message() << "\n";
                }
            });

        ctx.run();
    }

    if (options.contains("callback")) {
        std::cout
            << "-----------------------\n"
            << " CALLBACK DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        tcp::socket s(ctx);

        async_connection_attempt(
            s, ep, non_recoverable,
            [](error_code ec, bool ok) {
                std::cout << "Callback: " << ok << ", "
                          << ec.message() << "\n";
            },
            5, 800ms);

        ctx.run();
    }
}

Вот еще одна локальная демонстрация с разными сценариями:

Я видел примеры, в которых используется shared_from_this, захваченный лямбдой, что обеспечивает время жизни. Мне нравится идея использовать теги для перегрузки операторов. Это кажется более чистым, чем использование определенных функций-членов. Однако у меня нет идеи передавать Self вместо простого захвата this. this будет действителен, поскольку объект выделен в куче и останется действительным.

Sergey Kolesnik 09.04.2021 12:16

@SergeyKolesnik, кому принадлежит куча?

sehe 09.04.2021 16:21

Создателем является функция-оболочка async_connection_attempt. Затем можно вызвать shared_from_this из функции.

Sergey Kolesnik 09.04.2021 16:28

Да, с shared_from_this заморачиваться не надо. Вы можете легко привязать копии общего ptr.

sehe 09.04.2021 16:32

Но я не собирался в первую очередь соглашаться с накладными расходами совместного владения (реф-подсчет, копирование, стирание типа удаления, синхронизация и дополнительное выделение). Итак, я имитировал uniqe_from_this, используя параметр привязки Self. Преимущество перед просто мошенническим "delete this" - это исключительная безопасность. Уникальный_ptr имеет только 1 размер указателя, и единственные накладные расходы - время компиляции.

sehe 09.04.2021 16:33

(Кстати, просто захвата this недостаточно даже с shared_ptr, но я предполагаю, что вы знали и просто замалчивали это в своем комментарии)

sehe 09.04.2021 16:34

да, я имел ввиду [this, keepAlive = this->shared_from_this()]. Я подумал об использовании unique_ptr; вам просто нужно получить указатель на базовый объект, а затем безопасно переместить его. Однако я не уверен, что это будет безопасно. Возьмем, к примеру, сценарий, когда вы успешно завершили операцию, а затем активировали отмену таймера. К тому времени, когда таймер получит ошибку aborted, this, возможно, уже будет уничтожен.

Sergey Kolesnik 09.04.2021 16:41

Другие вопросы по теме