天天看点

c++客户端 grpc_gRPC C++ 异步使用入门

这篇文章将讲述如何使用 gRPC 异步/非阻塞 C++ API 编写一个简单的服务端(server)和客户端(client)。在读取这篇文章之前,你需要先了解 Protocol Buffer 和 gRPC 基础,你可在本博客搜索到它们的相关文章。

这篇文章将围绕 gRPC 的官方例子 Greeter 展开学习,你可在 grpc/examples/protos/helloworld.proto 中查看服务的定义,在 grpc/examples/cpp/helloworld/ 中查看完整代码。

概览

gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下:在 RPC 调用上绑定一个 CompletionQueue

做一些事情,例如读或写这样的事情,用一个唯一的 void* 标记表示

调用 CompletionQueue::Next 等待操作完成,如果返回之前的一个标记,则表示对应的操作已经完成。

异步客户端

要使用异步客户端调用远程方法,首先要创建 channel 和 stub,这个过程和 grpc/examples/cpp/helloworld/greeter_client.cc 中差不多(同步示例)。一旦你创建了 stub,就可以做以下事情来进行异步调用了:初始化 RPC 并为其创建句柄。将 RPC 绑定到一个 CompletionQueue 上。CompletionQueue cq;

std::unique_ptr > rpc(

stub_->AsyncSayHello(&context, request, &cq));

使用一个唯一的标记(这里用的是 (void*)1),请求回复和最终状态Status status;

rpc->Finish(&reply, &status, (void*)1);

等待完成队列返回标记(tag)。一旦返回了之前对应 Finish() 函数中传递的标记,应答和状态就可以被返回了。void* got_tag;

bool ok = false;

cq.Next(&got_tag, &ok);

if (ok && got_tag == (void*)1) {

// check reply and status

}

你可以在 grpc/examples/cpp/helloworld/greeter_async_client.cc 中看到完整的客户端示例。

异步服务端

服务器实现一个带有标记(tag)的 RPC 调用请求,然后等待完成队列返回标记。异步处理 RPC 的基本流程是:构建一个 server 并导出异步服务helloworld::Greeter::AsyncService service;

ServerBuilder builder;

builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());

builder.RegisterAsyncService(&service);

auto cq = builder.AddCompletionQueue();

auto server = builder.BuildAndStart();

请求一个 RPC,并提供唯一的标记(tag)ServerContext context;

HelloRequest request;

ServerAsyncResponseWriter responder;

service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);

等待完成队列返回标记。一旦检索到标记,context 、request 和 responder 就准备好了。HelloReply reply;

Status status;

void* got_tag;

bool ok = false;

cq.Next(&got_tag, &ok);

if (ok && got_tag == (void*)1) {

// set reply and status

responder.Finish(reply, status, (void*)2);

}

等待完成队列返回标记。RPC 在标记返回时完成。void* got_tag;

bool ok = false;

cq.Next(&got_tag, &ok);

if (ok && got_tag == (void*)2) {

// clean up

}

但是,这个基本流程没有考虑到服务端同时处理多个请求。要解决这个问题,完整的异步服务端示例使用一个CallData 对象来维护每个 RPC 的状态,并使用该对象的地址作为调用的唯一标记。class CallData {

public:

// Take in the "service" instance (in this case representing an asynchronous

// server) and the completion queue "cq" used for asynchronous communication

// with the gRPC runtime.

CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)

: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {

// Invoke the serving logic right away.

Proceed();

}

void Proceed() {

if (status_ == CREATE) {

// As part of the initial CREATE state, we *request* that the system

// start processing SayHello requests. In this request, "this" acts are

// the tag uniquely identifying the request (so that different CallData

// instances can serve different requests concurrently), in this case

// the memory address of this CallData instance.

service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,

this);

// Make this instance progress to the PROCESS state.

status_ = PROCESS;

} else if (status_ == PROCESS) {

// Spawn a new CallData instance to serve new clients while we process

// the one for this CallData. The instance will deallocate itself as

// part of its FINISH state.

new CallData(service_, cq_);

// The actual processing.

std::string prefix("Hello ");

reply_.set_message(prefix + request_.name());

// And we are done! Let the gRPC runtime know we've finished, using the

// memory address of this instance as the uniquely identifying tag for

// the event.

responder_.Finish(reply_, Status::OK, this);

status_ = FINISH;

} else {

GPR_ASSERT(status_ == FINISH);

// Once in the FINISH state, deallocate ourselves (CallData).

delete this;

}

}

}

为简单起见,服务端对所有事件只使用一个完成队列,并在 HandleRpcs 中运行一个主循环来查询队列:void HandleRpcs() {

// Spawn a new CallData instance to serve new clients.

new CallData(&service_, cq_.get());

void* tag; // uniquely identifies a request.

bool ok;

while (true) {

// Block waiting to read the next event from the completion queue. The

// event is uniquely identified by its tag, which in this case is the

// memory address of a CallData instance.

cq_->Next(&tag, &ok);

GPR_ASSERT(ok);

static_cast(tag)->Proceed();

}

}

关闭服务端

我们在使用一个完成队列来获取异步通知,在服务端被关闭之后,也必须小心地关闭它。

记住,我们在 ServerImpl::Run() 函数中通过运行 cq_ = builder.AddCompletionQueue() 来获得完成队列实例cq_ ,看看 ServerBuilder::AddCompletionQueue 的文档,我们可以看到:… Caller is required to shutdown the server prior to shutting down the returned completion queue.

有关详细信息,请参考 ServerBuilder::AddCompletionQueue 的完整文档字符串。在我们的示例中,ServerImpl 的析构函数如下所示:~ServerImpl() {

server_->Shutdown();

// Always shutdown the completion queue after the server.

cq_->Shutdown();

}

你可以在 grpc/examples/cpp/helloworld/greeter_async_server.cc 中看到完整的服务端示例。

实现多个服务

前面官方提到的示例中只实现了一个 SayHello RPC 服务,如果想要实现多个 RPC服务该怎么办呢?下面的将讲述如何对示例中的代码进行修改,使他再支持一个名为 SayBye 的服务。

这个方法就是为每个 RPC 服务都实现一个不同的 CallData 类。但是,当你从 cq_->Next() 获取标记时,你知道它是指向这些类之一的对象的指针,但是你不知道它的确切类型。

为了克服这个问题,你可以让它们都继承一个具有 virtual Proceed() 成员函数的类,再根据需要在每个子类中实现它,当您获得一个标记时,将其转换为 CallData 并调用 Proceed()。class CallData {

public:

virtual void Proceed() = 0;

};

class HelloCallData final : public CallData {...};

class ByeCallData final : public CallData {...};

...

new HelloCallData(...);

new ByeCallData(...);

cq_->Next(&tag, &ok);

static_cast(tag)->Proceed();

...

多线程

对于如何在多线程中使用异步 RPC API 完成队列,官方的的文档说明是:Right now, the best performance trade-off is having numcpu's threads and one completion queue per thread.

当前,最好的权衡性能的方法是使用创建 cpu 个数的线程数,并在每个线程中都使用一个完成队列。