Я написал класс, который пытается установить соединение с TCP-сервером с настраиваемым таймаутом и количеством попыток. Это вызываемый объект, который возвращает std::future для результата.
Проблемы с моей первоначальной реализацией:
Это моя первоначальная реализация попытки подключения с настраиваемым таймаутом и количеством попыток:
template<typename Connection>
class connection_attempt
{
public:
using connection_type = Connection;
using endpoint_type = typename Connection::endpoint_type;
template<typename Endpoint>
using require_endpoint = typename std::enable_if<std::is_same<Endpoint, endpoint_type>::value>::type;
constexpr static auto default_timeout()
{
return std::chrono::milliseconds(3000);
}
constexpr static size_t infinite_attempts()
{
return size_t() - 1;
}
explicit connection_attempt(Connection &connection)
: connection_(connection)
{}
template<typename Callable>
explicit connection_attempt(Connection &connection,
Callable &&stopOnError)
: connection_(connection),
stopOnError_(std::forward<Callable>(stopOnError))
{}
template<typename Endpoint,
typename Duration,
typename = require_endpoint<Endpoint>>
std::future<bool> operator()(Endpoint &&endpoint,
size_t attempts,
Duration &&timeout = default_timeout())
{
connectionResult_ = {};
asyncConnect(std::forward<Endpoint>(endpoint),
attempts,
std::forward<Duration>(timeout));
return connectionResult_.get_future();
}
// default attempts = infinite_attempts
template<typename Endpoint,
typename Duration,
typename = require_endpoint<Endpoint>>
std::future<bool> operator()(Endpoint endpoint,
Duration &&timeout = default_timeout())
{
connectionResult_ = {};
asyncConnect(std::forward<Endpoint>(endpoint),
infinite_attempts(),
std::forward<Duration>(timeout));
return connectionResult_.get_future();
}
private:
connection_type &connection_;
asio::steady_timer timer_
{connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};
std::function<bool(const asio::error_code &)> stopOnError_;
std::promise<bool> connectionResult_;
// cancels the connection on timeout!
template<typename Duration>
void startTimer(const Duration &timeout)
{
timer_.expires_after(timeout); // it will automatically cancel a pending timer
timer_.async_wait(
[this, timeout](const asio::error_code &errorCode)
{
// will occur on connection error before timeout
if (errorCode == asio::error::operation_aborted)
return;
// TODO: handle timer errors? What are the possible errors?
assert(!errorCode && "unexpected timer error!");
// stop current connection attempt
connection_.cancel();
});
}
void stopTimer()
{
timer_.cancel();
}
/**
* Will be trying to connect until:<br>
* - has run out of attempts
* - has been required to stop by stopOnError callback (if it was set)
* @param endpoint
* @param attempts
*/
template<typename Duration>
void asyncConnect(endpoint_type endpoint,
size_t attempts,
Duration &&timeout)
{
startTimer(timeout);
connection_.async_connect(endpoint, [this,
endpoint,
attempts,
timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
{
if (!errorCode)
{
stopTimer();
connectionResult_.set_value(true);
return;
}
const auto attemptsLeft = attempts == infinite_attempts() ?
infinite_attempts() :
attempts - 1;
if ((stopOnError_ &&
stopOnError_(errorCode == asio::error::operation_aborted ?
// special case for operation_aborted on timer expiration - need to send timed_out explicitly
// this should only be resulted from the timer calling cancel()
asio::error::timed_out :
errorCode)) ||
!attemptsLeft)
{
stopTimer();
connectionResult_.set_value(false);
return;
}
asyncConnect(endpoint,
attemptsLeft,
timeout);
});
}
};
// this should be an asynchornous function with a custom CompletionToken
template<typename Connection,
typename Callable>
auto make_connection_attempt(Connection &connection,
Callable &&stopOnError) -> connection_attempt<Connection>
{
return connection_attempt<Connection>(connection,
std::forward<Callable>(stopOnError));
}
Однако я хочу быть последовательным при использовании ASIO, и поток управления Универсальная модель для асинхронных операций: при возврате должен быть настраиваемым.
Я использовал пример для отправки нескольких сообщений с интервалами с использованием составной операции с промежуточным обработчиком с сохранением состояния. Обработчик рекурсивно передает себя как обработчик для каждой следующей асинхронной операции: async_wait и async_write. Эти вызовы всегда выполняются по очереди: один всегда вызывается, когда другой возвращается. Однако в моем случае async_wait и async_connect вызываются одновременно:
// initiation method, called first
void operator()(args...)
{
// not valid!
timer.async_wait(std::move(*this)); // from now on this is invalid
connection.async_connect(endpoint, std::move(*this)); can't move this twice
}
Это код для класса, который я пытаюсь реализовать как инициирующий и промежуточный обработчик:
template<typename Connection, typename CompletionToken>
class composed_connection_attempt
{
public:
using connection_type = Connection;
using endpoint_type = typename Connection::endpoint_type;
enum class state
{
pending,
connected,
timeout
};
constexpr static auto default_timeout()
{
return std::chrono::milliseconds(3000);
}
constexpr static size_t infinite_attempts()
{
return size_t() - 1;
}
// TODO: executor type
using executor_type = asio::associated_executor_t<CompletionToken,
typename connection_type::executor_type>;
executor_type get_executor() const noexcept
{
// TODO: get completion handler executor
return connection_.get_executor();
}
// TODO: allocator type
using allocator_type = typename asio::associated_allocator_t<CompletionToken,
std::allocator<void>>;
allocator_type get_allocator() const noexcept
{
// TODO: get completion handler allocator
return allocator_type();
}
// TODO: constructor to initialize state, pass timeout value?
explicit composed_connection_attempt(connection_type &connection)
: connection_(connection)
{}
template<typename Callable>
composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
: connection_(connection),
stopOnError_(std::forward<Callable>(stopOnError))
{}
// operator for initiation
template<typename Endpoint, typename Duration>
void operator()(Endpoint &&endpoint,
size_t attempts,
Duration timeout = default_timeout())
{
// Start timer: how to pass this
// Attempt connection
}
// intermediate completion handler
// this may be invoked without an error both by the timer and a connection
void operator()(const asio::error_code &errorCode)
{
if (!errorCode)
{
}
}
private:
Connection &connection_;
asio::steady_timer timer_{this->get_executor()};
std::atomic<state> state_{state::pending};
std::function<bool(const asio::error_code &)> stopOnError_;
std::function<void(const asio::error_code &)> completionHandler_;
};
Итак, проблемы, которые я пытаюсь решить:
void operator()(const asio::error_code&)? Никакая ошибка не может быть результатом успешного подключения или тайм-аута. Обе асинхронные операции также могут возвращать asio::error::operation_aborted при отмене: попытка подключения отменяется по истечении времени ожидания, таймер отменяется в случае успеха или при ошибке подключения.@sehe Я реализовал класс так, как мне было нужно. Я отправлю это как ответ позже.





Итак, для второго вопроса я предложил отличительный аргумент (иногда я использую пустую «структуру состояния», например State::Init{} или State::Timeout{}, чтобы помочь в разрешении перегрузки, а также в самодокументировании).
Что касается первого вопроса, я уверен, что с тех пор вы могли столкнуться с std::enable_shared_from_this.
Вот мой взгляд на «Универсальную модель». Я использовал spawn для удобства экспозиции.
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
F&& stopOn, Token&& token,
int attempts = -1,
Timer::duration delay = 3s)
{
using Result = asio::async_result<std::decay_t<Token>,
void(error_code, bool)>;
using Completion = typename Result::completion_handler_type;
Completion completion(std::forward<Token>(token));
Result result(completion);
asio::spawn(
object.get_executor(),
[=, &object](asio::yield_context yc) mutable {
using mylib::result_code;
auto ex = get_associated_executor(yc);
error_code ec;
while (attempts--) {
Timer t(ex, delay);
t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });
object.async_connect(ep, yc[ec]);
if (!ec)
return completion(result_code::ok, true);
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (stopOn && stopOn(ec))
return completion(ec, false);
object.close();
}
return completion(result_code::attempts_exceeded, false);
});
return result.get();
}
Ключевые моменты, на которые следует обратить внимание:
async_result<> предоставит вам обработчик завершения, который «творит чудеса», требуемый вызывающей стороной (use_future, yield_context и т. д.)Я добавил перечисление mylib::result_code, чтобы иметь возможность возвращать полную информацию об ошибке:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#ifdef STANDALONE_ASIO
using std::error_category;
using std::error_code;
using std::error_condition;
using std::system_error;
#else
namespace asio = boost::asio;
using boost::system::error_category;
using boost::system::error_code;
using boost::system::error_condition;
using boost::system::system_error;
#endif
using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;
namespace mylib { // threw in the kitchen sink for error codes
enum class result_code {
ok = 0,
timeout = 1,
attempts_exceeded = 2,
};
auto const& get_result_category() {
struct impl : error_category {
const char* name() const noexcept override { return "result_code"; }
std::string message(int ev) const override {
switch (static_cast<result_code>(ev)) {
case result_code::ok: return "success";
case result_code::attempts_exceeded:
return "the maximum number of attempts was exceeded";
case result_code::timeout:
return "the operation did not complete in time";
default: return "unknown error";
}
}
error_condition
default_error_condition(int ev) const noexcept override {
return error_condition{ev, *this};
}
bool equivalent(int ev, error_condition const& condition)
const noexcept override {
return condition.value() == ev && &condition.category() == this;
}
bool equivalent(error_code const& error,
int ev) const noexcept override {
return error.value() == ev && &error.category() == this;
}
} const static instance;
return instance;
}
error_code make_error_code(result_code se) {
return error_code{
static_cast<std::underlying_type<result_code>::type>(se),
get_result_category()};
}
} // namespace mylib
template <>
struct boost::system::is_error_code_enum<mylib::result_code>
: std::true_type {};
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
F&& stopOn, Token&& token,
int attempts = -1,
Timer::duration delay = 3s)
{
using Result = asio::async_result<std::decay_t<Token>,
void(error_code, bool)>;
using Completion = typename Result::completion_handler_type;
Completion completion(std::forward<Token>(token));
Result result(completion);
asio::spawn(
object.get_executor(),
[=, &object](asio::yield_context yc) mutable {
using mylib::result_code;
auto ex = get_associated_executor(yc);
error_code ec;
while (attempts--) {
Timer t(ex, delay);
t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });
object.async_connect(ep, yc[ec]);
if (!ec)
return completion(result_code::ok, true);
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (stopOn && stopOn(ec))
return completion(ec, false);
object.close();
}
return completion(result_code::attempts_exceeded, false);
});
return result.get();
}
static auto non_recoverable = [](error_code ec) {
std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
// TODO Be specific about intermittent/recoverable conditions
return false;
};
#include <set>
int main(int argc, char** argv) {
assert(argc>1);
static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
8989};
std::set<std::string_view> const options{argv+1, argv+argc};
std::cout << std::boolalpha;
if (options.contains("future")) {
std::cout
<< "-----------------------\n"
<< " FUTURE DEMO\n"
<< "-----------------------" << std::endl;
asio::thread_pool ctx;
try {
tcp::socket s(ctx);
std::future<bool> ok = async_connection_attempt(
s, ep, non_recoverable, asio::use_future, 5, 800ms);
std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
} catch (system_error const& se) {
std::cout << "Future: " << se.code().message() << "\n";
}
ctx.join();
}
if (options.contains("coroutine")) {
std::cout
<< "-----------------------\n"
<< " COROUTINE DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
asio::spawn(ctx,
[work = make_work_guard(ctx)](asio::yield_context yc) {
auto ex = get_associated_executor(yc);
tcp::socket s(ex);
error_code ec;
if (async_connection_attempt(s, ep, non_recoverable,
yc[ec], 5, 800ms)) {
std::cout << "Connected in coro\n";
} else {
std::cout << "NOT Connected in coro: " << ec.message() << "\n";
}
});
ctx.run();
}
if (options.contains("callback")) {
std::cout
<< "-----------------------\n"
<< " CALLBACK DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
tcp::socket s(ctx);
async_connection_attempt(
s, ep, non_recoverable,
[](error_code ec, bool ok) {
std::cout << "Callback: " << ok << ", "
<< ec.message() << "\n";
},
5, 800ms);
ctx.run();
}
}
Пример вывода находится в онлайн-компиляторе или сравните несколько тестов на моей машине:
Ваш ответ пролил свет на некоторые вопросы, связанные с asio, которые я имел в виду, но не поднимал. Я также не использовал сопрограммы в своем решении, так что это действительно очень поучительно.
Ух ты. Я только что создал то же самое без использования spawn (используя тип операции, который использует аргументы структуры State, как я уже упоминал). Я должен сказать о сложности, если этот вид библиотечных материалов меня продолжает удивлять. Тем не менее, мне удалось избежать накладных расходов, связанных с shared_from_this, и, конечно же, все демонстрации по-прежнему проходят, так что я вполне доволен. Если хотите, я могу опубликовать альтернативный ответ.
Вы, конечно, можете. Это компенсирует отсутствие документации и примеров, которые очень трудно понять. Буду также признателен, если вы прокомментируете мой ответ. Я реализовал класс, который сам вызывается асинхронно без необходимости в async_initiate.
Честно говоря, мне так и не удалось разобраться в async_initiate - кажется, на 100% отсутствует документация по этому поводу, хотя я интуитивно знаю, что он должен быть очень близок к моей текущей реализации. Я сначала отправлю это, а затем прочту ваш ответ.
из моих экспериментов я обнаружил, что async_initiate - это просто «удобная» оболочка для облегчения вызова asio::async_result<CompletionToken, Signature>::initiate(). Объект Initiation - это вызываемый объект, который принимает выведенный CompletionHandler и вариативный пакет аргументов, необходимых для его вызова. Отсюда лямбда в примерах [](auto &&hanlder, args...).
Да, но asio::async_result<CompletionToken, Signature>::initiate() так же кажется недокументированным. Я, вероятно, когда-нибудь обновлюсь по нему, но сейчас кажется, что маршрут async_result<> просто более переносим и в любом случае эквивалентен :)
Несомненно, это так, и это легче понять. Я просто изучаю способ связать составные операции с помощью пользовательского связанного токена завершения. Думаю выложу отдельный вопрос по теме
Позвольте нам продолжить обсуждение в чате.
Вот как я это реализовал. Код с тестами можно найти здесь на github
template<typename Connection, typename CompletionHandler>
class composed_connection_attempt
{
public:
using connection_type = Connection;
using endpoint_type = typename Connection::endpoint_type;
// TODO: clarify the type!
using completion_handler_t = CompletionHandler;
constexpr static auto default_timeout()
{
return std::chrono::milliseconds(3000);
}
constexpr static size_t infinite_attempts()
{
return size_t() - 1;
}
using executor_type = asio::associated_executor_t<
typename std::decay<CompletionHandler>::type,
typename connection_type::executor_type>;
executor_type get_executor() const noexcept
{
// TODO: get completion handler executor
return pImpl_->get_executor();
}
// TODO: allocator type
using allocator_type = typename asio::associated_allocator_t<CompletionHandler,
std::allocator<void>>;
allocator_type get_allocator() const noexcept
{
// TODO: get completion handler allocator
return pImpl_->get_allocator();
}
// TODO: constructor to initialize state, pass timeout value?
template<typename CompletionHandlerT>
explicit composed_connection_attempt(connection_type &connection,
CompletionHandlerT &&completionHandler)
: pImpl_(std::make_shared<impl>(connection,
std::forward<CompletionHandlerT>(completionHandler)))
{}
template<typename CompletionHandlerT,
typename Callable>
explicit composed_connection_attempt(connection_type &connection,
CompletionHandlerT &&completionHandler,
Callable &&stopOnError)
: pImpl_(std::make_shared<impl>(connection,
std::forward<CompletionHandlerT>(completionHandler),
std::forward<Callable>(stopOnError)))
{}
/**
* Initiation operator. Initiates composed connection procedure.
* @tparam Endpoint type of endpoint
* @tparam Duration type of timeout
* @param endpoint endpoint to be used for connection
* @param attempts number of attempts
* @param timeout value to be used as a timeout between attempts
*/
// TODO: require endpoint type
template<typename Endpoint, typename Duration>
void operator()(Endpoint &&endpoint,
size_t attempts,
Duration &&timeout = default_timeout())
{
pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
pImpl_->attempts_ = attempts;
pImpl_->timeout_ = std::forward<Duration>(timeout);
asyncConnect();
}
/**
* Initiation operator. Initiates composed connection procedure. Connection attempts default to infinite.
* @tparam Endpoint type of endpoint
* @tparam Duration type of timeout
* @param endpoint endpoint to be used for connection
* @param timeout value to be used as a timeout between attempts
*/
// TODO: require endpoint type
template<typename Endpoint, typename Duration>
void operator()(Endpoint &&endpoint,
Duration &&timeout = default_timeout())
{
pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
pImpl_->timeout_ = std::forward<Duration>(timeout);
asyncConnect();
}
/**
* Intermediate completion handler. Will be trying to connect until:<br>
* - has connected<br>
* - has run out of attempts<br>
* - user-provided callback #impl::stopOnError_ interrupts execution when a specific connection error has occurred<br>
* <br>Will be invoked only on connection events:<br>
* - success<br>
* - connection timeout or operation_cancelled in case if timer has expired<br>
* - connection errors<br>
* @param errorCode error code resulted from async_connect
*/
void operator()(const asio::error_code &errorCode)
{
if (!errorCode)
{
stopTimer();
pImpl_->completionHandler_(errorCode);
return;
}
const auto attemptsLeft = pImpl_->attempts_ == infinite_attempts() ?
infinite_attempts() :
pImpl_->attempts_ - 1;
if ((pImpl_->stopOnError_ &&
pImpl_->stopOnError_(errorCode == asio::error::operation_aborted ?
// special case for operation_aborted on timer expiration - need to send timed_out explicitly
// this should only be resulted from the timer calling cancel()
asio::error::timed_out :
errorCode)) ||
!attemptsLeft)
{
stopTimer();
pImpl_->completionHandler_(errorCode == asio::error::operation_aborted ?
asio::error::timed_out :
errorCode);
return;
}
pImpl_->attempts_ = attemptsLeft;
asyncConnect();
}
private:
struct impl
{
template<typename CompletionHandlerT>
impl(connection_type &connection,
CompletionHandlerT &&completionHandler)
: connection_(connection),
completionHandler_(std::forward<CompletionHandlerT>(completionHandler))
{}
template<typename CompletionHandlerT, typename Callable>
impl(connection_type &connection,
CompletionHandlerT &&completionHandler,
Callable &&stopOnError)
: connection_(connection),
completionHandler_(std::forward<CompletionHandlerT>(completionHandler)),
stopOnError_(std::forward<Callable>(stopOnError))
{}
executor_type get_executor() const noexcept
{
return asio::get_associated_executor(completionHandler_,
connection_.get_executor());
}
allocator_type get_allocator() const noexcept
{
// TODO: get completion handler allocator
return allocator_type();
}
connection_type &connection_;
completion_handler_t completionHandler_;
std::function<bool(const asio::error_code &)> stopOnError_;
// this should be default constructable or should I pass it in the constructor?
endpoint_type endpoint_;
// TODO: make timer initialization from get_executor()
asio::steady_timer timer_{connection_.get_executor()}; // this does not compile! -> {get_executor()};
asio::steady_timer::duration timeout_ = default_timeout();
size_t attempts_ = infinite_attempts();
};
// TODO: make unique?
std::shared_ptr<impl> pImpl_;
// cancels the connection on timeout!
void startTimer()
{
pImpl_->timer_.expires_after(pImpl_->timeout_); // it will automatically cancel a pending timer
pImpl_->timer_.async_wait(
[pImpl = pImpl_](const asio::error_code &errorCode)
{
// will occur on connection error before timeout
if (errorCode == asio::error::operation_aborted)
return;
// TODO: handle timer errors? What are the possible errors?
assert(!errorCode && "unexpected timer error!");
// stop attempts
pImpl->connection_.cancel();
});
}
void stopTimer()
{
pImpl_->timer_.cancel();
}
/**
* Will be trying to connect until:<br>
* - has run out of attempts
* - has been required to stop by stopOnError callback (if it was set)
* @param endpoint
* @param attempts
*/
void asyncConnect()
{
startTimer();
pImpl_->connection_.async_connect(pImpl_->endpoint_, std::move(*this));
}
};
template<typename Connection,
typename CompletionHandler,
typename Callable>
auto make_composed_connection_attempt(Connection &connection,
CompletionHandler &&completionHandler,
Callable &&stopOnError) ->
composed_connection_attempt<Connection, CompletionHandler>
{
return composed_connection_attempt<Connection, CompletionHandler>(connection,
std::forward<CompletionHandler>(
completionHandler),
std::forward<Callable>(stopOnError));
}
template<typename Connection,
typename Endpoint,
typename Duration,
typename CompletionToken,
typename Callable>
auto async_connection_attempt(Connection &connection,
Endpoint &&endpoint,
size_t attempts,
Duration &&timeout,
CompletionToken &&completionToken,
Callable &&stopOnError)
{
using result_t = asio::async_result<std::decay_t<CompletionToken>,
void(asio::error_code)>;
using completion_t = typename result_t::completion_handler_type;
completion_t completion{std::forward<CompletionToken>(completionToken)};
result_t result{completion};
auto composedConnectionAttempt = make_composed_connection_attempt(connection,
std::forward<completion_t>(completion),
std::forward<Callable>(stopOnError));
composedConnectionAttempt(std::forward<Endpoint>(endpoint),
attempts,
std::forward<Duration>(timeout));
return result.get();
}
Наконец-то дошли до этого:
Wow. I've just created the same without using spawn (using an operation type that uses the State struct arguments as I mentioned). I must say the complexity if this kind of library-implementor-stuff keeps surprising me. I managed to avoid the overhead of shared_from_this though, and of course all demos still pass, so I'm pretty content. If you want I can post as an alternative answer. – sehe yesterday
Функция инициации примерно такая же, за исключением того, что она больше не использует spawn (это означает, что пользователю не нужно соглашаться на Boost Coroutine и Boost Context).
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
Token&& token, int attempts = -1,
Timer::duration delay = 3s) {
using Result = asio::async_result<std::decay_t<Token>,
void(error_code, bool)>;
using Completion = typename Result::completion_handler_type;
Completion completion(std::forward<Token>(token));
Result result(completion);
using Op = mylib::connection_attempt_op<std::decay_t<F>, Completion>;
// make an owning self, to be passed along a single async call chain
auto self = std::make_unique<Op>(object, ep, std::forward<F>(stopOn), completion, attempts, delay);
(*self)(self);
return result.get();
}
Теперь вы сразу заметите, что я использовал контейнер с уникальным владельцем (unique_ptr). Я попытался избежать динамического распределения, создав класс операции семантики значений, который инкапсулировал обработчик в режиме «только перемещение».
Однако операция также владеет объектом таймера, который должен быть стабильным по ссылкам для обратных вызовов. Так что переезд - это не вариант. Конечно, у нас все еще может быть тип операции с подвижным значением, такой как содержал, всего лишь один unique_ptr для _timer, но это те же накладные расходы и менее общий характер.
если мы добавим еще один объект ввода-вывода в состояние операции, нам потребуется более динамическое распределение
перемещение unique_ptr строго дешевле, чем объект состояния, кратный размеру
перемещение объекта, на который указывает this, внутри функций-членов подвержено ошибкам очень. Например, это вызовет неопределенное поведение:
bind_executor(_ex, std::bind(std::move(*this), ...))
Это потому, что _ex на самом деле является this->_ex, но оценка не упорядочена, поэтому this->_ex может быть оценен после перемещения.
Мы не должны этого хотеть.
если мы реализуем другие асинхронные операции, мы можем использовать тот же шаблон.
Вы узнаете это по исходному коду. Я решил использовать свое собственное предложение для выбора перегрузок operator() путем отправки по типу «состояния» маркера:
struct Init {};
struct Attempt {};
struct Response {};
Чтобы помочь в привязке к себе, мы также передаем owning unique_ptr в качестве аргумента self:
using Self = std::unique_ptr<connection_attempt_op>;
struct Binder {
Self _self;
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
return (*_self)(_self, std::forward<Args>(args)...);
}
};
Из-за ограничений в std::bind мы не можем передавать фактические параметры rvalue, но
если мы будем осторожны, чтобы цепочка вызовов была строго последовательной, мы всегда можем перейти от self ровно один раз
из-за косвенного указания unique_ptr по-прежнему безопасно использовать this из тела метода после перемещения self
Теперь мы можем зафиксировать стабильное значение this в нашем обработчике завершения для _timer.async_wait! Пока мы гарантируем, что обработчик завершения не переживет время существования self, нам не нужно делить владение здесь.
shared_ptrdependence averted!
Имея это в виду, я думаю, что в полной реализации есть несколько сюрпризов:
namespace mylib { // implementation details
template <typename F, typename Completion> struct connection_attempt_op {
tcp::socket& _object;
tcp::endpoint _ep;
F _stopOn;
Completion _handler;
int _attempts;
Timer::duration _delay;
using executor_type =
asio::strand<std::decay_t<decltype(_object.get_executor())>>;
executor_type _ex;
std::unique_ptr<Timer> _timer;
executor_type const& get_executor() { return _ex; }
explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
F stopOn, Completion handler,
int attempts, Timer::duration delay)
: _object(object),
_ep(ep),
_stopOn(std::move(stopOn)),
_handler(std::move(handler)),
_attempts(attempts),
_delay(delay),
_ex(object.get_executor()) {}
struct Init {};
struct Attempt {};
struct Response {};
using Self = std::unique_ptr<connection_attempt_op>;
struct Binder {
Self _self;
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
return (*_self)(_self, std::forward<Args>(args)...);
}
};
void operator()(Self& self, Init = {}) {
// This is the only invocation perhaps not yet on the strand, so
// dispatch
asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
}
void operator()(Self& self, Attempt) {
if (_attempts--) {
_timer = std::make_unique<Timer>(_ex, _delay);
_timer->async_wait([this](error_code ec) {
if (!ec) _object.cancel();
});
_object.async_connect(
_ep,
asio::bind_executor(
_ex, // _object may not already have been on strand
std::bind(Binder{std::move(self)}, Response{},
std::placeholders::_1)));
} else {
_handler(mylib::result_code::attempts_exceeded, false);
}
}
void operator()(Self& self, Response, error_code ec) {
if (!ec) {
_timer.reset();
return _handler(result_code::ok, true);
}
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (_stopOn && _stopOn(ec))
return _handler(ec, false);
_timer.reset();
_object.close();
operator()(self, Attempt{});
}
};
}
Do note the executor binding; the comment in the
Init{}overload as well as with thebind_executorare relevant here.The strand is essential to maintaining the lifetime guarantees that we needed w.r.t. the
async_waitoperation. In particular we need the handler ordering follow this
Остальной код на 100% идентичен другому ответу, поэтому давайте представим его без дальнейших комментариев:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
//#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#ifdef STANDALONE_ASIO
using std::error_category;
using std::error_code;
using std::error_condition;
using std::system_error;
#else
namespace asio = boost::asio;
using boost::system::error_category;
using boost::system::error_code;
using boost::system::error_condition;
using boost::system::system_error;
#endif
using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;
namespace mylib { // threw in the kitchen sink for error codes
enum class result_code {
ok = 0,
timeout = 1,
attempts_exceeded = 2,
not_implemented = 3,
};
auto const& get_result_category() {
struct impl : error_category {
const char* name() const noexcept override { return "result_code"; }
std::string message(int ev) const override {
switch (static_cast<result_code>(ev)) {
case result_code::ok: return "success";
case result_code::attempts_exceeded:
return "the maximum number of attempts was exceeded";
case result_code::timeout:
return "the operation did not complete in time";
case result_code::not_implemented:
return "feature not implemented";
default: return "unknown error";
}
}
error_condition
default_error_condition(int ev) const noexcept override {
return error_condition{ev, *this};
}
bool equivalent(int ev, error_condition const& condition)
const noexcept override {
return condition.value() == ev && &condition.category() == this;
}
bool equivalent(error_code const& error,
int ev) const noexcept override {
return error.value() == ev && &error.category() == this;
}
} const static instance;
return instance;
}
error_code make_error_code(result_code se) {
return error_code{
static_cast<std::underlying_type<result_code>::type>(se),
get_result_category()};
}
} // namespace mylib
template <>
struct boost::system::is_error_code_enum<mylib::result_code>
: std::true_type {};
namespace mylib { // implementation details
template <typename F, typename Completion> struct connection_attempt_op {
tcp::socket& _object;
tcp::endpoint _ep;
F _stopOn;
Completion _handler;
int _attempts;
Timer::duration _delay;
using executor_type =
asio::strand<std::decay_t<decltype(_object.get_executor())>>;
executor_type _ex;
std::unique_ptr<Timer> _timer;
executor_type const& get_executor() { return _ex; }
explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
F stopOn, Completion handler,
int attempts, Timer::duration delay)
: _object(object),
_ep(ep),
_stopOn(std::move(stopOn)),
_handler(std::move(handler)),
_attempts(attempts),
_delay(delay),
_ex(object.get_executor()) {}
struct Init {};
struct Attempt {};
struct Response {};
using Self = std::unique_ptr<connection_attempt_op>;
struct Binder {
Self _self;
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
return (*_self)(_self, std::forward<Args>(args)...);
}
};
void operator()(Self& self, Init = {}) {
// This is the only invocation perhaps not yet on the strand, so
// dispatch
asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
}
void operator()(Self& self, Attempt) {
if (_attempts--) {
_timer = std::make_unique<Timer>(_ex, _delay);
_timer->async_wait([this](error_code ec) {
if (!ec) _object.cancel();
});
_object.async_connect(
_ep,
asio::bind_executor(
_ex, // _object may not already have been on strand
std::bind(Binder{std::move(self)}, Response{},
std::placeholders::_1)));
} else {
_handler(mylib::result_code::attempts_exceeded, false);
}
}
void operator()(Self& self, Response, error_code ec) {
if (!ec) {
_timer.reset();
return _handler(result_code::ok, true);
}
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (_stopOn && _stopOn(ec))
return _handler(ec, false);
_timer.reset();
_object.close();
operator()(self, Attempt{});
}
};
}
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
Token&& token, int attempts = -1,
Timer::duration delay = 3s) {
using Result = asio::async_result<std::decay_t<Token>,
void(error_code, bool)>;
using Completion = typename Result::completion_handler_type;
Completion completion(std::forward<Token>(token));
Result result(completion);
using Op = mylib::connection_attempt_op<std::decay_t<F>, Completion>;
// make an owning self, to be passed along a single async call chain
auto self = std::make_unique<Op>(object, ep, std::forward<F>(stopOn), completion, attempts, delay);
(*self)(self);
return result.get();
}
static auto non_recoverable = [](error_code ec) {
std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
// TODO Be specific about intermittent/recoverable conditions
return false;
};
#include <set>
int main(int argc, char** argv) {
assert(argc>1);
static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
8989};
std::set<std::string_view> const options{argv+1, argv+argc};
std::cout << std::boolalpha;
if (options.contains("future")) {
std::cout
<< "-----------------------\n"
<< " FUTURE DEMO\n"
<< "-----------------------" << std::endl;
asio::thread_pool ctx;
try {
tcp::socket s(ctx);
std::future<bool> ok = async_connection_attempt(
s, ep, non_recoverable, asio::use_future, 5, 800ms);
std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
} catch (system_error const& se) {
std::cout << "Future: " << se.code().message() << "\n";
}
ctx.join();
}
if (options.contains("coroutine")) {
std::cout
<< "-----------------------\n"
<< " COROUTINE DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
asio::spawn(ctx,
[work = make_work_guard(ctx)](asio::yield_context yc) {
auto ex = get_associated_executor(yc);
tcp::socket s(ex);
error_code ec;
if (async_connection_attempt(s, ep, non_recoverable,
yc[ec], 5, 800ms)) {
std::cout << "Connected in coro\n";
} else {
std::cout << "NOT Connected in coro: " << ec.message() << "\n";
}
});
ctx.run();
}
if (options.contains("callback")) {
std::cout
<< "-----------------------\n"
<< " CALLBACK DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
tcp::socket s(ctx);
async_connection_attempt(
s, ep, non_recoverable,
[](error_code ec, bool ok) {
std::cout << "Callback: " << ok << ", "
<< ec.message() << "\n";
},
5, 800ms);
ctx.run();
}
}
Вот еще одна локальная демонстрация с разными сценариями:
Я видел примеры, в которых используется shared_from_this, захваченный лямбдой, что обеспечивает время жизни. Мне нравится идея использовать теги для перегрузки операторов. Это кажется более чистым, чем использование определенных функций-членов. Однако у меня нет идеи передавать Self вместо простого захвата this. this будет действителен, поскольку объект выделен в куче и останется действительным.
@SergeyKolesnik, кому принадлежит куча?
Создателем является функция-оболочка async_connection_attempt. Затем можно вызвать shared_from_this из функции.
Да, с shared_from_this заморачиваться не надо. Вы можете легко привязать копии общего ptr.
Но я не собирался в первую очередь соглашаться с накладными расходами совместного владения (реф-подсчет, копирование, стирание типа удаления, синхронизация и дополнительное выделение). Итак, я имитировал uniqe_from_this, используя параметр привязки Self. Преимущество перед просто мошенническим "delete this" - это исключительная безопасность. Уникальный_ptr имеет только 1 размер указателя, и единственные накладные расходы - время компиляции.
(Кстати, просто захвата this недостаточно даже с shared_ptr, но я предполагаю, что вы знали и просто замалчивали это в своем комментарии)
да, я имел ввиду [this, keepAlive = this->shared_from_this()]. Я подумал об использовании unique_ptr; вам просто нужно получить указатель на базовый объект, а затем безопасно переместить его. Однако я не уверен, что это будет безопасно. Возьмем, к примеру, сценарий, когда вы успешно завершили операцию, а затем активировали отмену таймера. К тому времени, когда таймер получит ошибку aborted, this, возможно, уже будет уничтожен.
По второму вопросу: вы можете привязать параметр состояния к обработчику завершения.