从源码透析gRPC调用原理
grpc 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持.
gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。
本文将解析gRPC的实现逻辑,分别从客户端和服务端来说明gRPC的实现原理。
准备条件
本文将以gRPC Github上 helloword 的代码进行介绍,其文件结构为:
1
2
3
4
greeter_client
greeter_server
helloworld
mock_helloworld
我们只需要关注前三个文件夹的内容。greet_client
和 greet_server
文件中分别是grpc客户端和服务端的业务调用代码,包含了一个标准的gRPC调用过程。 helloworld
中包含了是 protobuf 的协议文件和生成的 helloworld.pb.go
文件(至于pb的协议和*.pb.go
文件的生成等内容,不作为本文的介绍范围,不再赘述)。
服务端
对于Server端,grpc在server端的调用逻辑如下,基本就是分为四步:
- 创建端口监听listener
- 创建server实例
- 注册服务(并未真正开始服务)
- 启动服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
...
// 创建listener
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建server示例
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
reflection.Register(s)
// 启动服务端监听
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
...
}
创建监听端口
创建listener,不用多介绍了,就是创建了一个监听tcp端口的Listener实例。
创建服务端实例
NewServer()
方法创建了一个 grpc.Server
实例,其函数内部会对该实例进行一系列初始化赋值操作。该接口与客户端中的 Dial()
接口类似,可以接受多个 ServerOption
入参,在 helloworld
的示例中并未传入任务参数,一个简单的示例如下:
1
svr := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))
在grpc中,也存在了多种类似于 CustomCodec()
这样返回值类型为 ServerOption
的函数,从而满足调用方在需要求进行传参赋值:
1
2
3
func CustomCodec() ServerOption
func MaxConcurrentStreams() ServerOption
func UnknownServiceHandler() ServerOption
服务注册
RegisterGreeterServer()
是由 helloworld.pb.go
生成的接口,其主要调用了grpc的 RegisterService()
来注册当前service及其实现。
grpc.RegisterService()
接收一个参数类型为 ServiceDesc
的实例 _Greeter_serviceDesc
用以为 service 的描述说明,同时接收一个 service 实例作注册进来。其中 _Greeter_serviceDesc
是由 pb
生成的对业务RPC接口的描述,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// helloworld.pb.go
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}
var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "helloworld.proto",
}
我们可以看到,在 grpc.ServiceDesc
中对 Methods
变量进行了赋值。其中 Methods
包含了一个RPC接口名到handler的映射数组,描述了当前service支持的所有的方法,MethodName
即为调用的RPC接口名,而handler
的值 _Greeter_SayHello_Handler()
也是由pb生成的方法,在其内部通过注册进来的service实例,实现了对我们的业务函数 SayHello()
进行了调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GreeterServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/helloworld.Greeter/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
}
return interceptor(ctx, in, info, handler)
}
启动服务
Serve
函数中开始接收来到 listener 的请求(实际上也就是对listener进行了 Accept()
),并为每一个请求创建一个go程来服务。
Serve
函数的逻辑判断比较复杂,但其实真正的调用逻辑过程十分简单,在下面列出,从而有助于我们的理解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *Server) Serve(lis net.Listener) error {
...
for {
// 开始接受服务
rawConn, err := lis.Accept()
...
// 为每一个请求启动一个go程来处理链接
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
func (s *Server) handleRawConn(rawConn net.Conn) {
// 鉴权操作
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
...
// 基于HTTP2,创建一个ServerTransport
st := s.newHTTP2Transport(conn, authInfo)
...
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}
其中, newHTTP2Transport()
的代码主要部分有一些关于HTTP2的赋值和初始化操作,存在于 internal/transport/http2_server.go
中,这儿就不再进入介绍http2的具体实现方式了。而 serveStreams()
中则主要是调用了 HandleStreams()
接口去真正的接受请求流。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
HandleStreams()
中的实现在 grpc-go/internal/transport/handler_server.go
文件中。
在 HandleStreams()
实现中前面一大部分是对数据流 Stream
的初始化,数据接收以及赋值,详细的处理过程大家可以去文件中详细的看代码,这里我们只做逻辑流程的分析。在数据流 stream
接收完毕后,通过注册进来的server的 startStream()
来处理数据流。
注册进来的 startStream()
最终调用了Server中的 startStream()
函数,区分出是 unary
请求还是 stream
请求,并分别通过 processUnaryRPC()
和 processStreamingRPC()
进行区分处理。对于两个主要的处理函数 processUnaryRPC()
和 processStreamingRPC()
,基本上是一些具体的数据接收、编解码等操作,就不在浪费篇幅贴出代码了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
...
// 数据流Stream的接受和赋值
startStream(s)
ht.runStream()
close(requestOver)
// 等待数据读取完毕
req.Body.Close()
<-readerDone
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
...
// 判断Unary RPC还是Streaming RPC
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
...
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
...
}
最后,简单以一个图示来展示grpc服务端的调用流程:
业务代码实现
在protobuffer 文件中定义了 SayHello
方法, 生成的 go 文件中定义了一个与protobuffer 中一样的接口。真是的业务实现需要一个服务来实现这些接口。并将这个服务注册到server中去。
1
2
3
4
5
6
type server struct{}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
客户端
首先,我们以Github官网上的example为示例来一览gRPC client端的使用,从而跟踪其调用的逻辑个原理。总的来看,调用的过程基本就是分为三步:
- 创建connection
- 创建业务客户端实例
- 调用RPC接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
...
// 创建connection
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 创建client
c := pb.NewGreeterClient(conn)
// 调用RPC接口
name := defaultName
r, err := c.SayHello(context.TODO(), &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
...
}
创建connection
通过 grpc.Dial()
接口创建了一个 ClientConn
类型实例。
Dial()
函数的第一个参数作为 endpoint 我们就不多说了,同时 Dial() 还接受变长参数 DialOption
。 DialOption
是一个接口类型,在grpc中存在着多种返回了DialOption类型的函数,这些返回了DialOption类型的函数,例如编解码、负载均衡策略等,一些函数声明示例如下:
1
2
3
func WithBalancer() DialOption
func WithInsecure() DialOption
func WithCodec() DialOption
根据client的需求,调用方在调用Dial()的时候可以将这些函数作为参数传入 Dial()
中。
在 Dial()
中,首先是会根据参数进行一系列的初始化和赋值操作,就不在这里列出说明,而对于这些 DailOption
参数,在Dial()中最终实现对 grpc.ClientConn
的成员变量dopts中的 CallOption
进行了赋值。
通过Dial()的调用,grpc已经建立了到服务端的链接,同时也会附带一些诸如负载均衡、证书检查、Backoff等策略的执行(如果有进行配置的话)。
创建客户端实例
创建业务client实例,在使用gRPC的时候,我们都知道其传递协议是 protobuf
。
而 NewGreeterClient()
则是通过对pb协议生成的代码接口,存在于 helloworld.pb.go
中,该函数主要是返回了一个 greeterClient
类型的实例。
调用RPC请求
SayHello()
中的RPC接口也是存在于根据pb协议生成的 helloworld.pb.go
文件中。
SayHello()
除了接受一个context
存储上下文信息和一个request类型参数,同时也支持一个 CallOption
类型的变量。关于 CallOption
在上文中有提到,其本身也是一个接口,其中 before()
用于在请求发送之前设置参数,而 after()
则是在请求调用完毕之后提取信息。通过对这两个函数的调用,方便的实现了在请求前后的一些参数设置的功能:
1
2
3
4
type CallOption interface {
before(*callInfo) error
after(*callInfo)
}
任何一个我们我们上文说到了返回值为 DialOption
的函数,大部分都有一个对应的结构实现了 CallOption
,诸如上面的 WithCodec()
,其对应的结构为:
1
2
3
4
5
6
7
8
9
type CustomCodecCallOption struct {
Codec Codec
}
func (o CustomCodecCallOption) before(c *callInfo) error {
c.codec = o.Codec
return nil
}
func (o CustomCodecCallOption) after(c *callInfo) {}
回到 SayHello()
函数的逻辑中来,该函数最终会调用grpc中的 call.go
中的 invoke
函数来执行具体的操作。
在 invoke()
函数中,newClientStream()
会首先获取传输层Trasport结构的实例并包装到一个 ClientStream
实例中返回,随后将RPC请求通过 SendMsg()
接口发送出去,注意,由于 SendMsg()
并不会等待服务端收到数据,因此还需要通过 RecvMsg()
同步接收收到的回复消息(关于SendMsg()和RecvMsg()中的具体发送和接收数据逻辑,不在赘述,可以去源码再详细了解)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// pb.go文件
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
...
// grpc/grpc.go/call.go文件
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}