Я использую gRPC
async client
аналогично примеру .
В этом примере (опубликованном в gRPC
официальном github
) клиент выделяет память для отправляемого сообщения, используя адрес как tag
для completion queue
, и когда на сообщение отвечает поток слушателя, память (известная по tag
-адресу) бесплатно.
Я боюсь ситуации, когда сервер не отвечает на сообщение, а память никогда не освобождается.
gRPC
от этой ситуации?Функция отправки асинхронного клиента
void SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// StartCall initiates the RPC call
call->response_reader->StartCall();
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
Функция получения асинхронного клиента для потока
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call object
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
// Once we're complete, deallocate the call object.
delete call;
}
}
Основной
int main(int argc, char** argv) {
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
for (int i = 0; i < 100; i++) {
std::string user("world " + std::to_string(i));
greeter.SayHello(user); // The actual RPC call!
}
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //blocks forever
return 0;
}
Защищает ли меня gRPC от этой ситуации?
Как бы. gRPC гарантирует, что все операции в очереди рано или поздно окажутся в соответствующей очереди завершения. Итак, ваш код в порядке, если:
Другими словами: это нормально, но хрупко.
Если вы хотите быть по-настоящему крепким, вам нужно std::shared_ptr<>
. Однако они могут неожиданно повлиять на многопоточную производительность. Так что, стоит оно того или нет, зависит от того, где находится ваше приложение в спектре производительности и надежности.
Такой рефакторинг будет выглядеть так:
AsyncClientCall
от std::enable_shared_from_this
call
на std::make_shared<AsyncClientCall>()
while (cq_.Next(&got_tag, &ok)) {
auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();
И избавиться от delete
, разумеется.
Вы также можете получить приличную полумеру с помощью unique_ptr<>
:
auto call = std::make_unique<AsyncClientCall>();
...
call->response_reader->Finish(&call->reply, &call->status, (void*)call.release());
и
std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};
Это защищает от рефакторинга и исключений, сохраняя при этом все остальное. Однако это можно использовать только для унарных rpcs, которые производят одно событие завершения. Потоковые rpcs или rpcs, которые обмениваются метаданными, потребуют совершенно другой обработки.
@ lior.i Если сервер не отвечает, RPC рано или поздно выйдет из строя из-за тайм-аута, закрытия сокета или отмены каким-либо образом. В этот момент тег будет помещен в очередь со статусом «не в порядке».
"Можем ли мы очистить очередь от старых сообщений в этой ситуации?" Вы всегда можете отменить вызов. Но, как правило, добавления крайнего срока достаточно.
что вы имеете в виду под отменой вызова?
Кроме того, просто чтобы убедиться, что вы не запутались. Единственным общедоступным понятием «очереди» в grpc являются очереди завершения, которые не содержат всех запущенных rpcs, а только те, которые завершили запрошенные пользователем операции.
Отлично, спасибо @Frank, не могли бы вы добавить эту информацию к ответу?
Для варианта A аргументы функции Finish() должны включать get() на интеллектуальный указатель: call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
Спасибо за ответ, что происходит в ситуации, когда сервер не отвечает на некоторые сообщения? будут ли сообщения просто оставаться и ждать в очереди? может очередь переполниться? можем ли мы очистить очередь от старых сообщений в этой ситуации?