概述
在分布式链路追踪等场景下,会使用到微服务调用链路上的透传能力,tRPC-Go 基于 tRPC 协议的头部设计实现了对链路透传的支持,这篇文章从源码角度分析链路透传的设计实现,文章中会涉及 tRPC-go 里不同场景中如何正确使用链路透传功能。
说明
- 本文基于以下源码以及版本 trpc-go: v0.9.4 (截止本文编写时的最新发布版本)
- 所有 tRPC-Go 源码均在文章中提供了链接,可以点击链接直达工蜂仓库。
源码走读
先从应用层调用讲起
一般而言,一个典型的 tRPC-go 的 RPC 调用起点类似如下代码:
代码语言:go复制proxy := pb.NewOrderClientProxy()
rsp, err := proxy.PlaceOrder(ctx, req)
这里的工厂函数和方法都是我们用 trpc create
命令生成的桩代码里提供的实现,下一小节我们会看到桩代码里的实现。
另一方面,按照官方文档的说法,从主调向被调传递透传信息的话,需要使用 client.WithMetaData
传递选项,假如我们需要向 PlaceOrder
传递一个 requestID,值为 123456
:
options := []client.Option{
client.WithMetaData("requestID", []byte("123456")),
}
rsp, err := proxy.PlaceOrder(ctx, req, options...)
而假设被调的 PlaceOrder
方法会反过来透传一个流控信息,ExceedRateLimit
为 "true"
,则主调还需要显式设定一个协议头对象指针:
head := &trpc.ResponseProtocol{}
options := []client.Option{
client.WithMetaData("requestID", []byte("123456")),
client.WithRspHead(head),
}
rsp, err := proxy.PlaceOrder(ctx, req, options...)
if err == nil {
fmt.Println(head.TransInfo) // 一个包含 key 为 ExceedRateLimit 的字典,类型为 map[string][]byte
}
记住这段代码,这是我们后面揭秘的基础,我们会研究这一段代码背后工作的原理。
从 tRPC-go 桩代码着手
我们重点看看方法桩里边的代码:
代码语言:go复制func (c *OrderClientProxyImpl) PlaceOrder(ctx context.Context, req *EmptyMessage, opts ...client.Option) (rsp *EmptyMessage, err error) {
// 这里通过 ctx 派生出新的 ctx 和消息,后面会有源码解读
ctx, msg := codec.WithCloneMessage(ctx)
// 这里会对派生出的消息进行被调信息设置,上面的 `WithCloneMessage` 会负责设置消息里的主调信息
msg.WithClientRPCName("/trpc.example.orders_svr.Order/PlaceOrder")
msg.WithCalleeServiceName(OrderServer_ServiceDesc.ServiceName)
msg.WithCalleeApp("example")
msg.WithCalleeServer("orders_svr")
msg.WithCalleeService("Order")
msg.WithCalleeMethod("PlaceOrder")
msg.WithSerializationType(codec.SerializationTypePB)
rsp = &EmptyMessage{}
err = c.client.Invoke(ctx, req, rsp, callopts...)
// 一些收尾工作,不相关,跳过
}
可以看到,每个 RPC 方法桩的实现里,其实都是标准的过程:
- 通过
codec.WithCloneMessage(ctx)
派生一组新的 context 和 message 实例; - 设置 message 的被调信息,被调信息用于指明服务发现的目标以及 RPC 被调的服务名和方法名;
- 合并 opts 选项信息;
- 通过
c.client.Invoke
方法发起实际的远程调用。
我们只想关注链路透传过程,所以我们后面重点关注 codec.WithCloneMessage
函数和 c.clientInvoke
方法的逻辑。
codec.WithCloneMessage
的作用
我们进一步检查 codec.WithCloneMessage
的源码:
func WithCloneMessage(ctx context.Context) (context.Context, Msg) {
newMsg := msgPool.Get().(*msg) // 从 sync.Pool 实例 msgPool 获取一个池化指针,这种写法是为了减少对象创建和销毁的开销
val := ctx.Value(ContextKeyMessage) // 从 ctx 对象中获取其绑定的 *codec.Message 指针,一般是当前服务收到的一个请求的消息的指针
m, ok := val.(*msg)
if !ok { // 下面 3 行,是在传入的 ctx 不是一个合法的 tRPC context 的情况下,直接使用全新的 *codec.Message
ctx = context.WithValue(ctx, ContextKeyMessage, newMsg)
newMsg.context = ctx
return ctx, newMsg
}
// 以下两行,将新的 ctx 和新的消息指针 newMsg 相互绑定,后续业务逻辑里,使用 ctx 便可以找到绑定的消息指针
ctx = context.WithValue(ctx, ContextKeyMessage, newMsg)
newMsg.context = ctx
// 以下两行,从原有的消息指针指向的对象上拷贝部分值到新的消息指针指向的内存上
copyCommonMessage(m, newMsg)
copyServerToClientMessage(m, newMsg)
return ctx, newMsg
}
如代码注释说明,WithCloneMessage
实现一组新的 ctx
和 msg
的派生,消息派生过程中,具体拷贝了哪些信息呢?我们接着看 copyCommonMessage
和 copyServerToClientMessage
。
先看 copyCommonMessage
:
func copyCommonMessage(m *msg, newMsg *msg) {
newMsg.frameHead = m.frameHead
newMsg.requestTimeout = m.requestTimeout
newMsg.serializationType = m.serializationType
newMsg.serverRPCName = m.serverRPCName
newMsg.clientRPCName = m.clientRPCName
newMsg.serverReqHead = m.serverReqHead
newMsg.serverRspHead = m.serverRspHead
newMsg.dyeing = m.dyeing
newMsg.dyeingKey = m.dyeingKey
newMsg.serverMetaData = m.serverMetaData
newMsg.logger = m.logger
newMsg.namespace = m.namespace
newMsg.envName = m.envName
newMsg.setName = m.setName
newMsg.envTransfer = m.envTransfer
newMsg.commonMeta = m.commonMeta.Clone()
}
可以看到,copyCommonMessage
主要是拷贝了这些信息:
- 请求超时时间
- RPC 请求和响应各自的名称和头部
serverMetaData
:这个是当前服务收到的请求中由主调方传递到被调服务方的链路透传信息- 其他一些框架定义的上下文信息,比如
namespace
和envName
等。
接着再看 copyServerToClientMessage
:
func copyServerToClientMessage(m *msg, newMsg *msg) {
// 将原消息的服务端收到的链路透传消息复制给新消息的客户端链路透传
// 也就是说,将当前服务收到的链路透传进一步传递给下一个服务
newMsg.clientMetaData = m.serverMetaData.Clone()
// 一般需要拷贝消息的场景都是因为当前服务也需要 RPC 调用,所以从当前服务收到的消息派生
// RPC 调用消息的话,需要更正主调信息为自己的服务信息
newMsg.callerServiceName = m.calleeServiceName
newMsg.callerApp = m.calleeApp
newMsg.callerServer = m.calleeServer
newMsg.callerService = m.calleeService
newMsg.callerMethod = m.calleeMethod
}
如上,到这里,一个派生的消息就从一个现有消息中拷贝了框架定义的一些上下文信息了,并且也设置了主调信息为自身服务,但是,被调信息呢?你回到开头看 tRPC 的桩代码就会知道了,桩代码里会负责进一步重写被调信息。
抬头看看 c.client.Invoke
方法
了解完消息派生,再来看看 RPC 调用请求的核心过程,c.client
是一个 client.Client
接口类型的对象,考虑到默认的 tRPC 协议请求的话,使用的是框架默认的 client
实现,我们看看它的 Invoke
方法:
func (c *client) Invoke(ctx context.Context, reqbody interface{}, rspbody interface{}, opt ...Option) error {
// 此处跳过一些不相关的逻辑
// start filter chain processing
return c.getFilters(opts).Filter(contextWithOptions(ctx, opts), reqbody, rspbody, callFunc)
}
Invoke
方法主要初始化请求的 opt 合并以及超时控制,核心是通过过滤器链的 Filter
方法开始逐层处理逻辑,而真正的请求处理逻辑则定义在 callFunc
中:
func callFunc(ctx context.Context, reqbody interface{}, rspbody interface{}) error {
msg := codec.Message(ctx)
// 这里省略一些无关逻辑
// 这里开始消息编码
reqbuf, err := prepareRequestBuf(msg, reqbody, opts)
if err != nil {
return err
}
// 这里开始底层的连接和数据传输
rspbuf, err := opts.Transport.RoundTrip(ctx, reqbuf, opts.CallOptions...)
var rspbodybuf []byte
if opts.EnableMultiplexed {
rspbodybuf = rspbuf
} else {
// 消息解码,将收到的二进制解码到标准消息结构
rspbodybuf, err = opts.Codec.Decode(msg, rspbuf)
if err != nil {
return errs.NewFrameError(errs.RetClientDecodeFail, "client codec Decode: " err.Error())
}
}
// 进一步做反序列化等操作
return processResponseBuf(msg, rspbody, rspbodybuf, opts)
}
这里涉及几个关键操作,我们下来逐个展开。
prepareRequestBuf
的逻辑
以下是 prepareRequestBuf
的源码:
func prepareRequestBuf(msg codec.Msg, reqbody interface{}, opts *Options) ([]byte, error) {
// 序列化和压缩
reqbodybuf, err := serializeAndCompress(msg, reqbody, opts)
if err != nil {
return nil, err
}
// 将序列化和压缩后的数据进一步按照传输协议编码
reqbuf, err := opts.Codec.Encode(msg, reqbodybuf)
if err != nil {
return nil, errs.NewFrameError(errs.RetClientEncodeFail, "client codec Encode: " err.Error())
}
return reqbuf, nil
}
序列化和压缩的逻辑在此不展开,因为不涉及我们要讨论的链路透传中的头部的处理,我们看 opts.Codec.Encode
的逻辑,它实现了对消息的编码。一般情况下,我们使用标准的 tRPC 编解码协议的话,使用的是框架默认的编解码器,我们看它的 Encode
方法:
func (c *ClientCodec) Encode(msg codec.Msg, reqbody []byte) (reqbuf []byte, err error) {
frameHead := getFrameHead(msg) // 从消息元数据中获取帧信息,这个取决于具体协议,这里因为是默认实现,所以这里的帧头信息是 tRPC 的默认实现
if frameHead.FrameType != uint8(TrpcDataFrameType_TRPC_UNARY_FRAME) {
return c.streamCodec.Encode(msg, reqbody)
}
// 检查客户端应用层调用 RPC 时是否显式设置了请求头信息,有则使用其指定的,没有则使用默认头:
// Version: uint32(TrpcProtoVersion_TRPC_PROTO_V1)
// CallType: uint32(TrpcCallType_TRPC_UNARY_CALL),
req, err := getRequestHead(msg)
if err != nil {
return nil, err
}
// 从 msg 中提取请求所需的主被调信息、链路透传信息等
c.updateReqHead(req, msg)
// 协议升级到具体通信协议
upgradeProtocol(frameHead, msg, req.RequestId)
// 使用 protobuf 对请求头进行编码
reqhead, err := proto.Marshal(req)
if err != nil {
return nil, err
}
// 将请求协议头与正文进行组装,形成完整的一次请求的编码
return frameHead.construct(reqhead, reqbody)
}
这里的编码过程也比较清晰了,我们重点看看 c.updateReqHead(req, msg)
就好:
func (c *ClientCodec) updateReqHead(req *RequestProtocol, msg codec.Msg) {
// 此处省略一堆与本文无关的源码
req.TransInfo = setClientTransInfo(msg, req.TransInfo)
}
这里就是请求的链路透传数据了,setClientTransInfo
负责将 msg 的链路透传信息复制到 req.TransInfo 中:
func setClientTransInfo(msg codec.Msg, trans map[string][]byte) map[string][]byte {
// set MetaData
if len(msg.ClientMetaData()) > 0 {
if trans == nil {
trans = make(map[string][]byte)
}
for k, v := range msg.ClientMetaData() {
trans[k] = v
}
}
// 此处省略后续的一些额外的 transInfo 操作
return trans
}
那么 msg
的 ClientMetaData
是怎么来的?还记得开头示例代码里的 client.WithMetaData("requestID", []byte("123456"))
吗?知道了吧,这个问题留给你自己探索一下。
opts.Codec.Decode
的逻辑
opts.Codec.Decode
方法调用发生在 RoundTrip 之后,也就是网络往返完成之后,此时客户端已经收到服务器端返回的完整响应了,我们看看它需要做什么事情:
func (c *ClientCodec) Decode(msg codec.Msg, rspbuf []byte) (rspbody []byte, err error) {
// 此处省略一些无关代码
// 获取响应头信息
rsp, err := c.getResponseHead(msg)
if err != nil {
return nil, err
}
// 此处省略一些代码:主要完成对头部信息二进制片段的截取
// 将头部信息进行反序列化
if err := proto.Unmarshal(rspbuf[begin:end], rsp); err != nil {
return nil, err
}
// 使用响应头信息更
if err := updateMsg(msg, frameHead, rsp); err != nil {
return nil, err
}
// body decoded
return rspbuf[end:], nil
}
先看 c.getResponseHead
的代码,它会在应用层有显式设定的情况下返回应用层设定的头部信息的指针:
func (c *ClientCodec) getResponseHead(msg codec.Msg) (*ResponseProtocol, error) {
if msg.ClientRspHead() != nil {
// 这里不为空的话,意味着应用层执行了 client.WithRspHead(head) 设置了响应头,这种情况下就用
// 应用层指定的指针反序列化头部,这样应用层就可以获取到响应头部了,应用层可以进一步使用 head.TransInfo 获取链路透传信息
rsp, ok := msg.ClientRspHead().(*ResponseProtocol)
if !ok {
return nil, errors.New("client decode rsp head type invalid, must be trpc response protocol head")
}
return rsp, nil
}
// 没有指定的情况下,用默认的新的头部信息指针
rsp := &ResponseProtocol{}
msg.WithClientRspHead(rsp)
return rsp, nil
}
所以,我们在文章开头示例代码中通过 client.WithRspHead(head)
设定的 head 会在此时被 getResponseHead
返回。
接下来看 updateMsg(msg, frameHead, rsp)
:
func updateMsg(msg codec.Msg, frameHead *FrameHead, rsp *ResponseProtocol) error {
// 此处省略少量无关代码
// 这里将被调服务端透传回来的信息合并到 ClientMetaData 中
if len(rsp.TransInfo) > 0 {
md := msg.ClientMetaData()
if len(md) == 0 {
md = codec.MetaData{}
}
for k, v := range rsp.TransInfo {
md[k] = v
}
msg.WithClientMetaData(md)
}
// 此处省略一些无关代码
}
可以看到,这里其中的一个有意思的事情是会将服务端链路透传信息合并到请求时的消息的 ClientMetaData
中,如果你能访问到这个消息的指针,也就能够有第二种方式获取到服务端返回的链路透传信息了。但是显然你在应用层是不能的,为什么呢?因为请求使用的 msg 是在桩代码中派生的,不是你应用层提供的。所以,应用层想要获取链路回传信息,只有官方文档上的那一条路。但是为什么我要提有第二种可能性呢?那是因为你可以在 filter 里访问到这个 message,怎么访问?想想 trpc.Message(ctx)
。
总结
到这里,涉及链路往返透传的相关源码就剖析完整了,用一个流程图结束本文: