Асинхронный tcp-сервер, использующий async_write от boost, приводит к плохому файловому дескриптору

Во-первых, я не являюсь носителем английского языка, поэтому я, вероятно, сделаю несколько грамматических ошибок, извините за это...

Я пытаюсь создать асинхронный TCP-сервер, используя C++ и Boost. Мне удалось принять клиентов и получить от них сообщения, но я не могу отвечать на их сообщения. Чего я хочу добиться, так это наличия метода в классе TCPServer, который отвечает всем подключенным клиентам. Я создал метод для этого, но когда я вызываю TCPServer::write, я получаю ошибку "Bad file descriptor" в аргументе ошибки TcpConnectionHandler::handle_write.

Не могли бы вы помочь мне понять, что я делаю неправильно?

tcp_server.h

#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>

namespace my_project
{
  class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler>
  {
    public:

      TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback);

      boost::asio::ip::tcp::socket& socket();

      void start();

      void write(const std::string& message);

    private:

      void writeImpl(const std::string& message);

      void write();

      void handle_read(const boost::system::error_code& error, size_t bytes_transferred);

      void handle_write(const boost::system::error_code& error, size_t bytes_transferred);

      boost::asio::ip::tcp::socket socket_;
      boost::asio::streambuf message_;
      std::string log_prefix_;
      boost::function<void(std::string&)> received_message_callback_;

      std::deque<std::string> outbox_;
      boost::asio::io_service& io_service_;
      boost::asio::io_service::strand strand_;
  };

  class TcpServer
  {
    public:

      TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback);
      ~TcpServer();

      void start();

      void write(std::string content);

    private:

      void start_accept();

      void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error);

      boost::shared_ptr<TcpConnectionHandler> connection_;
      boost::asio::io_service io_service_;
      boost::asio::ip::tcp::acceptor acceptor_;
      std::string log_prefix_;
      boost::function<void(std::string&)> received_message_callback_;
      boost::condition_variable connection_cond_;
      boost::mutex connection_mutex_;
      bool client_connected_;
      boost::thread *io_thread_;                     /**< Thread to run boost.asio io_service. */
  };

} // namespace my_project

#endif // #ifndef TCP_SERVER_

tcp_server.cpp

