在上一节搭完分布式追踪的采集展示链路后,这一节开始分析分布式链路追踪的核心源码。我们知道,分布式追踪的原理是通过traceId串联调用链路上的所有服务和日志:每个服务都有一个自己的spanId,每一次rpc调用都需要生成一个子spanId,通过父子spanId的对应关系构建一个有向无环图,从而实现分布式追踪。因此在业务代码的接入过程中需要实现如下功能:父子span关系的构建,父子span关系的传递(包括context内部传递和rpc服务之间的传递,后者有可能跨协议,比如在http和grpc协议之间传递),rpc日志的采样、上报等等。每一个厂商都有自己的实现,OpenTracing定义了统一的标准接口,我们按照标准实现即可。在业务代码中的实现包括四步:
1,定义tracer,包括采样配置和agent上报相关的配置,然后放入全局变量中。
2,服务端响应请求的时候解析传入的trace,放入context
3,发起下游调用的时候序列化trace,传递给下游
4,对于业务日志需要串联trace的地方,我们打印带context的日志,从context中提取trace和当前span的信息。
下面我们结合golang源码看下实现
代码语言:javascript复制func main() {
tracer, closer, err := middleware.NewTracer("rootTracerExample", "127.0.0.1:6831", false)
defer closer.Close()
if err != nil {
panic(err)
}
opentracing.SetGlobalTracer(tracer)
go grpc.Main()
http.Main()
}
定义tracer
代码语言:javascript复制import (
"io"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// Tracer is a package-level opentracing.Tracer variable.
// NOTE(review): nothing in the visible code assigns or reads it (NewTracer
// returns its tracer instead of storing it here) — confirm it is still needed.
var Tracer opentracing.Tracer
// NewTracer creates a Jaeger-backed opentracing.Tracer for the service named
// servicename, reporting to the agent at addr (host:port, e.g.
// "127.0.0.1:6831").
//
// When udp is true the tracer reports through an explicitly constructed UDP
// transport; otherwise the reporter is built from the configuration alone.
// The returned io.Closer must be closed by the caller to release the reporter.
func NewTracer(servicename string, addr string, udp bool) (opentracing.Tracer, io.Closer, error) {
	cfg := jaegercfg.Configuration{
		ServiceName: servicename,
		Sampler: &jaegercfg.SamplerConfig{
			Type:  jaeger.SamplerTypeConst, // constant sampling decision
			Param: 1,                       // 1 samples every trace, 0 samples none
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans:            true,
			BufferFlushInterval: 1 * time.Second,
			LocalAgentHostPort:  addr, // e.g. "127.0.0.1:6831"
		},
	}
	if !udp {
		return cfg.NewTracer()
	}

	// Only open the UDP transport when it is actually going to be used;
	// creating it unconditionally leaked it whenever udp was false.
	sender, err := jaeger.NewUDPTransport(addr, 0)
	if err != nil {
		return nil, nil, err
	}
	reporter := jaeger.NewRemoteReporter(sender)
	tracer, closer, err := cfg.NewTracer(jaegercfg.Reporter(reporter))
	if err != nil {
		// Do not leak the reporter (and its transport) on failure.
		reporter.Close()
		return nil, nil, err
	}
	return tracer, closer, nil
}
为了演示完整效果,我们定义一个http服务和一个grpc服务,完成http调http、http调grpc、grpc调grpc三种调用
代码语言:javascript复制syntax = "proto3";
package test;
option go_package = "learn/Jaeger/exp1/grpc";
// TestService is the demo service; both RPCs take a Request and return a Response.
service TestService {
// Note: the keyword here is "returns", not "return".
rpc SayHello(Request) returns (Response){
}
rpc SayHello1(Request) returns (Response){
}
}
// Request and Response are the parameter and result message types of
// TestService; each carries a single string field.
message Request {
string message=1;
}
message Response {
string message=1;
}
生成下代码
代码语言:javascript复制% protoc --go-grpc_out=. learn/Jaeger/exp1/grpc/hello.proto
% protoc --go_out=. learn/Jaeger/exp1/grpc/hello.proto
定义grpc的服务端代码
代码语言:javascript复制package grpc
import (
context "context"
"fmt"
"learn/learn/Jaeger/middleware"
"log"
"net"
grpc "google.golang.org/grpc"
)
// Main builds a gRPC server whose unary calls pass through the tracing server
// interceptor, registers HelloService on it and serves on TCP port 8081.
// Any listen or serve failure terminates the process via log.Fatalf.
func Main() {
	tracing := grpc.UnaryInterceptor(middleware.TraceSpanServerInterceptor())
	server := grpc.NewServer(tracing)
	RegisterTestServiceServer(server, &HelloService{})

	lis, listenErr := net.Listen("tcp", ":8081")
	if listenErr != nil {
		log.Fatalf("failed to listen: %v", listenErr)
	}
	if serveErr := server.Serve(lis); serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}
type HelloService struct {
}
func (s *HelloService) mustEmbedUnimplementedTestServiceServer() {}
// SayHello handles the SayHello RPC by calling SayHello1 on the gRPC server at
// 127.0.0.1:8081 through a client that carries the tracing interceptor, so
// the current span is propagated to the downstream call.
//
// It returns an empty Response on success. Dial and downstream errors are
// returned to the caller rather than terminating the process.
func (s *HelloService) SayHello(ctx context.Context, r *Request) (*Response, error) {
	// Bind the blocking dial to the request context so an unreachable
	// downstream cannot hang this handler forever.
	conn, err := grpc.DialContext(ctx, "127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()))
	if err != nil {
		// A request handler must not call log.Fatalf: that would take down
		// the whole server because of one failed request.
		return nil, err
	}
	defer conn.Close()

	_, err = NewTestServiceClient(conn).SayHello1(ctx, r)
	fmt.Println("SayHello", ctx)
	if err != nil {
		// Return the error unwrapped so the caller still sees the
		// downstream gRPC status.
		return nil, err
	}
	return &Response{}, nil
}
// SayHello1 handles the SayHello1 RPC: it prints the incoming context for
// demonstration purposes and replies with an empty Response.
func (s *HelloService) SayHello1(ctx context.Context, r *Request) (*Response, error) {
	fmt.Println("SayHello1", ctx)
	reply := &Response{}
	return reply, nil
}
实现一个简单的http服务
代码语言:javascript复制package http
import (
"fmt"
"log"
"net/http"
"learn/learn/Jaeger/middleware"
mygrpc "learn/learn/Jaeger/exp1/grpc"
"google.golang.org/grpc"
)
// Main registers the /request1 and /request2 handlers, wraps the mux in the
// tracing middleware and serves HTTP on port 8080. It only returns by
// terminating the process if the server fails.
func Main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/request1", request1)
	mux.HandleFunc("/request2", request2)
	// ListenAndServe always returns a non-nil error; surface it instead of
	// dropping it (mirrors the gRPC server's failure handling).
	if err := http.ListenAndServe(":8080", middleware.ServerTraceSpan(mux)); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// request1 handles /request1 by issuing a traced GET to /request2 on the same
// server and relaying the response body to the client.
func request1(w http.ResponseWriter, r *http.Request) {
	url := "http://localhost:8080/request2"
	body, err := middleware.ClientTraceSpan(r.Context(), "GET", url, nil)
	fmt.Println("request1", r.Context())
	if err != nil {
		// Stop here: previously the handler fell through and wrote the
		// (empty) body after the error text, still with a 200 status.
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	fmt.Fprint(w, string(body))
}
// request2 handles /request2 by calling the SayHello RPC on the gRPC server at
// 127.0.0.1:8081 through a client that carries the tracing interceptor.
// Failures are logged and reported to the HTTP client as 502 Bad Gateway.
func request2(w http.ResponseWriter, r *http.Request) {
	// Bind the blocking dial to the request context so it is abandoned when
	// the request is cancelled.
	conn, err := grpc.DialContext(r.Context(), "127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()))
	if err != nil {
		// log.Fatalf here would kill the whole server on a single failed
		// request; log and answer this request with an error instead.
		log.Printf("did not connect: %v", err)
		http.Error(w, "downstream gRPC server unavailable", http.StatusBadGateway)
		return
	}
	defer conn.Close()

	client := mygrpc.NewTestServiceClient(conn)
	_, err = client.SayHello(r.Context(), &mygrpc.Request{})
	fmt.Println("request2", r.Context())
	if err != nil {
		log.Printf("SayHello failed: %v", err)
		http.Error(w, "downstream gRPC call failed", http.StatusBadGateway)
	}
}
我们通过middleware的方式实现trace的传递,对于grpc服务
代码语言:javascript复制package middleware
import (
"context"
"encoding/base64"
"fmt"
"strings"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/siddontang/go/log"
"github.com/uber/jaeger-client-go"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
//"example/constants"
)
// TraceSpanClientInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.Dial() call.
//
// For example:
//
// conn, err := grpc.Dial(
// address,
// ..., // (existing DialOptions)
// grpc.WithUnaryInterceptor(rpc.TraceSpanClientInterceptor()),
// )
//
// It writes current trace span to request metadata.
func TraceSpanClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "RPC Client " method)
defer span.Finish()
// Save current span context.
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
if err = opentracing.GlobalTracer().Inject(
span.Context(), opentracing.HTTPHeaders, metadataTextMap(md),
); err != nil {
log.Errorf("Failed to inject trace span: %v", err)
}
return invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
}
}
// TraceSpanServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// For example:
//
// s := grpc.NewServer(
// ..., // (existing ServerOptions)
// grpc.UnaryInterceptor(rpc.TraceSpanServerInterceptor()),
// )
//
// It reads current trace span from request metadata.
func TraceSpanServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (resp interface{}, err error) {
// Extract parent trace span.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.Pairs()
}
parentSpanContext, err := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders, metadataTextMap(md),
)
switch err {
case nil:
case opentracing.ErrSpanContextNotFound:
log.Info(ctx, "Parent span not found, will start new one.")
default:
log.Errorf("Failed to extract trace span: %v", err)
}
// Start new trace span.
span := opentracing.StartSpan(
"RPC Server " info.FullMethod,
ext.RPCServerOption(parentSpanContext),
)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
// Set request ID for context.
if sc, ok := span.Context().(jaeger.SpanContext); ok {
ctx = context.WithValue(ctx, "constants.RequestID", sc.TraceID().String())
}
return handler(ctx, req)
}
}
const (
// binHeaderSuffix is the key suffix that makes encodeKeyValue base64-encode
// the associated value.
// NOTE(review): gRPC's own binary-metadata suffix is believed to be "-bin",
// not "_bin" — confirm which one is intended before relying on this path.
binHeaderSuffix = "_bin"
)
// metadataTextMap is a metadata.MD that is passed to the tracer's Inject and
// Extract calls as an opentracing text-map carrier; see its Set and
// ForeachKey methods.
type metadataTextMap metadata.MD
// Set implements the write side of the opentracing text-map carrier
// (opentracing.TextMapWriter): it stores val under key after normalising the
// pair with encodeKeyValue.
func (m metadataTextMap) Set(key, val string) {
	k, v := encodeKeyValue(key, val)
	// metadata.MD is a multimap, but for tracing headers any previous values
	// are replaced rather than appended to.
	m[k] = []string{v}
}
// ForeachKey implements the read side of the opentracing text-map carrier
// (opentracing.TextMapReader). It decodes every key/value pair held in the
// metadata and hands it to callback, stopping at and returning the first
// decode or callback error.
func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
	for rawKey, values := range m {
		for i := range values {
			key, val, decodeErr := metadata.DecodeKeyValue(rawKey, values[i])
			if decodeErr != nil {
				return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", decodeErr)
			}
			if cbErr := callback(key, val); cbErr != nil {
				return cbErr
			}
		}
	}
	return nil
}
// encodeKeyValue prepares a key/value pair for transmission as gRPC metadata:
// the key is lower-cased and, when it ends in binHeaderSuffix, the value is
// base64-encoded (standard encoding). Other values are returned unchanged.
// Note: adapted from unexported helpers of the grpc metadata package.
func encodeKeyValue(k, v string) (string, string) {
	key := strings.ToLower(k)
	if !strings.HasSuffix(key, binHeaderSuffix) {
		return key, v
	}
	return key, base64.StdEncoding.EncodeToString([]byte(v))
}
由于官方默认包里只实现了bin,kv和httpHeader三种格式的carrier,因此对于grpc服务需要自己实现carrier。对于http服务实现如下
代码语言:javascript复制package middleware
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger-client-go"
)
// TraceHeader is used both as the operation name of the client span started
// by ClientTraceSpan and as the HTTP response header in which ServerTraceSpan
// exposes the trace ID.
const TraceHeader = "Http-TraceHeader"
// ClientTraceSpan performs an HTTP request with the given method, url and
// body inside a client span (a child of the span in ctx, if any), injects the
// span context into the request headers and returns the response body.
//
// An error is returned when the request cannot be built, the round trip
// fails, or the body cannot be read.
func ClientTraceSpan(ctx context.Context, method, url string, body io.Reader) (resBody []byte, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, TraceHeader)
	defer span.Finish()
	ext.SpanKindRPCClient.Set(span)
	ext.HTTPUrl.Set(span, url)
	// Tag the method actually used; it was previously hard-coded to "GET".
	ext.HTTPMethod.Set(span, method)

	req, err := http.NewRequest(method, url, body)
	if err != nil {
		// Library code must not panic on bad input; report it to the caller.
		return nil, fmt.Errorf("building %s request for %q: %w", method, url, err)
	}
	// WithContext returns a copy; the result has to be kept for the span and
	// cancellation to travel with the request.
	req = req.WithContext(ctx)

	// Propagation is best effort: if injection fails the request is still
	// sent, merely without trace headers, so the error is deliberately ignored.
	_ = span.Tracer().Inject(
		span.Context(),
		opentracing.HTTPHeaders,
		opentracing.HTTPHeadersCarrier(req.Header),
	)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// resp is nil here; reading resp.Body would panic.
		return nil, fmt.Errorf("sending %s request to %q: %w", method, url, err)
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}
//中间件
func ServerTraceSpan(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tracer := opentracing.GlobalTracer()
// 从ctx获取span
if parent := opentracing.SpanFromContext(r.Context()); parent != nil {
parentCtx := parent.Context()
// 获取opentracing中的全局tracer
if tracer := opentracing.GlobalTracer(); tracer != nil {
mySpan := tracer.StartSpan("my info", opentracing.ChildOf(parentCtx))
// 由于前面opentracing中的tracer是jaeger的,所以你这里转化为jaeger.SpanContext
if sc, ok := mySpan.Context().(jaeger.SpanContext); ok {
// 这里,就能获取traceid等信息了,可以放在日志里
w.Header().Set(TraceHeader, sc.TraceID().String())
}
defer mySpan.Finish()
}
}
spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
span := opentracing.StartSpan(
"RPC Server " r.RequestURI,
ext.RPCServerOption(spanCtx),
)
defer span.Finish()
ctx := opentracing.ContextWithSpan(r.Context(), span)
if sc, ok := span.Context().(jaeger.SpanContext); ok {
ctx = context.WithValue(ctx, "constants.RequestID", sc.TraceID().String())
w.Header().Set(TraceHeader, sc.TraceID().String())
}
next.ServeHTTP(w, r.WithContext(ctx))
})
}
为了方便测试可以把traceID单独提出来放入httpheader里面,测试下
代码语言:javascript复制% curl -vi http://127.0.0.1:8080/request1
* Trying 127.0.0.1:8080...
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> GET /request1 HTTP/1.1
> Host: 127.0.0.1:8080
> User-Agent: curl/7.79.1
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Http-Traceheader: 73f6efd73f361c12
Http-Traceheader: 73f6efd73f361c12
< Date: Sun, 23 Oct 2022 19:46:28 GMT
Date: Sun, 23 Oct 2022 19:46:28 GMT
< Content-Length: 0
Content-Length: 0
<
* Connection #0 to host 127.0.0.1 left intact
效果如下
当然上述实现还是很粗糙的,比如为了方便使用默认的contextKey 传递trace信息,没有实现自定义的Extract和Inject方法,导致client和server各打印了一份trace信息。下一期在源码实现分析的时候介绍如何优化。