Утечка памяти в gRPC async_client

Я использую gRPCasync client аналогично примеру .

В этом примере (опубликованном в gRPC официальном github) клиент выделяет память для отправляемого сообщения, используя адрес как tag для completion queue, и когда на сообщение отвечает поток слушателя, память (известная по tag-адресу) бесплатно.

Я боюсь ситуации, когда сервер не отвечает на сообщение, а память никогда не освобождается.

  • Защитит ли меня gRPC от этой ситуации?
  • Должен ли я реализовать это по-другому? (используя интеллектуальные указатели/сохранить указатели в структуре данных/и т.д...)

Функция отправки асинхронного клиента

void SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);

    // Call object to store rpc data
    AsyncClientCall* call = new AsyncClientCall;

    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);

    // StartCall initiates the RPC call
    call->response_reader->StartCall();

    call->response_reader->Finish(&call->reply, &call->status, (void*)call);

}

Функция получения асинхронного клиента для потока

void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
        // The tag in this example is the memory location of the call object
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

        // Verify that the request was completed successfully. Note that "ok"
        // corresponds solely to the request for updates introduced by Finish().
        GPR_ASSERT(ok);

        if (call->status.ok())
            std::cout << "Greeter received: " << call->reply.message() << std::endl;
        else
            std::cout << "RPC failed" << std::endl;

        // Once we're complete, deallocate the call object.
        delete call;
    }
}

Основной

int main(int argc, char** argv) {


    GreeterClient greeter(grpc::CreateChannel(
            "localhost:50051", grpc::InsecureChannelCredentials()));

    // Spawn reader thread that loops indefinitely
    std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

    for (int i = 0; i < 100; i++) {
        std::string user("world " + std::to_string(i));
        greeter.SayHello(user);  // The actual RPC call!
    }

    std::cout << "Press control-c to quit" << std::endl << std::endl;
    thread_.join();  //blocks forever

    return 0;
}
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
0
2 211
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Защищает ли меня gRPC от этой ситуации?

Как бы. gRPC гарантирует, что все операции в очереди рано или поздно окажутся в соответствующей очереди завершения. Итак, ваш код в порядке, если:

  • Никакое исключение не выбрасывается в неудачное время.
  • Вы не вносите изменения в код, который создает путь кода, который не включает постановку операции в очередь или удаление вызова.

Другими словами: это нормально, но хрупко.

Вариант А:

Если вы хотите быть по-настоящему крепким, вам нужно std::shared_ptr<>. Однако они могут неожиданно повлиять на многопоточную производительность. Так что, стоит оно того или нет, зависит от того, где находится ваше приложение в спектре производительности и надежности.

Такой рефакторинг будет выглядеть так:

  1. Наследовать AsyncClientCall от std::enable_shared_from_this
  2. Измените конструкцию call на std::make_shared<AsyncClientCall>()
  3. В обработчике очереди завершения увеличьте счетчик ссылок:
while (cq_.Next(&got_tag, &ok)) {
    auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();

И избавиться от delete, разумеется.

Вариант Б:

Вы также можете получить приличную полумеру с помощью unique_ptr<>:

    auto call = std::make_unique<AsyncClientCall>();
    ...
    call->response_reader->Finish(&call->reply, &call->status, (void*)call.release());

и

    std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};

Это защищает от рефакторинга и исключений, сохраняя при этом все остальное. Однако это можно использовать только для унарных rpcs, которые производят одно событие завершения. Потоковые rpcs или rpcs, которые обмениваются метаданными, потребуют совершенно другой обработки.

Спасибо за ответ, что происходит в ситуации, когда сервер не отвечает на некоторые сообщения? будут ли сообщения просто оставаться и ждать в очереди? может очередь переполниться? можем ли мы очистить очередь от старых сообщений в этой ситуации?

lior.i 19.07.2021 17:20

@ lior.i Если сервер не отвечает, RPC рано или поздно выйдет из строя из-за тайм-аута, закрытия сокета или отмены каким-либо образом. В этот момент тег будет помещен в очередь со статусом «не в порядке».

user4442671 19.07.2021 17:22

"Можем ли мы очистить очередь от старых сообщений в этой ситуации?" Вы всегда можете отменить вызов. Но, как правило, добавления крайнего срока достаточно.

user4442671 19.07.2021 17:25

что вы имеете в виду под отменой вызова?

lior.i 19.07.2021 17:26
TryCancel() , но, как я уже сказал, set_deadline() обычно предпочтительнее, потому что это «выстрелил-забыл».
user4442671 19.07.2021 17:28

Кроме того, просто чтобы убедиться, что вы не запутались. Единственным общедоступным понятием «очереди» в grpc являются очереди завершения, которые не содержат всех запущенных rpcs, а только те, которые завершили запрошенные пользователем операции.

user4442671 19.07.2021 17:34

Отлично, спасибо @Frank, не могли бы вы добавить эту информацию к ответу?

lior.i 20.07.2021 08:27

Для варианта A аргументы функции Finish() должны включать get() на интеллектуальный указатель: call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());

raschild 24.08.2022 16:26

Другие вопросы по теме