#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project
{
  // TcpConnectionHandler

  TcpConnectionHandler::TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback) : io_service_(io_service), strand_(io_service_), socket_(io_service), outbox_()
  {
    log_prefix_ = log_prefix;
    received_message_callback_ = received_message_callback;
  }

  boost::asio::ip::tcp::socket& TcpConnectionHandler::socket()
  {
    return socket_;
  }

  void TcpConnectionHandler::start()
  {
    async_read_until(socket_,
        message_,
        "\r\n",
        boost::bind(&TcpConnectionHandler::handle_read,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
  }

  void TcpConnectionHandler::write(const std::string& message)
  {
    strand_.post(boost::bind(&TcpConnectionHandler::writeImpl, this, message));
  }

  void TcpConnectionHandler::writeImpl(const std::string& message)
  {
    outbox_.push_back( message );
    if ( outbox_.size() > 1 ) {
        // outstanding async_write
        return;
    }

    this->write();
  }

  void TcpConnectionHandler::write()
  {
    const std::string& message = outbox_[0];
    boost::asio::async_write(
            socket_,
            boost::asio::buffer( message.c_str(), message.size() ),
            strand_.wrap(
                boost::bind(
                    &TcpConnectionHandler::handle_write,
                    this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred
                    )
                )
            );
  }

  void TcpConnectionHandler::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
  {
    // Check for client disconnection
    if ((boost::asio::error::eof == error) || (boost::asio::error::connection_reset == error))
    {
      //LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
      return;
    }

    // Convert stream to string
    std::istream stream(&message_);
    std::istreambuf_iterator<char> eos;
    std::string message_str(std::istreambuf_iterator<char>(stream), eos);

    //LOG(DEBUG) << log_prefix_ << " communication object received message: " << getPrintableMessage(message_str);

    std::istringstream iss(message_str);

    std::string msg;
    std::getline(iss, msg, '\r'); // Consumes from the streambuf.

    // Discard the rest of the message from buffer
    message_.consume(message_.size());

    if (!error)
    {
      received_message_callback_(msg);
      start();
    }
    else
    {
      // TODO: Handler here the error
    }
  }

  void TcpConnectionHandler::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
  {
    outbox_.pop_front();

    if ( error ) {
        std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
        return;
    }

    if ( !outbox_.empty() ) {
        // more messages to send
        this->write();
    }
  }


  // TcpServer

  TcpServer::TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback) : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)), client_connected_(false), io_thread_(NULL)
  {

    log_prefix_ = log_prefix;
    received_message_callback_ = received_message_callback;

    start_accept();

    // Run io_service in secondary thread
    io_thread_ = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));

  }

  TcpServer::~TcpServer()
  {
      if (io_thread_)
      {
        io_service_.stop();
        io_thread_->interrupt();
        io_thread_->join();
        delete io_thread_;
      }
  }

  void TcpServer::start()
  {
    // Wait until client is connected to our TCP server. (condition variable)
    boost::unique_lock<boost::mutex> lock(connection_mutex_);

    while (!client_connected_)
    {
      //LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish connection...";

      connection_cond_.wait(lock);
    }

    //LOG(INFO) << log_prefix_ << " client successfully connected.";
  }

  void TcpServer::write(std::string content)
  {
    connection_->write(content);
  }

  void TcpServer::start_accept()
  {
    // Create a new connection handler
    connection_.reset(new TcpConnectionHandler(log_prefix_, acceptor_.get_io_service(), received_message_callback_));

    // Asynchronous accept operation and wait for a new connection.
    acceptor_.async_accept(connection_->socket(),
        boost::bind(&TcpServer::handle_accept, this, connection_,
        boost::asio::placeholders::error));

    //LOG(DEBUG) << log_prefix_ << " communication object started asynchronous TCP/IP connection acceptance.";
  }


  void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error)
  {
    if (!error)
    {
      //LOG(INFO) << log_prefix_ << " client connected!";
      connection->start();
      boost::mutex::scoped_lock lock(connection_mutex_);
      client_connected_ = true;
      connection_cond_.notify_one();
      //LOG(INFO) << log_prefix_ << " client connection accepted";
    }

    start_accept();
  }
}

Заранее спасибо!

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
0
528
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я немного изменил код , чтобы он был совместим с Boost 1.74.0 .

Затем я запустил его с ASAN:

=================================================================
==22695==ERROR: AddressSanitizer: heap-use-after-free on address 0x61b000000080 at pc 0x5571c3b61379 bp 0x7ffddce81980 sp 0x7ffddce81970
READ of size 8 at 0x61b000000080 thread T0
    #0 0x5571c3b61378 in boost::asio::detail::strand_executor_service::strand_impl::~strand_i...
    #1 0x5571c3c02599 in std::_Sp_counted_ptr<boost::asio::detail::strand_executor_service::s...
    #2 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr...
    #3 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /...
    #4 0x5571c3b5ff84 in std::__shared_ptr<boost::asio::detail::strand_executor_service::stra...
    #5 0x5571c3b5ffeb in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
    #6 0x5571c3b7d8d0 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
    #7 0x5571c3c0da6c in void std::destroy_at<boost::asio::strand<boost::asio::execution::any...
    #8 0x5571c3c0b761 in void std::allocator_traits<std::allocator<boost::asio::strand<boost:...
    #9 0x5571c3c01847 in std::_Sp_counted_ptr_inplace<boost::asio::strand<boost::asio::execut...
    #10 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /us...
    #11 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ...
    #12 0x5571c3b25690 in std::__shared_ptr<void, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr(...
    #13 0x5571c3b256f7 in std::shared_ptr<void>::~shared_ptr() /usr/include/c++/10/bits/share...
    #14 0x5571c3b25769 in boost::asio::execution::detail::any_executor_base::destroy_shared(b...
    #15 0x5571c3b24ef2 in boost::asio::execution::detail::any_executor_base::~any_executor_ba...
    #16 0x5571c3b6a0f7 in boost::asio::execution::any_executor<boost::asio::execution::contex...
    #17 0x5571c3bbf447 in my_project::TcpConnectionHandler::~TcpConnectionHandler() /home/seh...
    #18 0x5571c3bbf4e1 in void boost::checked_delete<my_project::TcpConnectionHandler>(my_pro...
    #19 0x5571c3c020a9 in boost::detail::sp_counted_impl_p<my_project::TcpConnectionHandler>:...
    #20 0x5571c3b5bf6b in boost::detail::sp_counted_base::release() /home/sehe/custom/boost_1...
    #21 0x5571c3b5c537 in boost::detail::shared_count::~shared_count() /home/sehe/custom/boos...
    #22 0x5571c3b6a324 in boost::shared_ptr<my_project::TcpConnectionHandler>::~shared_ptr() ...
    #23 0x5571c3b1d48e in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverfl...
    #24 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
    #25 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
    #26 0x5571c3b18b99 in _start (/home/sehe/Projects/stackoverflow/sotest+0x182b99)

