spring-cloud-sleuth source code study, part 2

2021-10-11 18:06:58

The series is split into three parts:

- spring-cloud-sleuth quick start (https://cloud.tencent.com/developer/article/1884423)

- zipkin-brave demo and source code (this article)

- spring-cloud-sleuth source code (https://cloud.tencent.com/developer/article/1886833)

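A walkthrough of the zipkin-brave source code

The spring-cloud-sleuth quick start gets you going very quickly, but reading its code left me rather confused. So I worked through brave first; after that, the way spring-cloud integrates zipkin suddenly became clear.

All the test classes are based on Brave-quickstart and are meant for getting familiar with the API.

Note: this only covers how brave performs call-chain tracing; likewise, only the relevant classes of spring-cloud-sleuth are covered.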

Brave-quickstart:

  • https://github.com/openzipkin/brave/tree/master/brave

Test dependencies

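    <properties>
        <spring-cloud.version>2020.0.3</spring-cloud.version>
        <zipkin-reporter.version>2.16.3</zipkin-reporter.version>
    </properties>

    <!-- The Spring Cloud BOM supplies the version of spring-cloud-starter-sleuth -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>

        <!-- provides AsyncZipkinSpanHandler -->
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
            <version>${zipkin-reporter.version}</version>
        </dependency>

        <!-- provides OkHttpSender -->
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-sender-okhttp3</artifactId>
            <version>${zipkin-reporter.version}</version>
        </dependency>
    </dependencies>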

Test classes

Tracer factory class

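import brave.Tracer;
import brave.Tracing;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.okhttp3.OkHttpSender;

public class GlobalContext {

    static OkHttpSender sender;

    static AsyncZipkinSpanHandler spanHandler;

    static Tracing tracing;

    static {
        // Configure a sender that controls how spans are submitted (HTTP POST to a local Zipkin server)
        sender = OkHttpSender.create("http://127.0.0.1:9411/api/v2/spans");
        spanHandler = AsyncZipkinSpanHandler.create(sender);
        tracing = Tracing.newBuilder()
                .localServiceName("trace-demo")
                .addSpanHandler(spanHandler)
                .build();
    }

    public static Tracer tracer() {
        return tracing.tracer();
    }

    // Release resources in the reverse order of creation
    public static void close() {
        tracing.close();
        spanHandler.close();
        sender.close();
    }
}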

Test 1

        Tracer tracer = GlobalContext.tracer();
        // root span; .start() records the start timestamp, without it none is recorded
        Span root = tracer.newTrace().name("root").start();
        logger.info("[root]");

        Span root_sub_first = tracer.newChild(root.context()).name("root_sub_first").start();
        logger.info("[root_sub_first]");

        Span root_sub_second = tracer.newChild(root.context()).name("root_sub_second").start();
        logger.info("[root_sub_second]");

        Span root_sub_sub_first = tracer.newChild(root_sub_first.context()).name("root_sub_sub_first").start();
        logger.info("[root_sub_sub_first]");

        // finish children before their parents
        root_sub_sub_first.finish();
        root_sub_first.finish();
        root_sub_second.finish();
        root.finish();

        GlobalContext.close();
        // blocks the main thread indefinitely so the demo process stays up
        Thread.currentThread().join();

Test 2

import brave.Span;
import brave.Tag;
import brave.Tracer;
import brave.propagation.TraceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TagDemo {
    private static Logger logger = LoggerFactory.getLogger(TagDemo.class);

    public static void main(String[] args) throws InterruptedException {
//        method1();
        method2();
        // keep the JVM alive so the async reporter's daemon thread can send the spans
        Thread.currentThread().join();
    }

    /**
     * A simple global counter recorded as a tag
     * @throws InterruptedException
     */
    private static void method2() throws InterruptedException {
        Tracer tracer = GlobalContext.tracer();
        Span start = tracer.nextSpan().name("tag-test-02").start();
        try (Tracer.SpanInScope inScope = tracer.withSpanInScope(start)) {
            logger.info("tag-test-02");
            SUMMARY_TAG.tag(Summarizer.summarizer, start);

            // created while "start" is in scope, so nextSpan() makes it a child of "start"
            Span span = tracer.nextSpan().name("tag-test-02-sub").start();
            SUMMARY_TAG.tag(Summarizer.summarizer, span);
            logger.info("tag-test-02-sub");
            span.finish();

        } finally {
            start.finish();
        }

    }

    /**
     * In the Zipkin UI, traces can be filtered by this tag with the query tagQuery=client/finagle.version
     *
     * @throws InterruptedException
     */
    private static void method1() throws InterruptedException {
        Tracer tracer = GlobalContext.tracer();
        Span start = tracer.nextSpan().name("tag-test").start();
        try (Tracer.SpanInScope inScope = tracer.withSpanInScope(start)) {
            start.tag("client/finagle.version", "6.36.0");
            logger.info("tag-test");
        } finally {
            start.finish();
        }

    }

    static class Summarizer {
        static Summarizer summarizer = new Summarizer();
        private int sum = 0;

        // increments, then returns the new count (not thread-safe; fine for this single-threaded demo)
        public int increase() {
            return ++sum;
        }
    }

    static final Tag<Summarizer> SUMMARY_TAG = new Tag<Summarizer>("abc") {
        @Override
        protected String parseValue(Summarizer input, TraceContext context) {
            // "次" means "times", so the values read "1次", "2次", ...
            return input.increase() + "次";
        }
    };
}
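The Tag<Summarizer> above derives a tag value from an arbitrary input object through parseValue, so the same key can be applied to many spans. The sketch below reproduces that pattern with the JDK only; TypedTag, SharedCounter and TagSketchDemo are hypothetical names, and a plain Map stands in for the span. It is not Brave's Tag class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for brave.Tag: a fixed key plus a rule that turns an input into a value. */
abstract class TypedTag<I> {
  final String key;

  TypedTag(String key) {
    this.key = key;
  }

  /** Derives the tag value from the input; returning null means "do not tag". */
  protected abstract String parseValue(I input);

  /** Writes key=value into the span stand-in (a plain Map in this sketch). */
  final void tag(I input, Map<String, String> spanTags) {
    String value = parseValue(input);
    if (value != null) spanTags.put(key, value);
  }
}

/** Same idea as Summarizer above: each read bumps a shared counter. */
final class SharedCounter {
  private int sum = 0;

  int increase() {
    return ++sum;
  }
}

final class TagSketchDemo {
  public static void main(String[] args) {
    SharedCounter counter = new SharedCounter();
    TypedTag<SharedCounter> summaryTag = new TypedTag<SharedCounter>("abc") {
      @Override
      protected String parseValue(SharedCounter input) {
        return input.increase() + " times";
      }
    };
    Map<String, String> parent = new HashMap<>();
    Map<String, String> child = new HashMap<>();
    summaryTag.tag(counter, parent);
    summaryTag.tag(counter, child);
    System.out.println(parent + " " + child); // prints {abc=1 times} {abc=2 times}
  }
}
```

Because the counter is shared, the parent and child spans receive increasing values, which is what method2 demonstrates with the real Tag.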

Tracing

Provides the components needed to assemble tracing.

  static final class Default extends Tracing {
    // the tracer; in Spring this is a singleton bean
    final Tracer tracer;
    // defaults to B3Propagation.FACTORY
    final Propagation.Factory propagationFactory;
    // the concrete propagation produced by the factory above
    final Propagation<String> stringPropagation;
    // defaults to brave.propagation.CurrentTraceContext.Default, which has a static InheritableThreadLocal field
    final CurrentTraceContext currentTraceContext;
    // sampler
    final Sampler sampler;
    final Clock clock;
    // a Tag subclass with key "error": e.g. when an exception occurs it formats the error before it is sent to the Zipkin server
    final ErrorParser errorParser;
    final AtomicBoolean noop;
  }

Tracer

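The tracer. The all-important Span objects are created by this class. The methods used in the official demo, #newTrace, #nextSpan and #newChild(TraceContext), all create their spans through _toSpan under the hood.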

public class Tracer {
  // a Clock chosen according to the JDK version
  final Clock clock;
  // propagation factory; the Propagation it creates carries extra information along each call
  final Propagation.Factory propagationFactory;
  // the upstream comment says this is only for toString, which is true within Tracer; the handler itself does real work elsewhere
  final SpanHandler spanHandler; // only for toString
  // all pending (in-flight) spans
  final PendingSpans pendingSpans;
  // sampler
  final Sampler sampler;
  // the current trace context; used to integrate with other contexts such as MDC
  final CurrentTraceContext currentTraceContext;
  final boolean traceId128Bit, supportsJoin, alwaysSampleLocal;
  final AtomicBoolean noop;
}

#newTrace()

public Span newTrace() {
    return _toSpan(null, newRootContext(0));
}

TraceContext newRootContext(int flags) {
    flags &= ~FLAG_SHARED; // cannot be shared if we aren't reusing the span ID
    // the tracer holds a propagationFactory, which gets a chance to decorate (extend) the context as it is created
    return decorateContext(flags, 0L, 0L, 0L, 0L, 0L, Collections.emptyList());
}

newChild(TraceContext)

 public Span newChild(TraceContext parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return _toSpan(parent, decorateContext(parent, parent.spanId()));
  }
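newTrace and newChild differ only in what they pass to decorateContext: a root has no parent, while a child keeps the parent's trace and uses parent.spanId() as its parentId. A JDK-only sketch of that relationship follows, assuming a hypothetical SimpleContext class with random 64-bit IDs; Brave's real decorateContext also handles sampling flags, 128-bit IDs and extra data, all omitted here.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for TraceContext, reduced to its ID fields. */
final class SimpleContext {
  final long traceId, localRootId, parentId, spanId;

  private SimpleContext(long traceId, long localRootId, long parentId, long spanId) {
    this.traceId = traceId;
    this.localRootId = localRootId;
    this.parentId = parentId;
    this.spanId = spanId;
  }

  /** Like newTrace(): no parent, a fresh trace ID, and the span is its own local root. */
  static SimpleContext newRoot() {
    long spanId = nextId();
    return new SimpleContext(nextId(), spanId, 0L, spanId);
  }

  /** Like newChild(parent): same trace and local root, parentId = parent's spanId, new spanId. */
  static SimpleContext newChild(SimpleContext parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return new SimpleContext(parent.traceId, parent.localRootId, parent.spanId, nextId());
  }

  /** Random non-zero ID; 0 is reserved to mean "absent", as in a root's parentId. */
  private static long nextId() {
    long id;
    do {
      id = ThreadLocalRandom.current().nextLong();
    } while (id == 0L);
    return id;
  }
}
```

Calling newRoot() once and newChild() on the results reproduces the root / root_sub_first / root_sub_sub_first tree from Test 1: all three share one trace ID, and each parentId points one level up.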

nextSpan()

 // Reads the current trace context from a thread-local. If there is one, the new span is created as its child; otherwise a new root span (a new trace) is created.
 public Span nextSpan() {
    TraceContext parent = currentTraceContext.get();
    return parent != null ? newChild(parent) : newTrace();
  }
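nextSpan() is only a branch on the thread-local current context; withSpanInScope (used in Test 2) is what puts a context there and restores the previous one afterwards. A JDK-only sketch of that interplay, assuming hypothetical names (CurrentContextSketch, SketchSpan, Scope) and plain long span IDs instead of TraceContext; a null current value means "no parent".

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of nextSpan() plus a thread-local scope; IDs are plain longs. */
final class CurrentContextSketch {
  private static final ThreadLocal<Long> CURRENT = new ThreadLocal<>();
  private static final AtomicLong IDS = new AtomicLong();

  /** A span reduced to its own ID and its parent's ID (0 = root). */
  static final class SketchSpan {
    final long id, parentId;

    SketchSpan(long id, long parentId) {
      this.id = id;
      this.parentId = parentId;
    }
  }

  /** Closing the scope restores whatever was current before, similar in spirit to Tracer.SpanInScope. */
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  /** Mirrors nextSpan(): child of the current context if there is one, otherwise a new root. */
  static SketchSpan nextSpan() {
    Long parent = CURRENT.get();
    return new SketchSpan(IDS.incrementAndGet(), parent != null ? parent : 0L);
  }

  /** Makes the span current on this thread until the returned scope is closed. */
  static Scope withSpanInScope(SketchSpan span) {
    Long previous = CURRENT.get();
    CURRENT.set(span.id);
    return () -> {
      if (previous == null) CURRENT.remove();
      else CURRENT.set(previous);
    };
  }

  static Long current() {
    return CURRENT.get();
  }
}
```

Outside any scope nextSpan() yields a root; inside a try-with-resources on withSpanInScope it yields a child, which is why "tag-test-02-sub" in Test 2 becomes a child of "tag-test-02".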

_toSpan()

Span _toSpan(@Nullable TraceContext parent, TraceContext context) {
    if (isNoop(context)) return new NoopSpan(context);
    // pendingSpans holds every in-flight span; getOrCreate stores a newly created span in its map and lets the SpanHandler handle the begin signal
    PendingSpan pendingSpan = pendingSpans.getOrCreate(parent, context, false);
    TraceContext pendingContext = pendingSpan.context();
    if (pendingContext != null) context = pendingContext;
    // pendingSpan.state() returns the MutableSpan of the PendingSpan held in pendingSpans: both refer to the same object
    return new RealSpan(context, pendingSpans, pendingSpan.state(), pendingSpan.clock());
}
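The point of _toSpan is that every RealSpan for the same context ends up holding the same MutableSpan, because its state comes from pendingSpans.getOrCreate. A JDK-only sketch of that get-or-create behavior, assuming hypothetical names: a ConcurrentHashMap keyed by a long span ID stands in for Brave's WeakConcurrentMap<TraceContext, PendingSpan> (so, unlike the real class, it holds its keys strongly), and a BiConsumer stands in for SpanHandler.begin.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

/** Hypothetical model of PendingSpans.getOrCreate. */
final class PendingSketch {
  /** Stand-in for MutableSpan: just a name. */
  static final class State {
    volatile String name;
  }

  private final ConcurrentMap<Long, State> pending = new ConcurrentHashMap<>();
  private final BiConsumer<Long, State> onBegin; // stands in for SpanHandler.begin

  PendingSketch(BiConsumer<Long, State> onBegin) {
    this.onBegin = onBegin;
  }

  /** Returns the existing state for this context, or creates it and signals "begin" exactly once. */
  State getOrCreate(long spanId) {
    State existing = pending.get(spanId);
    if (existing != null) return existing;
    State created = new State();
    State raced = pending.putIfAbsent(spanId, created);
    if (raced != null) return raced; // another thread won the race; share its state
    onBegin.accept(spanId, created);
    return created;
  }

  int size() {
    return pending.size();
  }
}
```

Two lookups with the same ID return the same State, so a name written through one handle is visible through the other, which is the "same object" relationship noted in the _toSpan comments.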

TraceContext

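The trace context; it carries the identifiers that define a trace (trace ID, local root ID, parent ID, span ID) plus extra data.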

//@Immutable    
public final class TraceContext extends SamplingFlags { 
    final long traceIdHigh, traceId, localRootId, parentId, spanId;
    final List<Object> extraList;
    TraceContext(
        int flags,
        long traceIdHigh,
        long traceId,
        long localRootId,
        long parentId,
        long spanId,
        // extra data attached by extensions
        List<Object> extraList
    ) {
        super(flags);
        this.traceIdHigh = traceIdHigh;
        this.traceId = traceId;
        this.localRootId = localRootId;
        this.parentId = parentId;
        this.spanId = spanId;
        this.extraList = extraList;
    }
}
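TraceContext keeps the IDs as longs, with traceIdHigh holding the upper 64 bits of an optional 128-bit trace ID, while MutableSpan (further down) stores them as Strings. A JDK-only sketch of turning those longs into the fixed-width lower-hex strings Zipkin uses, assuming a hypothetical HexIds helper; in Brave this job belongs to methods such as TraceContext.traceIdString().

```java
/** Hypothetical helper: renders trace/span IDs as fixed-width lower-hex strings. */
final class HexIds {
  private HexIds() {}

  /** 64-bit ID -> 16 lower-hex characters, zero-padded; negative longs print as unsigned. */
  static String toHex(long id) {
    return String.format("%016x", id);
  }

  /** 128-bit trace ID -> 32 characters; a zero high half falls back to the 16-character form. */
  static String traceIdString(long traceIdHigh, long traceId) {
    return traceIdHigh != 0L ? toHex(traceIdHigh) + toHex(traceId) : toHex(traceId);
  }
}
```

For example, HexIds.toHex(1L) gives "0000000000000001", and HexIds.toHex(-1L) gives "ffffffffffffffff".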

CurrentTraceContext

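This class has no inheritance relationship with TraceContext; it is where the current TraceContext is held. The default implementation, Default, stores the context in a thread-local variable.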

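  public static final class Default extends ThreadLocalCurrentTraceContext {
    // Inheritable as Brave 3's ThreadLocalServerClientAndLocalSpanState was inheritable
    static final InheritableThreadLocal<TraceContext> INHERITABLE = new InheritableThreadLocal<>();
  }

  // parent class of Default
  public class ThreadLocalCurrentTraceContext extends CurrentTraceContext {
    static final ThreadLocal<TraceContext> DEFAULT = new ThreadLocal<>();
    final ThreadLocal<TraceContext> local;
    // scope that resets the current context to null (the root state) when closed
    final RevertToNullScope revertToNull;
  }

  // parent class of ThreadLocalCurrentTraceContext
  public abstract class CurrentTraceContext {
    // decorators applied to each Scope
    final ScopeDecorator[] scopeDecorators;
  }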
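The INHERITABLE field matters because, when a Default instance is backed by it rather than by the plain DEFAULT ThreadLocal, a thread created inside a span can see that span's context. A JDK-only sketch contrasting the two kinds of thread-local, using a hypothetical InheritanceDemo class; note that the value is copied only when the child thread is constructed, so pooled threads created earlier do not pick it up.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo: which thread-local values a newly created thread can see. */
final class InheritanceDemo {
  static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
  static final InheritableThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

  /** Sets both locals, starts a child thread, and returns what it saw as "plain|inheritable". */
  static String observeFromChild(String value) {
    PLAIN.set(value);
    INHERITABLE.set(value);
    try {
      AtomicReference<String> seen = new AtomicReference<>();
      // the child copies INHERITABLE's value at construction time; PLAIN starts empty in the child
      Thread child = new Thread(() -> seen.set(PLAIN.get() + "|" + INHERITABLE.get()));
      child.start();
      child.join();
      return seen.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      PLAIN.remove();
      INHERITABLE.remove();
    }
  }
}
```

InheritanceDemo.observeFromChild("ctx") returns "null|ctx": the plain ThreadLocal is empty in the child, while the inheritable one carries the parent's value.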

RealSpan

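While reading the source I kept struggling to make sense of Span. Since the spans created in the code are all RealSpan instances, I used RealSpan as the entry point for sorting Span out. My conclusions: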

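  • A span's data, such as tags and annotations, is stored in MutableSpan.
  • PendingSpans (which extends WeakConcurrentMap) holds every in-flight MutableSpan. When finish/flush/abandon is called, RealSpan forwards the command to PendingSpans, which handles it centrally (e.g. sending the data to the Zipkin server); start, by contrast, only records the start timestamp on the MutableSpan.
  • annotate means "mark": it records the point in time at which an event occurred. I think of an annotation as a special kind of tag (only as a mental model; annotations are actually stored separately from tags). As the code below shows, the legacy core events "sr", "cr", "ss" and "cs" are not stored as annotations but translated into the span kind plus a start or finish timestamp. This also cleared up what Clock is for, which puzzled me at first: it supplies those timestamps.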
final class RealSpan extends Span {
  final TraceContext context;
  final PendingSpans pendingSpans;
  final MutableSpan state;
  final Clock clock;

  /**
   * My reading: annotate is where a special kind of "tag" comes from
   */
  @Override public Span annotate(long timestamp, String value) {
    // Modern instrumentation should not send annotations such as this, but we leniently
    // accept them rather than fail. This for example allows old bridges like to Brave v3 to work
    if ("cs".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.CLIENT);
        state.startTimestamp(timestamp);
      }
    } else if ("sr".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.SERVER);
        state.startTimestamp(timestamp);
      }
    } else if ("cr".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.CLIENT);
      }
      finish(timestamp);
    } else if ("ss".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.SERVER);
      }
      finish(timestamp);
    } else {
      synchronized (state) {
        state.annotate(timestamp, value);
      }
    }
    return this;
  }   
}
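annotate() treats the four Brave v3 core annotations specially: cs/sr set the kind and the start timestamp, cr/ss set the kind and finish the span, and any other value is stored as an ordinary annotation. A JDK-only sketch of that dispatch, assuming a hypothetical AnnotateSketch class in place of MutableSpan, a plain field assignment in place of the finish(timestamp) call, and no locking (the real code synchronizes on state).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, single-threaded model of RealSpan.annotate's legacy-annotation handling. */
final class AnnotateSketch {
  enum Kind { CLIENT, SERVER }

  Kind kind;
  long startTimestamp, finishTimestamp;
  final List<String> annotations = new ArrayList<>(); // "timestamp:value" entries

  AnnotateSketch annotate(long timestamp, String value) {
    if ("cs".equals(value)) {
      kind = Kind.CLIENT;
      startTimestamp = timestamp;
    } else if ("sr".equals(value)) {
      kind = Kind.SERVER;
      startTimestamp = timestamp;
    } else if ("cr".equals(value)) {
      kind = Kind.CLIENT;
      finishTimestamp = timestamp; // RealSpan calls finish(timestamp) here
    } else if ("ss".equals(value)) {
      kind = Kind.SERVER;
      finishTimestamp = timestamp; // likewise finish(timestamp)
    } else {
      annotations.add(timestamp + ":" + value); // ordinary annotation
    }
    return this;
  }
}
```

A client call annotated with cs at 100, "retry" at 150 and cr at 200 ends up as a CLIENT span from 100 to 200 with a single stored annotation, "150:retry".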

PendingSpan

Literally, a "pending" span. A PendingSpan is added to PendingSpans when the span is created (via getOrCreate in _toSpan) and removed when finish/flush (or abandon) is called.

  • PendingSpans is essentially a map of TraceContext -> PendingSpan;
  • For a given span, the state field in RealSpan and the span field of this class are the same MutableSpan object.
public final class PendingSpan extends WeakReference<TraceContext> {
  final MutableSpan span;
  final TickClock clock;
  final TraceContext handlerContext;
}

PendingSpans

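We already met this class in Tracer. Tracer is a singleton (one per Tracing), and it holds this object, so PendingSpans is effectively a singleton too. Operations on a span such as finish/flush actually end up in this class. It extends WeakConcurrentMap and holds every PendingSpan. For example, finish removes the PendingSpan and then hands the data to the spanHandler, which submits it to the Zipkin server.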

public final class PendingSpans extends WeakConcurrentMap<TraceContext, PendingSpan> {
  final MutableSpan defaultSpan;
  final Clock clock;
  // assembled in brave.Tracing.Builder#build; in this setup it is the Zipkin span reporter used to submit data
  final SpanHandler spanHandler;
  final AtomicBoolean noop;
    
 public void finish(TraceContext context, long timestamp) {
    PendingSpan last = remove(context);
    if (last == null) return;
    last.span.finishTimestamp(timestamp != 0L ? timestamp : last.clock.currentTimeMicroseconds());
    spanHandler.end(last.handlerContext, last.span, Cause.FINISHED);
  }
}
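PendingSpans.finish removes the entry first and returns early when it is already gone, so a second finish() on the same span is a no-op and the handler is called once. A JDK-only sketch of that remove-then-report step, assuming hypothetical names: a ConcurrentHashMap stands in for WeakConcurrentMap, a LongSupplier for the span's clock, and a Consumer for spanHandler.end.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical model of PendingSpans.finish. */
final class FinishSketch {
  /** Stand-in for MutableSpan. */
  static final class SpanData {
    final long spanId;
    long finishTimestamp;

    SpanData(long spanId) {
      this.spanId = spanId;
    }
  }

  private final ConcurrentMap<Long, SpanData> pending = new ConcurrentHashMap<>();
  private final LongSupplier clock;       // stands in for the span's clock (microseconds)
  private final Consumer<SpanData> onEnd; // stands in for spanHandler.end(...)

  FinishSketch(LongSupplier clock, Consumer<SpanData> onEnd) {
    this.clock = clock;
    this.onEnd = onEnd;
  }

  /** Registers a span, as getOrCreate does when the span is created. */
  void track(long spanId) {
    pending.putIfAbsent(spanId, new SpanData(spanId));
  }

  /** Mirrors PendingSpans.finish: remove, stamp the finish time (0 means "ask the clock"), report. */
  void finish(long spanId, long timestamp) {
    SpanData last = pending.remove(spanId);
    if (last == null) return; // already finished, flushed, or never tracked
    last.finishTimestamp = timestamp != 0L ? timestamp : clock.getAsLong();
    onEnd.accept(last);
  }

  int pendingCount() {
    return pending.size();
  }
}
```

Removing before reporting is what makes finish idempotent: whichever call wins the remove reports the span, and every later call finds nothing.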

MutableSpan

As the name says, this object is mutable. The span exposed to callers is RealSpan; when tag/annotate is called, the data is actually stored in this class.

public final class MutableSpan implements Cloneable {
      /*
   * One of these objects is allocated for each in-flight span, so we try to be parsimonious on
   * things like array allocation and object reference size.
   */
  String traceId, localRootId, parentId, id;
  Kind kind;
  int flags;
  long startTimestamp, finishTimestamp;
  String name, localServiceName, localIp, remoteServiceName, remoteIp;
  int localPort, remotePort;
  Throwable error;

  //
  // The below use object arrays instead of ArrayList. The intent is not for safe sharing
  // (copy-on-write), as this type is externally synchronized. In other words, this isn't
  // copy-on-write. We just grow arrays as we need to similar to how ArrayList does it.
  //
  // tags [(key, value)] annotations [(timestamp, value)]
  Object[] tags = EMPTY_ARRAY, annotations = EMPTY_ARRAY;
  int tagCount, annotationCount;
}
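MutableSpan keeps tags as one flat Object[] of alternating key/value entries and grows it on demand, instead of allocating an ArrayList or a Map per span. A JDK-only sketch of that layout, assuming a hypothetical FlatTags class and a start-at-two-pairs, then-double growth policy; Brave's exact growth policy is not shown in the excerpt.

```java
import java.util.Arrays;

/** Hypothetical sketch of the tags layout: [k0, v0, k1, v1, ...] in one growable array. */
final class FlatTags {
  private static final Object[] EMPTY_ARRAY = {};
  private Object[] tags = EMPTY_ARRAY;
  private int tagCount;

  void add(String key, String value) {
    int index = tagCount * 2;
    if (index == tags.length) { // full: grow to 4 slots (2 pairs) first, then double
      tags = Arrays.copyOf(tags, tags.length == 0 ? 4 : tags.length * 2);
    }
    tags[index] = key;
    tags[index + 1] = value;
    tagCount++;
  }

  String keyAt(int i) {
    return (String) tags[i * 2];
  }

  String valueAt(int i) {
    return (String) tags[i * 2 + 1];
  }

  int count() {
    return tagCount;
  }

  int capacityInPairs() {
    return tags.length / 2;
  }
}
```

A span with no tags shares the empty array and allocates nothing, which matches the "parsimonious" intent stated in the MutableSpan comment.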

ZipkinSpanHandler

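This corresponds to the spanHandler in PendingSpans above. Its Reporter, as the name suggests, is what submits the data; this handler is how the data reaches the Zipkin server.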

public class ZipkinSpanHandler extends SpanHandler implements Closeable { 
    final Reporter<MutableSpan> spanReporter;
    final Tag<Throwable> errorTag; // for toBuilder()
    final boolean alwaysReportSpans;
}

AsyncReporter

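Only one thread does the sending. Spans reported while that thread is already running are put into the pending queue, and that same thread processes them.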

public abstract class AsyncReporter<S> extends Component implements Reporter<S>, Flushable {
    
  // the reporter produced by the inner Builder is this class
  static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
    static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
    // whether the flusher thread has already started
    final AtomicBoolean started, closed;
    final BytesEncoder<S> encoder;
    // buffer queue
    final ByteBoundedQueue<S> pending;
    final Sender sender;
    final int messageMaxBytes;
    final long messageTimeoutNanos, closeTimeoutNanos;
    final CountDownLatch close;
    final ReporterMetrics metrics;
    // factory that creates the flusher thread
    final ThreadFactory threadFactory;

    /** Tracks if we should log the first instance of an exception in flush(). */
    private boolean shouldWarnException = true;
      
    // included to show where the asynchrony comes from: this class starts a new thread to do the sending
    void startFlusherThread() {
      BufferNextMessage<S> consumer =
          BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
      Thread flushThread = threadFactory.newThread(new Flusher<>(this, consumer));
      flushThread.setName("AsyncReporter{" + sender + "}");
      flushThread.setDaemon(true);
      flushThread.start();
    }
  }
}
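The asynchrony comes from a single daemon flusher thread that drains the pending queue in batches; the started flag ensures only one such thread is created. A JDK-only sketch of that shape, assuming hypothetical names: a LinkedBlockingQueue stands in for ByteBoundedQueue (it is bounded by count, not bytes), a Consumer<List<S>> stands in for the Sender, and the thread is started lazily on the first report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch of the BoundedAsyncReporter shape: one daemon thread drains a queue. */
final class MiniAsyncReporter<S> {
  private final LinkedBlockingQueue<S> pending; // stands in for ByteBoundedQueue
  private final Consumer<List<S>> sender;       // stands in for Sender
  private final AtomicBoolean started = new AtomicBoolean();
  private final AtomicBoolean closed = new AtomicBoolean();
  private volatile Thread flusher;

  MiniAsyncReporter(int capacity, Consumer<List<S>> sender) {
    this.pending = new LinkedBlockingQueue<>(capacity);
    this.sender = sender;
  }

  /** Queues a span; returns false if it was dropped (queue full or reporter closed). */
  boolean report(S span) {
    if (closed.get()) return false;
    // only the first caller starts the thread; later callers just enqueue
    if (started.compareAndSet(false, true)) startFlusherThread();
    return pending.offer(span);
  }

  private void startFlusherThread() {
    Thread thread = new Thread(this::drainLoop, "MiniAsyncReporter");
    thread.setDaemon(true); // as in the original: never keeps the JVM alive by itself
    flusher = thread;
    thread.start();
  }

  /** Waits briefly for one span, then sends it together with everything already queued. */
  private void drainLoop() {
    try {
      while (!closed.get() || !pending.isEmpty()) {
        S first = pending.poll(50, TimeUnit.MILLISECONDS);
        if (first == null) continue;
        List<S> batch = new ArrayList<>();
        batch.add(first);
        pending.drainTo(batch);
        sender.accept(batch);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Stops accepting spans and waits for the flusher to drain what is left. */
  void close() {
    closed.set(true);
    Thread thread = flusher;
    if (thread == null) return;
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Because the flusher is a daemon thread, a program that exits right after reporting can lose queued spans unless it closes the reporter first; this is also why Test 2 keeps the main thread alive.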

Sender

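The sender. This class determines how data is sent to the Zipkin service: RabbitMQ, Kafka, REST, OkHttp and so on. It is easy to understand, so I won't paste the code of the various implementations.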
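The reporter depends only on the sender's contract, so swapping OkHttp for Kafka or RabbitMQ does not touch the reporting logic. A JDK-only sketch of that separation, assuming a hypothetical SpanSender interface with a single send method; zipkin-reporter's real Sender has a different and richer API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical transport contract: ship a batch of already-encoded spans somewhere. */
interface SpanSender {
  void send(List<byte[]> encodedSpans);
}

/** In-memory transport, handy in tests; a real one would POST to /api/v2/spans or publish to a broker. */
final class InMemorySender implements SpanSender {
  final List<String> received = new ArrayList<>();

  @Override
  public void send(List<byte[]> encodedSpans) {
    for (byte[] span : encodedSpans) received.add(new String(span, StandardCharsets.UTF_8));
  }
}

/** Transport-agnostic caller: encodes and hands off without knowing which SpanSender it has. */
final class SendingReporter {
  private final SpanSender sender;

  SendingReporter(SpanSender sender) {
    this.sender = sender;
  }

  void report(List<String> spansAsJson) {
    List<byte[]> encoded = new ArrayList<>();
    for (String json : spansAsJson) encoded.add(json.getBytes(StandardCharsets.UTF_8));
    sender.send(encoded);
  }
}
```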
