Если я отправляю сообщения локально из той же системы, то сервер Boost получает сообщения правильно.
Когда клиент является удаленным приложением в другой системе и отправляет сообщения через TCP\IP, некоторые сообщения прерываются случайным образом (ввод строки).
Например, если клиент отправил «ЭТО СООБЩЕНИЕ», сервер прочитает его как
"ЭТО Я
ССАГЭ"
Это класс сервера.
#pragma once
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <iostream>
#include <global.h>
#include <memory>
#include <fstream>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
auto self(shared_from_this());
// dispatch not strictly necessary for single-threaded contexts
dispatch(
socket_.get_executor(),
[this, self]
{
do_read();
});
}
private:
void handleCommand()
{
enqueueAnswer();
}
void enqueueAnswer()
{
if (stdqueAnswers.size() == 1)
{
do_write();
}
}
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
if (length > 0) {
// In case the message has a leading 1 than we have to send a answer back to the client.
if (data_[0] == '1') {
std::string stdstrCmd(data_);
stdstrCmd.erase(0, 2);
wavefrontAccess->ReceiveCommandExternalGet(stdstrCmd);
handleCommand();
}
else
{
std::string strData(data_, length);
if (!strData.empty() || strData.find_first_not_of(' ') != std::string::npos)
{
// There's a non-space.
commandsQueue.push(strData); // this is std Queue
}
}
}
do_read();
}
});
}
void do_write()
{
if (stdqueAnswers.empty())
return;
auto self(shared_from_this());
async_write(
socket_,
boost::asio::buffer(stdqueAnswers.front()),
[this, self](boost::system::error_code ec, size_t)
{
if (!ec)
{
stdqueAnswers.pop();
do_write();
}
});
}
tcp::socket socket_;
enum { max_length = 12000 };
char data_[max_length];
};
class server
{
public:
server(boost::asio::io_context& io_context, std::uint16_t port)
: acceptor_{ io_context, tcp::endpoint(tcp::v4(), port) }
{
acceptor_.listen();
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
make_strand(acceptor_.get_executor()),
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::make_shared<session>(std::move(socket))->start();
do_accept();
}
});
}
tcp::acceptor acceptor_;
};
Даже локально сообщения могут разбиваться на более мелкие TCP-пакеты как для клиента, так и для сервера.
Протокол «кадрирования» должен иметь возможность однозначно кодировать и декодировать последовательность сообщений переменного размера для любой последовательности разделенных частей (например, abc
, ab c
, a bc
, a b c
).
Ваш протокол может выполнять «кадрирование» сам по себе или работать поверх другого протокола «кадрирования».
Заголовок фиксированной длины (20 байт) содержит размеры полей переменного размера, включая содержимое сообщения.
TCP читает 20 байт.
TCP анализирует размер сообщения.
TCP читает это количество байтов.
(повторить)
Заголовок не имеет фиксированной длины.
Однако его структура однозначна.
Это текст, который заканчивается на \r\n\r\n
.
Этот текст не должен содержать \r\n\r\n
, но может представлять его с экранированием .
Где-то в этом тексте есть размер сообщения.
HTTP читает текст (символ за символом), пока не увидит \r\n\r\n
.
HTTP анализирует размер сообщения.
HTTP читает это количество байтов.
(повторить)
Если текст вашего сообщения не имеет ограничений (например, может включать терминатор/экранирующую строку), то вам необходимо писать размер сообщения перед каждым сообщением.
В противном случае, если ваши сообщения имеют структуру, вы можете написать «терминатор» (например, конец строки или ноль-терминатор) после окончания каждого сообщения и читать сообщение до «терминатора».
Как объясняет Руслан, вы должны использовать не read_some
, а операцию более высокого уровня, которая считывает полное «сообщение», как определено проводным протоколом уровня вашего приложения.
У вас явно уже есть какой-то протокол (начальные байты), и мы не можем догадаться, что может быть остальным. Для простоты предположим, что полное сообщение заканчивается символом \n
. Вот мой упрощенный вариант:
async_read_until(
socket_, boost::asio::dynamic_buffer(data_, max_length), "\n",
[this, self](boost::system::error_code ec, size_t length) {
std::cerr << "async_read_until() " << ec.message() << std::endl;
if (!ec) {
std::string msg = data_.substr(0, length /* - 1*/);
data_.erase(0, length);
if (!msg.empty() && msg.front() == '1') {
// we have to send a answer back to the client
wavefrontAccess->ReceiveCommandExternalGet(msg.substr(2));
handleCommand();
} else {
if (msg.find_first_not_of(' ') != std::string::npos) {
// There's a non-space
commandsQueue.push(msg);
}
}
do_read();
}
});
Я упростил, сделав ваш буфер std::string
напрямую. Это немедленно «решает» сложность, заключающуюся в том, что read_until
, очевидно, может читать более одного сообщения.
Раскомментируйте
/* -1 */
, чтобы исключить символ\n
из сообщения.
С макетом некоторых недостающих битов:
#include <boost/asio.hpp>
#include <deque>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() {
auto self(shared_from_this());
std::cerr << "start() " << socket_.remote_endpoint() << std::endl;
dispatch(socket_.get_executor(), [this, self] { do_read(); });
}
private:
void handleCommand() { enqueueAnswer(); }
void enqueueAnswer() {
if (stdqueAnswers.size() == 1) {
do_write();
}
}
void do_read() {
auto self(shared_from_this());
async_read_until(
socket_, boost::asio::dynamic_buffer(data_, max_length), "\n",
[this, self](boost::system::error_code ec, size_t length) {
std::cerr << "async_read_until() " << ec.message() << std::endl;
if (!ec) {
std::string msg = data_.substr(0, length /* - 1*/);
data_.erase(0, length);
if (!msg.empty() && msg.front() == '1') {
// we have to send a answer back to the client
wavefrontAccess->ReceiveCommandExternalGet(msg.substr(2));
handleCommand();
} else {
if (msg.find_first_not_of(' ') != std::string::npos) {
// There's a non-space
commandsQueue.push(msg);
}
}
do_read();
}
});
}
void do_write() {
if (stdqueAnswers.empty())
return;
auto self(shared_from_this());
async_write( //
socket_, boost::asio::buffer(stdqueAnswers.front()),
[this, self](boost::system::error_code ec, size_t) {
std::cerr << "async_write() " << ec.message() << std::endl;
if (!ec) {
stdqueAnswers.pop_back();
do_write();
}
});
}
enum { max_length = 12000 };
tcp::socket socket_;
std::string data_;
std::deque<std::string> stdqueAnswers;
std::queue<std::string> commandsQueue;
struct WavefrontAccess {
session* _sess;
void ReceiveCommandExternalGet(std::string cmd) {
_sess->stdqueAnswers.push_back("reply for '" + std::move(cmd) + "'");
}
};
std::unique_ptr<WavefrontAccess> wavefrontAccess =
std::make_unique<WavefrontAccess>(WavefrontAccess{this});
};
class server {
public:
server(boost::asio::io_context& io_context, uint16_t port)
: acceptor_{io_context, {{}, port}} {
acceptor_.listen();
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
make_strand(acceptor_.get_executor()),
[this](boost::system::error_code ec, tcp::socket socket) {
std::cerr << "accept: " << ec.message() << " "
<< (ec ? tcp::endpoint{} : socket.remote_endpoint())
<< std::endl;
if (!ec) {
std::make_shared<session>(std::move(socket))->start();
do_accept();
}
});
}
tcp::acceptor acceptor_;
};
int main() {
boost::asio::io_context ioc(1);
server s(ioc, 7878);
ioc.run();
}