0x61b000000080 is located 0 bytes inside of 1640-byte region [0x61b000000080,0x61b0000006e8)
freed by thread T0 here:
    #0 0x7ff1f1eb4407 in operator delete(void*, unsigned long) (/usr/lib/x86_64-linux-gnu/lib...
    #1 0x5571c3c003f4 in boost::asio::detail::strand_executor_service::~strand_executor_servi...
    #2 0x5571c3b35e4a in boost::asio::detail::service_registry::destroy(boost::asio::executio...
    #3 0x5571c3b3578b in boost::asio::detail::service_registry::destroy_services() /home/sehe...
    #4 0x5571c3b37a4f in boost::asio::execution_context::destroy() /home/sehe/custom/boost_1_...
    #5 0x5571c3b3788a in boost::asio::execution_context::~execution_context() /home/sehe/cust...
    #6 0x5571c3b54621 in boost::asio::io_context::~io_context() /home/sehe/custom/boost_1_74_...
    #7 0x5571c3b1d43b in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverflo...
    #8 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
    #9 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)

previously allocated by thread T0 here:
    #0 0x7ff1f1eb33a7 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.6+...
    #1 0x5571c3bc0faa in boost::asio::execution_context::service* boost::asio::detail::servic...
    #2 0x5571c3b3623a in boost::asio::detail::service_registry::do_use_service(boost::asio::e...
    #3 0x5571c3bb74f9 in boost::asio::detail::strand_executor_service& boost::asio::detail::s...
    #4 0x5571c3bab8e7 in boost::asio::detail::strand_executor_service& boost::asio::use_servi...
    #5 0x5571c3ba085f in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
    #6 0x5571c3b92744 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
    #7 0x5571c3b7d844 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
    #8 0x5571c3b18e8a in my_project::TcpConnectionHandler::TcpConnectionHandler(std::__cxx11:...
    #9 0x5571c3b1dbe4 in my_project::TcpServer::start_accept() /home/sehe/Projects/stackoverf...
    #10 0x5571c3b1c775 in my_project::TcpServer::TcpServer(std::__cxx11::basic_string<char, s...
    #11 0x5571c3b1e7b9 in main /home/sehe/Projects/stackoverflow/test.cpp:266
    #12 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)

SUMMARY: AddressSanitizer: heap-use-after-free /home/sehe/custom/boost_1_74_0/boost/asio/detail/impl/strand_executor_service.ipp:88 in boost::asio::detail::strand_executor_service::strand_impl::~strand_impl()
Shadow bytes around the buggy address:
  0x0c367fff7fc0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff7fd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff7fe0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff7ff0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff8000: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
=>0x0c367fff8010:[fd]fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8020: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8030: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8040: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8050: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8060: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
Shadow byte legend (one shadow byte represents 8 application bytes):
  Addressable:           00
  Partially addressable: 01 02 03 04 05 06 07 
  Heap left redzone:       fa
  Freed heap region:       fd
  Stack left redzone:      f1
  Stack mid redzone:       f2
  Stack right redzone:     f3
  Stack after return:      f5
  Stack use after scope:   f8
  Global redzone:          f9
  Global init order:       f6
  Poisoned by user:        f7
  Container overflow:      fc
  Array cookie:            ac
  Intra object redzone:    bb
  ASan internal:           fe
  Left alloca redzone:     ca
  Right alloca redzone:    cb
  Shadow gap:              cc
