谈谈 gRPC 的 C++ 异步编程

  这篇文章我们会介绍 gRPC 的异步编程,如果读者没有使用过 gRPC 的经验,可以先阅读我之前写过的 gRPC 编程指南

异步 Client

  gRPC 支持同步和异步两种编程方式。同步编程理解起来比较容易,譬如说,当一个同步的 client 调用 server 的某个方法时,它会一直处于阻塞状态,等待 server 发送回响应。
  同步的代码编写起来也很简单,和一般的函数调用差不多:

1
2
// 调用 server 的 SayHello 方法,阻塞中
Status status = stub_->SayHello(&context, request, &reply);

  然而对于同步的 client 来说,由于调用远程方法时会阻塞当前线程,所以它无法同时发送多个请求。如果需要同时发送多个请求,那么只能每次发送请求时都在新的线程中发送,而频繁地创建线程会带来不小的开销。
  异步 client 成功地解决了这两个问题:它允许同时发送多个请求,并且每次发送请求时都不需要另外创建线程。异步 client 的解决思路很简单:

  • gRPC 本身提供了异步 API,譬如说我们想调用 server 的SayHello()方法,我们可以调用它的异步版本AsyncSayHello(),调用之后会立即返回,不会阻塞。
  • 当方法调用成功时,我们可以让 gRPC 自动将返回结果放到一个CompletionQueue的队列中(CompletionQueue是一个阻塞队列,并且是线程安全的)。
  • 程序可以启动一个线程,这个线程要做的事,就是不断地从队列中取出结果并处理。

  相比于同步的 client 来说,异步的 client 会稍显复杂。完整的代码可以看这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 记录每个 AsyncSayHello 调用的信息
struct AsyncClientCall {
HelloReply reply;
ClientContext context;
Status status;
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
};
class GreeterClient
{
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
void SayHello(const std::string& user)
{
HelloRequest request;
request.set_name(user);
AsyncClientCall* call = new AsyncClientCall;
// 异步调用,非阻塞
call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_);
// 当 RPC 调用结束时,让 gRPC 自动将返回结果填充到 AsyncClientCall 中
// 并将 AsyncClientCall 的地址加入到队列中
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
void AsyncCompleteRpc()
{
void* got_tag;
bool ok = false;
// 从队列中取出 AsyncClientCall 的地址,会阻塞
while (cq_.Next(&got_tag, &ok))
{
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
delete call; // 销毁对象
}
}
private:
std::unique_ptr<Greeter::Stub> stub_;
CompletionQueue cq_; // 队列
};
int main()
{
auto channel = grpc::CreateChannel("localhost:5000", grpc::InsecureChannelCredentials());
GreeterClient greeter(channel);
// 启动新线程,从队列中取出结果并处理
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
for (int i = 0; i < 100; i++) {
auto user = std::string("hello-world-") + std::to_string(i);
greeter.SayHello(user);
}
return 0;
}

参考资料