«Процесс завершен с кодом завершения -1073740791 (0xC0000409)» при использовании вектора будущего

Фон

На первом курсе я использовал API binance-websocket и std::future. Я пишу программу для проверки разницы во времени между локальным хостом и сервером Binance и получения чистой задержки.

Я помещаю свой код в свой репозиторий на GitHub ws_binance_time_diff_delay

У меня есть 2 проблемы.

Проблема 1.

Он не может работать полностью.

Это показывает:

{"id":1,"status":200,"result":{"serverTime":1718697917878},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718697921760; 1718697921956; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;

И тогда он не отвечает и не прекращается.

Проблема 2.

В файле ws_diff_delay.cpp, в строках 84 и 85. Если я использую первый, он показывает:

{"id":1,"status":200,"result":{"serverTime":1718698030116},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718698034095; 1718698034251; 1718698030291;
1718698034095; 1718698034314; 1718698030342;
1718698034095; 1718698034314; 1718698030342;

Process finished with exit code -1073740791 (0xC0000409)

Он завершается сам по себе и показывает сообщение stackoverflow «Процесс завершен с кодом завершения -1073740791 (0xC0000409)».

Я отладил и проследил его до строки 97.

Он останавливается на файле сборки debug_assamble в строке 24.

subl   $0xd8, %esp
movq   0x2e3942(%rip), %rax
xorq   %rsp, %rax
movq   %rax, 0xc0(%rsp)
andq   $0x0, 0x28(%rsp)
leaq   -0x26(%rip), %rax          ; RaiseException
andl   $0x1, %edx
movl   %ecx, 0x20(%rsp)
movl   %edx, 0x24(%rsp)
movq   %rax, 0x30(%rsp)
testq  %r9, %r9
je     0x18004468a
movl   $0xf, %eax
leaq   0x40(%rsp), %rcx
cmpl   %eax, %r8d
movq   %r9, %rdx
cmovbel %r8d, %eax
movl   %eax, %r8d
movl   %r8d, 0x38(%rsp)
shlq   $0x3, %r8
callq  0x1800b7c77
leaq   0x20(%rsp), %rcx
callq  *0x1f864c(%rip)
nopl   (%rax,%rax)
movq   0xc0(%rsp), %rcx
xorq   %rsp, %rcx
callq  0x1800af760
addq   $0xd8, %rsp
retq
int3
andl   $0x0, 0x38(%rsp)
jmp    0x180044660
int3
int3
int3
int3
int3
int3
int3
jno    0x18004465c
popq   %rbx

Сцена перед исключением Сцена перед исключением

В чем разница между строкой 84 и строкой 85.

Я печатаю шрифт в строке 72, чтобы убедиться, что шрифт правильный.


  • Файл ./CMakeLists.txt
     cmake_minimum_required(VERSION 3.24)
     project(ws_binance_time_diff_delay)
     set(CMAKE_CXX_STANDARD 17)
     add_definitions(
             -DBOOST_THREAD_PROVIDES_FUTURE
             -DBOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
             -DBOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
     )
    
     add_executable(ws_binance_time_diff_delay
             ws_diff_delay.cpp
     )
    
     target_include_directories(ws_binance_time_diff_delay PRIVATE ${CMAKE_SOURCE_DIR})
     target_include_directories(ws_binance_time_diff_delay PRIVATE "C:/Users/Mike_Wei/CLionProjects/commonUtils")
    
     find_package(RapidJSON CONFIG REQUIRED)
     target_link_libraries(ws_binance_time_diff_delay PRIVATE rapidjson)
    
     find_package(OpenSSL REQUIRED)
     find_package(ZLIB REQUIRED)
     find_package(Boost REQUIRED COMPONENTS system thread date_time log log_setup program_options)
    
     target_include_directories(ws_binance_time_diff_delay PRIVATE ${Boost_INCLUDE_DIRS})
     target_link_libraries(ws_binance_time_diff_delay PRIVATE ${Boost_LIBRARIES})
    
     find_package(Threads REQUIRED)
     target_include_directories(ws_binance_time_diff_delay PRIVATE ${OPENSSL_INCLUDE_DIR})
     target_link_libraries(ws_binance_time_diff_delay PRIVATE
             Boost::system
             Boost::log
             OpenSSL::SSL
             OpenSSL::Crypto
             Threads::Threads
     )
    
     find_package(jsoncpp CONFIG REQUIRED)
     target_link_libraries(ws_binance_time_diff_delay PRIVATE JsonCpp::JsonCpp)
    
     find_package(fmt CONFIG REQUIRED)
     target_link_libraries(ws_binance_time_diff_delay PRIVATE fmt::fmt)    ```
    
    
  • Файл ws_diff_delay.cpp
     #include <cstdlib>
     #include <stdint.h>
    
     #include <iostream>
     #include <string>
     #include <optional>
     #include <future>
     #include <chrono>
     #include <functional>
    
     #include <boost/beast/core.hpp>
     #include <boost/beast/websocket.hpp>
     #include <boost/beast/websocket/ssl.hpp>
     #include <boost/asio.hpp>
     #include <boost/asio/ssl.hpp>
     #include <boost/log/trivial.hpp>
    
     #include <rapidjson/document.h>
    
     #include <commonUtils.h>
    
     class mini_ws_client {
         using tcp = boost::asio::ip::tcp;
    
         const std::string host = "testnet.binance.vision";
         const std::string port = "443";
         const std::string target = "/ws-api/v3";
    
         boost::asio::ssl::context ctx{boost::asio::ssl::context::sslv23_client};
         boost::asio::io_context ioc;
         std::shared_ptr<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > > ws_sp;
    
     public:
         explicit mini_ws_client(
             const std::string host = "testnet.binance.vision",
             const std::string port = "443",
             const std::string target = "/ws-api/v3"
         ): host(host), port(port), target(target) {
             tcp::resolver resolver(ioc);
             ws_sp = std::make_shared<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > >(ioc, ctx);
             if (!SSL_set_tlsext_host_name(ws_sp->next_layer().native_handle(), host.c_str())) {
                 boost::system::error_code ec{static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category()};
                 throw boost::system::system_error{ec};
             }
             auto const results = resolver.resolve(host, port);
             auto ep = boost::asio::connect(ws_sp->next_layer().next_layer(), results);
             ws_sp->next_layer().handshake(boost::asio::ssl::stream_base::client);
             ws_sp->set_option(boost::beast::websocket::stream_base::decorator(
                 [](boost::beast::websocket::request_type &req) {
                     req.set(boost::beast::http::field::user_agent,
                             std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
                 }));
             ws_sp->handshake(host, target);
         }
    
         ~mini_ws_client() {
             ws_sp->close(boost::beast::websocket::close_code::normal);
             ws_sp.reset();
         }
    
         std::pair<double, double> get_diff_delay(
             int test_times = 100,
             int one_group_num = 10,
             const std::string msg = "{\"id\":1,\"method\":\"time\"}"
         ) const {
             auto future_example = std::async(
                 std::launch::async,
                 &mini_ws_client::get_one_timestamps,
                 this,
                 msg
             );
             std::cout << "type of future_example is " << typeid(future_example).name() << std::endl;
             std::vector<int64_t> timeStamps{};
             int cnt{test_times};
             while (cnt) {
                 int cur_group_num{one_group_num};
                 if (cnt > one_group_num) {
                     cnt -= one_group_num;
                 } else {
                     cur_group_num = test_times;
                     cnt = 0;
                 }
                 std::cout << "test group num " << cur_group_num << "; " << "remain num " << cnt << std::endl;
                 std::vector<std::future<std::vector<int64_t> > > time_stamps_vec{};
                 // std::vector<decltype(future_example) > time_stamps_vec{};
                 for (int i{0}; i < cur_group_num; ++i) {
                     time_stamps_vec.emplace_back(
                         std::async(
                             std::launch::async,
                             &mini_ws_client::get_one_timestamps,
                             this,
                             msg
                         )
                     );
                 }
                 for (auto &timestamps_future: time_stamps_vec) {
                     auto one_timestamp = timestamps_future.get();
                     printVector(one_timestamp);
                     timeStamps.insert(timeStamps.end(), one_timestamp.begin(), one_timestamp.end());
                 }
             }
             auto diff_delay = calculateTimeDiffDelaySub(timeStamps);
             return diff_delay;
         }
    
         std::vector<int64_t> get_one_timestamps(const std::string &msg) const {
             std::vector<int64_t> timeStamps_one_case{};
             auto presend_time = getTimeStamp();
             auto response_str = request_to_response(msg);
             auto postsend_time = getTimeStamp();
             auto server_time = get_server_time(response_str);
             if (server_time) {
                 timeStamps_one_case.push_back(presend_time);
                 timeStamps_one_case.push_back(postsend_time);
                 timeStamps_one_case.push_back(server_time.value());
             }
             printVector(timeStamps_one_case);
    
             return timeStamps_one_case;
         }
    
         std::pair<double, double> calculateTimeDiffDelaySub(const std::vector<int64_t> &timeStamps) const {
             double diff{0.};
             double delay{0.};
             for (int i{0}; i < timeStamps.size(); i += 3) {
                 diff += (
                     timeStamps[i]
                     + timeStamps[i + 1]
                     - (timeStamps[i + 2] << 1)
                 ) * 0.5;
                 delay += (timeStamps[i + 1] - timeStamps[i]) >> 1;
             }
             diff /= timeStamps.size() / 3;
             delay /= timeStamps.size() / 3;
             auto ret = std::make_pair(diff, delay);
             return ret;
         }
    
    
         int64_t getTimeStamp(int64_t diff = 0LL) const {
             // 获取当前时间点
             auto now = std::chrono::system_clock::now();
    
             // 将时间点转换为毫秒
             auto duration = now.time_since_epoch();
             auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    
             // 输出 13 位时间戳
             return millis - diff;
         }
    
    
         std::string request_to_response(const std::string &msg) const {
             ws_sp->write(boost::asio::buffer(msg));
             boost::beast::flat_buffer buffer;
             ws_sp->read(buffer);
             const char *bufferData = reinterpret_cast<const char *>(buffer.data().data());
             std::size_t bufferSize = buffer.data().size();
             std::string response(bufferData, bufferSize);
             return response;
         }
    
         std::optional<int64_t> get_server_time(const std::string &msg) const {
             rapidjson::Document msg_doc;
             msg_doc.Parse(msg.c_str());
             if (msg_doc.HasParseError() || !msg_doc.IsObject()) {
                 BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " << "JSON parse error or not an object" <<
          std::endl;
                 return std::nullopt;
             }
             if (!msg_doc.HasMember("result") || !msg_doc["result"].IsObject()) {
                 BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
          "No 'result' field or 'result' is not an object" << std::endl;
                 return std::nullopt;
             }
             const rapidjson::Value &result = msg_doc["result"];
             if (!result.HasMember("serverTime") || !result["serverTime"].IsInt64()) {
                 BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
      "'serverTime' field missing or not an int64"
          << std::endl;
                 return std::nullopt;
             }
             int64_t serverTime = result["serverTime"].GetInt64();
             return serverTime;
         }
     };
    
    
     int main() {
         auto ws_client = std::make_shared<mini_ws_client>("ws-api.binance.com", "443", "/ws-api/v3");
         std::cout << ws_client->request_to_response("{\"id\":1,\"method\":\"time\"}") << std::endl;;
         auto &[diff,delay] = ws_client->get_diff_delay();
         std::cout << "diff = " << diff << std::endl;
         std::cout << "delay = " << delay << std::endl;
     }
    
Альтернативные WebSockets для netty/java: удвоение пропускной способности небольших сообщений
Альтернативные WebSockets для netty/java: удвоение пропускной способности небольших сообщений
Этот пост - краткая презентация netty-websocket-http1 - альтернативной netty/java реализации RFC6455 - протокола WebSocket.
Очень простая установка Websocket с помощью Deno без каких-либо пакетов
Очень простая установка Websocket с помощью Deno без каких-либо пакетов
Здесь мы рассмотрим, как можно использовать Websocket в Deno и развернуть его в Deno deploy. Мы будем слушать Websocket, а также отправлять сообщения.
1
0
64
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

0xc0000409 означает STATUS_STACK_BUFFER_OVERRUN

Запуск вашего кода с дезинфицирующими средствами:

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address,undefined ")

Быстро диагностирует некоторые проблемы:

Очевидно, что какой-то буфер недействителен на время операции записи.

Я сделал рефакторинг вашего кода, чтобы его можно было читать: https://coliru.stacked-crooked.com/a/079b4b92556cd365

  • используйте строго типизированные образцы вместо вложенных векторов:
    // using Sample  = std::array<int64_t, 3>;
    struct Sample { int64_t pre, post, server; };
    using Samples = std::vector<Sample>;
    
  • Теперь вы получаете гораздо более простую логику:
    std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) const {
        double diff{0.};
        double delay{0.};
        for (auto& [pre, post, server] : samples) {
            diff  += (pre + post - (server * 2)) * 0.5;
            delay += (post - pre) / 2.0;
        }
        diff  /= samples.size();
        delay /= samples.size();
        return std::pair(diff, delay);
    }
    

В любом случае вы создаете дикие потоки, которые используют один и тот же веб-сокет без какой-либо синхронизации. Это и плохо, и бесполезно.

  • это плохо, потому что вызывает неопределенное поведение из-за гонок данных
  • это бесполезно, потому что при одном соединении вы не можете отправить еще один запрос до завершения первого

Вы имели в виду повторный запрос в одном и том же потоке или имели в виду параллельное тестирование с несколькими клиентскими соединениями?

Рефакторинг для параллельных пакетов

Где каждый пакет необходим последовательный, потому что для запуска на одном клиенте:

Прямой эфир на Колиру

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iostream>
#include <rapidjson/document.h>

#include <fmt/chrono.h>
#include <fmt/ranges.h>
static inline void printVector(auto const& v) { fmt::print("printVector: {}\n", v); }

// using Sample  = std::array<int64_t, 3>;
struct Sample { int64_t pre, post, server; };
using Samples = std::vector<Sample>;

using Message = std::shared_ptr<std::string const>;
static Message default_message() {
    static auto instance = std::make_shared<std::string>(R"({"id":1,"method":"time"})");
    return instance;
}

namespace asio      = boost::asio;
namespace ssl       = asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp           = asio::ip::tcp;
using error_code    = boost::system::error_code;
using namespace std::chrono_literals;

struct Args {
    std::string host   = "testnet.binance.vision";
    std::string port   = "443";
    std::string target = "/ws-api/v3";
};

class Client {
    using tcp = asio::ip::tcp;

    Args const                                  args;
    ssl::context                                ctx{ssl::context::sslv23_client};
    asio::io_context                            ioc;
    websocket::stream<ssl::stream<tcp::socket>> ws{ioc, ctx};

  public:
    Client(Args args = {}) : args(std::move(args)) { connect(); }

    ~Client() {
        error_code ec;
        ws.close(websocket::close_code::normal, ec);
        if (ec.failed())
            std::cerr << "~Client: " << ec.message() << std::endl;
    }

    std::string request(Message msg) {
        ws.write(asio::buffer(*msg));
        std::string response;
        auto buf = asio::dynamic_buffer(response);
        ws.read(buf);
        return response;
    }

  private:
    void connect() {
        tcp::resolver resolver(ioc);
        if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), args.host.c_str()))
            throw boost::system::system_error{
                error_code{static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()}};

        auto results = resolver.resolve(args.host, args.port);
        asio::connect(ws.next_layer().next_layer(), results);

        ws.next_layer().handshake(ssl::stream_base::client);
        ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(boost::beast::http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
        }));
        ws.handshake(args.host, args.target);
    }
};

namespace Benchmark {
    using clock      = std::chrono::system_clock;
    using time_point = clock::time_point;
    int64_t getTimeStamp() { return clock::now().time_since_epoch() / 1ms; }

    std::optional<int64_t> get_server_time(std::string const& msg) {
        rapidjson::Document doc;
        doc.Parse(msg.c_str());

        if (!doc.HasParseError() && doc.IsObject() && doc.HasMember("result"))
            if (auto const& result = doc["result"];
                result.IsObject() && result.HasMember("serverTime") && result["serverTime"].IsInt64())
                return result["serverTime"].GetInt64();

        std::cerr << "Unexpected or invalid response" << std::endl;
        return {};
    }

    std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) {
        double diff{0.};
        double delay{0.};
        if (!samples.empty()) {
            for (auto& [pre, post, server] : samples) {
                diff  += (pre + post - (server * 2)) * 0.5;
                delay += (post - pre) / 2.0;
            }
            diff  /= samples.size();
            delay /= samples.size();
        }
        return std::pair(diff, delay);
    }

    Samples perform_batch(Args const& args, unsigned n, Message msg) {
        Samples samples;
        try {
            Client client(args);
            for (Client c(args); n--;) {
                int64_t     pre  = getTimeStamp();
                std::string res  = client.request(msg);
                int64_t     post = getTimeStamp();

                if (std::optional<int64_t> server_time = get_server_time(res))
                    samples.push_back({pre, post, server_time.value()});
                else
                    throw std::runtime_error("No server time in response");
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error in perform_batch: " << se.code().message() << std::endl;
        }
        return samples;
    }

    std::pair<double, double> run(Args const& args, unsigned total = 100, unsigned pergroup = 10,
                                                    Message msg = default_message()) {
        std::vector<std::future<Samples>> futs{};
        for (unsigned remain{total}; unsigned batch = std::min(remain, pergroup); remain -= batch) {
            std::cout << "batch " << batch << "; remain " << remain << std::endl;

            for (unsigned i = 0; i < batch; ++i)
                futs.emplace_back(std::async(std::launch::async, perform_batch, args, batch, msg));
        }

        Samples merged;
        for (auto& fut : futs) {
            auto one = fut.get();
            merged.insert(merged.end(), one.begin(), one.end());
        }
        return calculateTimeDiffDelaySub(merged);
    }
}

int main() {
    Args args {"ws-api.binance.com", "443", "/ws-api/v3"};
    std::cout << Client(args).request(default_message()) << std::endl;

    auto const& [diff, delay] = Benchmark::run(args, 20, 4);
    std::cout << std::fixed;
    std::cout << "diff = " << diff << " delay = " << delay << std::endl;
}

Ложные ошибки во время деструктора клиента указывают на то, что мы можем столкнуться с ограничением скорости. Я оставлю это на ваше усмотрение для диагностики:

Исправлен выбор часов и, насколько вы знаете, тесты больше не ограничены по скорости.

sehe 19.06.2024 01:51

Спасибо, я понял.

Monhde Sau Hung 20.06.2024 06:27

Другие вопросы по теме