==22695==ABORTING

Как видите, деструктор ~TcpServer заставляет соединения использовать io_service после его уничтожения. Изменение порядка участников исправит это:

boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::io_service io_service_;
boost::asio::ip::tcp::acceptor acceptor_;

Должно быть

boost::asio::io_service io_service_;
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::ip::tcp::acceptor acceptor_;

Более

Чтобы на самом деле слушать, кажется, что вы хотели бы /дождаться/ выхода службы, а не stop() и interrupt() треда?

Также обратите внимание, что нет необходимости динамически выделять поток. Почему программисты на C++ должны свести к минимуму использование «нового»?

TcpServer::~TcpServer() {
    if (io_thread_.joinable()) {
        //io_service_.stop();
        //io_thread_.interrupt();
        io_thread_.join();
    }
}

Пожизненная проблема: shared_from_this?

boost::bind(&TcpConnectionHandler::handle_write, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)

Эта часть не может захватить общий указатель, что приводит к уничтожению TcpConnectionHandler. (Когда socket уничтожается, он закрывается). На самом деле, ваша проблема UB, потому что это use-after-free, но вам «повезло» не увидеть сбой, из-за которого вы видите UB в виде недопустимого дескриптора.

Исправьте их оба раза:

post(executor_,
     boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));

И

async_read_until(socket_, message_, "\r\n",
     boost::bind(&TcpConnectionHandler::handle_read,
                 shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred));

Живая демонстрация

Я успешно протестировал его со следующим:

#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>

namespace my_project {
class TcpConnectionHandler
        : public boost::enable_shared_from_this<TcpConnectionHandler> {
  public:
    TcpConnectionHandler(
        std::string log_prefix, boost::asio::any_io_executor executor,
        boost::function<void(std::string&)> received_message_callback);

    boost::asio::ip::tcp::socket& socket();

    void start();
    void write(const std::string& message);

  private:
    void writeImpl(const std::string& message);
    void write();
    void handle_read(const boost::system::error_code& error,
                     size_t bytes_transferred);
    void handle_write(const boost::system::error_code& error,
                      size_t bytes_transferred);

    boost::asio::any_io_executor executor_;
    boost::asio::ip::tcp::socket socket_;
    boost::asio::streambuf message_;
    std::string log_prefix_;
    boost::function<void(std::string&)> received_message_callback_;

    std::deque<std::string> outbox_;
};

class TcpServer {
  public:
    TcpServer(std::string log_prefix, unsigned int port,
              boost::function<void(std::string&)> received_message_callback);
    ~TcpServer();

    void start();

    void write(std::string content);

  private:
    void start_accept();

    void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection,
                       const boost::system::error_code& error);

    boost::asio::io_service io_service_;
    boost::shared_ptr<TcpConnectionHandler> connection_;
    boost::asio::ip::tcp::acceptor acceptor_;
    std::string log_prefix_;
    boost::function<void(std::string&)> received_message_callback_;
    boost::condition_variable connection_cond_;
    boost::mutex connection_mutex_;
    bool client_connected_;
    boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
};

} // namespace my_project

#endif // #ifndef TCP_SERVER_

//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project {
// TcpConnectionHandler
TcpConnectionHandler::TcpConnectionHandler(
    std::string log_prefix, boost::asio::any_io_executor executor,
    boost::function<void(std::string&)> received_message_callback)
    : executor_(make_strand(executor)),
      socket_(executor_),
      log_prefix_(log_prefix),
      received_message_callback_(received_message_callback)
{ }

boost::asio::ip::tcp::socket& TcpConnectionHandler::socket() { return socket_; }

void TcpConnectionHandler::start() {
    async_read_until(socket_, message_, "\r\n",
         boost::bind(&TcpConnectionHandler::handle_read,
                     shared_from_this(),
                     boost::asio::placeholders::error,
                     boost::asio::placeholders::bytes_transferred));
}

void TcpConnectionHandler::write(const std::string& message) {
    post(executor_,
         boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
}

void TcpConnectionHandler::writeImpl(const std::string& message) {
    outbox_.push_back(message);
    if (outbox_.size() > 1) {
        // outstanding async_write
        return;
    }

    write();
}

void TcpConnectionHandler::write() {
    const std::string& message = outbox_[0];
    boost::asio::async_write(
        socket_, boost::asio::buffer(message.c_str(), message.size()),
        boost::asio::bind_executor(
            executor_,
            boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
}

void TcpConnectionHandler::handle_read(const boost::system::error_code& error,
                                       size_t /*bytes_transferred*/) {

    std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
    // Check for client disconnection
    if ((boost::asio::error::eof == error) ||
        (boost::asio::error::connection_reset == error)) {
        // LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
        return;
    }

    // Convert stream to string
    std::istream stream(&message_);
    std::istreambuf_iterator<char> eos;
    std::string message_str(std::istreambuf_iterator<char>(stream), eos);

    // LOG(DEBUG) << log_prefix_ << " communication object received message: "
    // << getPrintableMessage(message_str);

    std::istringstream iss(message_str);

    std::string msg;
    std::getline(iss, msg, '\r'); // Consumes from the streambuf.

    // Discard the rest of the message from buffer
    message_.consume(message_.size());

    if (!error) {
        received_message_callback_(msg);
        start();
    } else {
        // TODO: Handler here the error
    }
}

void TcpConnectionHandler::handle_write(const boost::system::error_code& error,
                                        size_t /*bytes_transferred*/) {
    outbox_.pop_front();

    if (error) {
        std::cerr << "could not write: "
                  << boost::system::system_error(error).what() << std::endl;
        return;
    }

    if (!outbox_.empty()) {
        // more messages to send
        write();
    }
}

// TcpServer

TcpServer::TcpServer(
    std::string log_prefix, unsigned int port,
    boost::function<void(std::string&)> received_message_callback)
        : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(
                                     boost::asio::ip::tcp::v4(), port)),
          client_connected_(false) {

    log_prefix_ = log_prefix;
    received_message_callback_ = received_message_callback;

    start_accept();

    // Run io_service in secondary thread
    io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
}

TcpServer::~TcpServer() {
    if (io_thread_.joinable()) {
        //io_service_.stop();
        //io_thread_.interrupt();
        io_thread_.join();
    }
}

void TcpServer::start() {
    // Wait until client is connected to our TCP server. (condition variable)
    boost::unique_lock<boost::mutex> lock(connection_mutex_);

    while (!client_connected_) {
        // LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish
        // connection...";

        connection_cond_.wait(lock);
    }

    // LOG(INFO) << log_prefix_ << " client successfully connected.";
}

void TcpServer::write(std::string content) { connection_->write(content); }

void TcpServer::start_accept() {
    // Create a new connection handler
    connection_.reset(new TcpConnectionHandler(
        log_prefix_, acceptor_.get_executor(), received_message_callback_));

    // Asynchronous accept operation and wait for a new connection.
    acceptor_.async_accept(connection_->socket(),
                           boost::bind(&TcpServer::handle_accept, this,
                                       connection_,
                                       boost::asio::placeholders::error));

    // LOG(DEBUG) << log_prefix_ << " communication object started asynchronous
    // TCP/IP connection acceptance.";
}

void TcpServer::handle_accept(
    boost::shared_ptr<TcpConnectionHandler> connection,
    const boost::system::error_code& error) {
    std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
    if (!error) {
        // LOG(INFO) << log_prefix_ << " client connected!";
        connection->start();
        boost::mutex::scoped_lock lock(connection_mutex_);
        client_connected_ = true;
        connection_cond_.notify_one();
        // LOG(INFO) << log_prefix_ << " client connection accepted";
    }

    start_accept();
}
} // namespace my_project

int main() {
    my_project::TcpServer s("demo", 6868, [](std::string& s) {
        std::cout << "Received msg: " << std::quoted(s) << "\n";
    });
}

И как клиент, т.е.

cat test.cpp | netcat -Cw 0 localhost 6868

Печатает всю партию, как

Received msg: "//#define BOOST_BIND_GLOBAL_PLACEHOLDERS"
Received msg: "#include <boost/thread.hpp>"
Received msg: "std::string& message);"
Received msg: "td::string> outbox_;"
Received msg: ":shared_ptr<TcpConnectionHandler> connection_;"
Received msg: "#include \"utils.h\""
Received msg: "ConnectionHandler::start() {"
Received msg: "::writeImpl(const std::string& message) {"
Received msg: "s(),"
Received msg: "       // LOG(ERROR) << log_prefix_ << \" TCP/IP client disconnected!\";"
Received msg: " // Consumes from the streambuf."
Received msg: "e: \""
Received msg: "nt_connected_(false) {"
Received msg: "d to our TCP server. (condition variable)"
Received msg: "ection handler"
Received msg: "nication object started asynchronous"
Received msg: "ond_.notify_one();"

Как видите, вам нужно исправить это, нужно принять во внимание bytes_received, но пока я оставлю это вам.

Пост скриптум

Ой. Я заметил еще одну вещь. Если вы намерены принять только одно соединение, не сбрасывайте connection_, потому что это означает, что write действует на неподключенном экземпляре. Возможно, вы хотели вести список подключенных клиентов?

Вот простая переделка, чтобы заменить одного члена connection_ на list<weak_ptr<TcpConnectionHandler> > connections_;. В него даже встроена базовая сборка мусора.

Сообщение транслируется всем подключенным клиентам.

Прямой эфир на Колиру

//#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>

namespace my_project {
    namespace ph = boost::asio::placeholders;
    using boost::system::error_code;
    using boost::asio::ip::tcp;
    using Executor = boost::asio::executor;
    //using Executor = boost::asio::any_io_executor; // boost 1.74.0
    using Callback = boost::function<void(std::string const&)>;

    class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler> {
      public:
        TcpConnectionHandler(Executor executor, Callback callback);

        tcp::socket& socket() { return socket_; }

        void start();
        void write(std::string const& message);

        ~TcpConnectionHandler() { std::cerr << __FUNCTION__ << "\n"; }
      private:
        void writeImpl(const std::string& message);
        void write_loop();
        void handle_read(error_code error, size_t bytes_transferred);
        void handle_write(error_code error, size_t bytes_transferred);

        Executor executor_;
        tcp::socket socket_;
        boost::asio::streambuf message_;
        Callback received_message_callback_;

        std::list<std::string> outbox_;
    };

    using ConnectionPtr = boost::shared_ptr<TcpConnectionHandler>;
    using ConnectionHandle = boost::weak_ptr<TcpConnectionHandler>;

    class TcpServer {
      public:
        TcpServer(unsigned short port, Callback callback);
        ~TcpServer();

        void write(std::string const& content);

      private:
        void start_accept();

        void handle_accept(ConnectionPtr connection,
                           error_code error);

        boost::asio::io_service io_service_;
        std::list<ConnectionHandle> connections_;
        tcp::acceptor acceptor_;
        Callback received_message_callback_;
        boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
    };

} // namespace my_project

#endif // #ifndef TCP_SERVER_

//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project {
    // TcpConnectionHandler
    TcpConnectionHandler::TcpConnectionHandler(Executor executor, Callback callback)
        : executor_(make_strand(executor)),
          socket_(executor_),
          received_message_callback_(callback)
    { }

    void TcpConnectionHandler::start() {
        async_read_until(socket_, message_, "\r\n",
             boost::bind(&TcpConnectionHandler::handle_read,
                         shared_from_this(), ph::error, ph::bytes_transferred));
    }

    void TcpConnectionHandler::write(const std::string& message) {
        //std::cerr << __FUNCTION__ << ": " << message.length() << "\n";
        post(executor_, boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
    }

    void TcpConnectionHandler::writeImpl(const std::string& message) {
        outbox_.push_back(message);
        if (outbox_.size() == 1)
            write_loop();
    }

    void TcpConnectionHandler::write_loop() {
        boost::asio::async_write(
            socket_, boost::asio::buffer(outbox_.front()),
            boost::asio::bind_executor(
                executor_,
                boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
                            ph::error, ph::bytes_transferred)));
    }

    void TcpConnectionHandler::handle_read(error_code error, size_t bytes_transferred) {
        auto f = buffers_begin(message_.data()), l = f + bytes_transferred;
        message_.consume(bytes_transferred);

        std::istringstream iss(std::string(f, l));
        for (std::string msg; std::getline(iss, msg, '\n');) {
            if (msg.back() == '\r') {
                msg.pop_back();
            }
            received_message_callback_(msg);
        }; // Consumes from the streambuf.


        if (!error) {
            start();
        } else {
            std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
        }
    }

    void TcpConnectionHandler::handle_write(error_code error, size_t /*bytes_transferred*/) {
        outbox_.pop_front();

        if (error) {
            std::cerr << "could not write: " << error.message() << std::endl;
        } else if (!outbox_.empty()) {
            // more messages to send
            write_loop();
        }
    }

    // TcpServer
    TcpServer::TcpServer(unsigned short port, Callback callback)
       : acceptor_(io_service_, { {}, port }),
         received_message_callback_(callback)
    {
        start_accept();
        io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
    }

    TcpServer::~TcpServer() {
        if (io_thread_.joinable()) {
            //io_service_.stop();
            //io_thread_.interrupt();
            io_thread_.join();
        }
    }

    void TcpServer::write(std::string const& content) {
        for (auto& handle : connections_)
            if (ConnectionPtr con = handle.lock())
                con->write(content);
    }

    void TcpServer::start_accept() {
        // optionally garbage-collect connection handles
        connections_.remove_if (std::mem_fn(&ConnectionHandle::expired));

        // Create a new connection handler
        auto connection_ = boost::make_shared<TcpConnectionHandler>(
            acceptor_.get_executor(), received_message_callback_);

        // Asynchronous accept operation and wait for a new connection.
        acceptor_.async_accept(connection_->socket(),
            boost::bind(&TcpServer::handle_accept, this, connection_, ph::error));
    }

    void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, error_code error) {
        //std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
        if (!error) {
            connections_.push_back(connection);
            connection->start();
        }

        start_accept();
    }
} // namespace my_project

int main() {
    my_project::TcpServer server(6868, [&server](std::string const& msg) {
        std::cout << "Broadcasting msg: " << std::quoted(msg) << "\n";
        server.write(msg + "\r\n");
    });
}

С симулированными клиентами:

(for a in 1 2 3; do echo -e "Foo $a\nBar $a\nQux $a" | nc -Cw1 localhost 6868 | while read line; do echo "Job $a: $line"; done& done;)

Печатает на сервере:

Broadcasting msg: "Foo 1"
Broadcasting msg: "Foo 2"
Broadcasting msg: "Bar 1"
Broadcasting msg: "Bar 2"
Broadcasting msg: "Foo 3"
Broadcasting msg: "Qux 1"
Broadcasting msg: "Qux 2"
Broadcasting msg: "Bar 3"
Broadcasting msg: "Qux 3"
handle_read: End of file
~TcpConnectionHandler
handle_read: End of file
~TcpConnectionHandler
handle_read: End of file
~TcpConnectionHandler

И клиенты печатают (в зависимости от таймингов):

Job 1: Foo 1
Job 2: Bar 1
Job 1: Bar 1
Job 2: Foo 2
Job 1: Foo 2
Job 3: Foo 3
Job 2: Qux 1
Job 1: Qux 1
Job 3: Bar 2
Job 2: Foo 3
Job 1: Foo 3
Job 3: Bar 3
Job 1: Bar 2
Job 2: Bar 2
Job 3: Qux 2
Job 1: Bar 3
Job 2: Bar 3
Job 3: Qux 3
Job 1: Qux 2
Job 2: Qux 2
Job 1: Qux 3
Job 2: Qux 3

Я заметил еще одну пожизненную проблему, когда вы перезаписываете connection_, так что write не может работать правильно. Это также приводит к ошибкам неверных файловых дескрипторов.

sehe 14.12.2020 21:07

Добавлен встречный пример, который позволяет несколько подключений к живой демонстрации: coliru.stacked-crooked.com/a/dafa59afbac5cfa4

sehe 14.12.2020 21:08

Большое спасибо за ваш ответ! Я не мог проверить это до сегодняшнего дня, но он решил все мои проблемы с кодом, и даже те, о которых я не знал!

bsantek 18.12.2020 13:24

Другие вопросы по теме