spring-cloud-sleuth Source Code Study, Part 3

2021-10-11 10:43:46

This article comes in three parts:

- Getting started with spring-cloud-sleuth (https://cloud.tencent.com/developer/article/1884423)

- The zipkin-brave demo and source code (https://cloud.tencent.com/developer/article/1884429)

- Source code of the spring-cloud integration with zipkin

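Preface

Over the National Day holiday I caught up on sleep like crazy every day, and now that I am back at work I still just want to sleep... haha

Source code of the spring-cloud integration with zipkin

Goal of this source reading

Different interaction styles cannot share a single way of carrying trace information (HTTP differs from UDP, and talking to redis, an MQ or a REST endpoint each works differently), so brave ships its own integrations for the different styles at https://github.com/openzipkin/brave/tree/master/brave

This round mainly traces how spring-cloud hooks into MVC requests, openfeign, rabbitmq and redis.

Auto-configuration entry point

With Maven Helper it is easy to spot spring-cloud-sleuth-autoconfigure inside spring-cloud-starter-sleuth. At a glance this is most likely the auto-configuration entry point, so open it and look for the spring.factories file: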

# Auto Configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncCustomAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncDefaultAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.circuitbreaker.TraceCircuitBreakerAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.rxjava.TraceRxJavaAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.quartz.TraceQuartzAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.scheduling.TraceSchedulingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.reactor.TraceReactorAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceFunctionAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceSpringIntegrationAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceSpringMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceWebSocketAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.BraveAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.web.client.BraveWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.rpc.BraveRpcAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.grpc.BraveGrpcAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveKafkaStreamsAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.opentracing.BraveOpentracingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.redis.BraveRedisAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.mongodb.BraveMongoDbAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.zipkin2.ZipkinAutoConfiguration
# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceGatewayEnvironmentPostProcessor

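At first glance the list may look long, but after reading the brave source we can rely on intuition to pick out the configuration classes that suit us.

Choosing which configuration classes to read

Start by picking a few configuration classes that look interesting and guessing what they do:

  • TraceWebAutoConfiguration/TraceWebClientAutoConfiguration/TraceFeignClientAutoConfiguration may show how web requests and feign requests are handled
  • TraceWebSocketAutoConfiguration may show how websocket interactions are handled
  • BraveRpcAutoConfiguration/BraveGrpcAutoConfiguration may show how rpc/grpc is handled
  • BraveKafkaStreamsAutoConfiguration/BraveMessagingAutoConfiguration may show how messaging (JMS and the like) is handled
  • BraveRedisAutoConfiguration/BraveMongoDbAutoConfiguration may show how cache services are handled
  • BraveAutoConfiguration may expose the core Tracer-related pieces we met in the brave source
  • ZipkinAutoConfiguration is the configuration class that wires up communication with the zipkin server

Right or wrong, reading them will tell.

Log adjustment: TraceEnvironmentPostProcessor

While studying brave, the logs did not print traceId and spanId, because we never specified what the log lines should contain. In a Spring Boot project, however, the logs print them without any extra work on our part, so something must adjust the logging along the way. A configuration class is unlikely to do that, whereas the two EnvironmentPostProcessor entries at the end of the listing above can.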

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        Map<String, Object> map = new HashMap<String, Object>();
        // This doesn't work with all logging systems but it's a useful default so you see
        // traces in logs without having to configure it.
        if (Boolean.parseBoolean(environment.getProperty("spring.sleuth.enabled", "true"))) {
            map.put("logging.pattern.level",
                    "%5p [${spring.zipkin.service.name:" + "${spring.application.name:}},%X{traceId:-},%X{spanId:-}]");
        }
        addOrReplace(environment.getPropertySources(), map);
    }
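
To make the fallback inside that pattern concrete, here is a minimal sketch in plain Java. ServiceNameFallback and its two methods are names I made up for illustration; Spring resolves the nested placeholder itself, and the logging framework fills in %X{traceId:-} and %X{spanId:-} from the MDC. The sketch only mimics the order of the lookups: spring.zipkin.service.name first, then spring.application.name, then an empty string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Sleuth: mimics how the nested placeholder
// ${spring.zipkin.service.name:${spring.application.name:}} falls back.
final class ServiceNameFallback {

    static String resolve(Map<String, String> env) {
        String zipkinName = env.get("spring.zipkin.service.name");
        if (zipkinName != null) {
            return zipkinName;
        }
        // Inner placeholder: the application name, or an empty string when missing.
        return env.getOrDefault("spring.application.name", "");
    }

    // Builds the bracketed prefix that the level pattern adds to each log line.
    static String logPrefix(Map<String, String> env, String traceId, String spanId) {
        return "[" + resolve(env) + "," + traceId + "," + spanId + "]";
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("spring.application.name", "order-service");
        System.out.println(logPrefix(env, "abc", "def"));
    }
}
```

With only spring.application.name set, the prefix carries the application name; setting spring.zipkin.service.name as well would take precedence.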

BraveAutoConfiguration

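As guessed, this class creates Tracing, Tracer, CurrentTraceContext and so on.

ZipkinAutoConfiguration

It defines the Reporter, whether to register with the service registry, whether to collect metrics, and so on. This matches expectations, so I did not read it closely; my goal is to trace the flow of distributed tracing as it is used in a project.

TraceWebClientAutoConfiguration

It contains the configurations RestTemplateConfig, HttpHeadersFilterConfig, NettyConfiguration, WebClientConfig and TraceOAuthConfiguration. The class names and the beans inside show that this one deals with web requests. RestTemplate is the class I use most, so let's read its code.

TracingClientHttpRequestInterceptor

A bean inside RestTemplateConfig. It uses the classic interceptor pattern, running its logic before and after execution.execute. handleSend creates the span for the outgoing call through the tracer's nextSpan and writes the traceId and spanId into the request; handleReceive then wraps up by calling span.finish() or error(). Having written the brave demo first makes this code much easier to read.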

public final class TracingClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
​
    @Override
    public ClientHttpResponse intercept(HttpRequest req, byte[] body, ClientHttpRequestExecution execution)
            throws IOException {
        HttpRequestWrapper request = new HttpRequestWrapper(req);
        Span span = handler.handleSend(request);
        if (log.isDebugEnabled()) {
            log.debug("Wrapping an outbound http call with span [" + span + "]");
        }
        ClientHttpResponse response = null;
        Throwable error = null;
        try (CurrentTraceContext.Scope ws = currentTraceContext.newScope(span.context())) {
            response = execution.execute(req, body);
            return response;
        }
        catch (Throwable e) {
            error = e;
            throw e;
        }
        finally {
            handler.handleReceive(new ClientHttpResponseWrapper(request, response, error), span);
        }
    }
}
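
Stripped of the Spring and brave types, the shape of this interceptor can be sketched as follows. InterceptorSketch, traced and the events list are hypothetical stand-ins of mine, not the Sleuth API; the point is only the control flow: start before the call, remember the error, and always finish in the finally block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the interceptor; no Spring or brave types involved.
final class InterceptorSketch {

    // Records what a real span would report.
    static final List<String> events = new ArrayList<>();

    static <T> T traced(String name, Supplier<T> call) {
        events.add("start " + name);      // role of handler.handleSend(request)
        RuntimeException error = null;
        try {
            return call.get();            // role of execution.execute(req, body)
        } catch (RuntimeException e) {
            error = e;                    // remembered so the span can be marked failed
            throw e;
        } finally {
            // role of handler.handleReceive(...): runs on success and on failure
            events.add((error == null ? "finish " : "error ") + name);
        }
    }

    public static void main(String[] args) {
        System.out.println(traced("GET /orders", () -> "200 OK"));
        System.out.println(events);
    }
}
```

Because the closing step sits in finally, a span is never left open when the downstream call throws.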

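TraceFeignClientAutoConfiguration

The name says it all: this class handles feign. The conclusion first: feign abstracts its transport as a Client, for example the built-in Default, OkHttpClient for okhttp and ApacheHttpClient for apache, and tracing is implemented by wrapping that client in one more layer.

FeignContextBeanPostProcessor

A bean under the FeignBeanPostProcessorConfiguration configuration. It forcibly wraps every FeignContext, and every Feign instance has to obtain its components from a FeignContext. The code shows that the real key is still the TraceFeignObjectWrapper class.

A brief note on FeignContext: if it helps, good; if it confuses you further, skip this paragraph. FeignContext is the Feign application context from which all assembled components are obtained. Feign takes a rather lazy shortcut here. My simple understanding is that each FeignClient may carry its own configuration, such as a custom Decoder or Contract, so spring-cloud-openfeign simply creates one application context per FeignClient, as a child of the main application context. Each Feign client therefore has its own context: a custom component is taken from there when one exists, and otherwise the default component from the parent context is used.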

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof FeignContext && !(bean instanceof TraceFeignContext)) {
            return new TraceFeignContext(traceFeignObjectWrapper(), (FeignContext) bean);
        }
        return bean;
    }
​
    private TraceFeignObjectWrapper traceFeignObjectWrapper() {
        return new TraceFeignObjectWrapper(this.beanFactory);
    }
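
The two ideas here, a per-client context that falls back to shared defaults and a decorator that wraps whatever the context hands out, can be sketched in plain Java. NamedContext and WrappingContext are hypothetical stand-ins for FeignContext and TraceFeignContext; the real classes are built on Spring application contexts, which this sketch replaces with maps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class ContextSketch {

    // Stand-in for FeignContext: one registry per client name plus shared defaults.
    static class NamedContext {
        private final Map<String, Object> defaults = new HashMap<>();
        private final Map<String, Map<String, Object>> children = new HashMap<>();

        void registerDefault(String type, Object bean) {
            defaults.put(type, bean);
        }

        void register(String client, String type, Object bean) {
            children.computeIfAbsent(client, k -> new HashMap<>()).put(type, bean);
        }

        // The client's own component wins; otherwise the shared default is used.
        Object getInstance(String client, String type) {
            Map<String, Object> child = children.get(client);
            Object own = child == null ? null : child.get(type);
            return own != null ? own : defaults.get(type);
        }
    }

    // Stand-in for TraceFeignContext: delegates, then wraps the result.
    static class WrappingContext extends NamedContext {
        private final NamedContext delegate;
        private final UnaryOperator<Object> wrapper;

        WrappingContext(NamedContext delegate, UnaryOperator<Object> wrapper) {
            this.delegate = delegate;
            this.wrapper = wrapper;
        }

        @Override
        Object getInstance(String client, String type) {
            Object bean = delegate.getInstance(client, type);
            return bean == null ? null : wrapper.apply(bean);
        }
    }

    public static void main(String[] args) {
        NamedContext base = new NamedContext();
        base.registerDefault("client", "default-client");
        base.register("orders", "client", "okhttp-client");
        NamedContext traced = new WrappingContext(base, bean -> "traced(" + bean + ")");
        System.out.println(traced.getInstance("orders", "client"));
        System.out.println(traced.getInstance("users", "client"));
    }
}
```

Callers keep asking the context for components as before; they simply receive the wrapped version, which is why the post processor only has to swap the context bean.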
​

TraceFeignObjectWrapper

This class passes whatever the original FeignContext returns through a wrap method. The code only wraps the client, so let's dig into the client implementation.

    Object wrap(Object bean) {
        if (bean instanceof Client && !(bean instanceof TracingFeignClient)
                && !(bean instanceof LazyTracingFeignClient)) {
            if (loadBalancerPresent && bean instanceof FeignBlockingLoadBalancerClient
                    && !(bean instanceof TraceFeignBlockingLoadBalancerClient)) {
                return instrumentedFeignLoadBalancerClient(bean);
            }
            if (loadBalancerPresent && bean instanceof RetryableFeignBlockingLoadBalancerClient
                    && !(bean instanceof TraceRetryableFeignBlockingLoadBalancerClient)) {
                return instrumentedRetryableFeignLoadBalancerClient(bean);
            }
            return new LazyTracingFeignClient(this.beanFactory, (Client) bean);
        }
        return this.traceFeignBuilderBeanPostProcessor.postProcessAfterInitialization(bean, null);
    }
​

TracingFeignClient

Here is the core code, pasted directly; the logic follows the same pattern as the web interceptor above.

    @Override
    public Response execute(Request req, Request.Options options) throws IOException {
        RequestWrapper request = new RequestWrapper(req);
        Span span = this.handler.handleSend(request);
        if (log.isDebugEnabled()) {
            log.debug("Handled send of " + span);
        }
        Response res = null;
        Throwable error = null;
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(span.context())) {
            res = this.delegate.execute(request.build(), options);
            if (res == null) { // possibly null on bad implementation or mocks
                res = Response.builder().request(req).build();
            }
            return res;
        }
        catch (Throwable e) {
            error = e;
            throw e;
        }
        finally {
            ResponseWrapper response = new ResponseWrapper(request, res, error);
            this.handler.handleReceive(response, span);
​
            if (log.isDebugEnabled()) {
                log.debug("Handled receive of " + span);
            }
        }
    }

BraveRedisAutoConfiguration

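Clearly the redis one. It only contains post-processing logic for Lettuce, so one can predict that on a version that is too old, such as Spring Boot 1.5.4, the code below will not take effect for redis interactions.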

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "spring.sleuth.redis.enabled", matchIfMissing = true)
@ConditionalOnBean({ Tracing.class, ClientResources.class })
@AutoConfigureAfter({ BraveAutoConfiguration.class })
@EnableConfigurationProperties(TraceRedisProperties.class)
@ConditionalOnClass(BraveTracing.class)
public class BraveRedisAutoConfiguration {
    @Bean
    static TraceLettuceClientResourcesBeanPostProcessor traceLettuceClientResourcesBeanPostProcessor(
            BeanFactory beanFactory) {
        return new TraceLettuceClientResourcesBeanPostProcessor(beanFactory);
    }
}

TraceLettuceClientResourcesBeanPostProcessor

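On the redis handling: the lettuce client already has tracing built in, so this is just an integration with it. I have not read the lettuce source yet and am not familiar with it, so I will not say more.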

public class TraceLettuceClientResourcesBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof ClientResources) {
            ClientResources cr = (ClientResources) bean;
            if (!cr.tracing().isEnabled()) {
                if (log.isDebugEnabled()) {
                    log.debug("Lettuce ClientResources bean is auto-configured to enable tracing.");
                }
                return cr.mutate().tracing(new LazyTracing(this.beanFactory)).build();
            }
            else if (log.isDebugEnabled()) {
                log.debug(
                        "Lettuce ClientResources bean is skipped for auto-configuration because tracing was already enabled.");
            }
        }
        return bean;
    }
}
​
class LazyTracing implements io.lettuce.core.tracing.Tracing {
​
    private final BeanFactory beanFactory;
​
    private final io.lettuce.core.tracing.Tracing noOpTracing = NoOpTracing.INSTANCE;
​
    private BraveTracing braveTracing;
​
    LazyTracing(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }
    //...
​
}
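
LazyTracing receives a BeanFactory rather than a ready-made tracing object, and its braveTracing field is not final, which suggests the real lookup is deferred until first use. The elided body is not shown here, so the following is only a sketch of that deferred-lookup idea under my own assumptions; LazySketch and its Supplier are hypothetical, with the Supplier playing the role of a beanFactory.getBean(...) call.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a lazily resolved delegate; not the Sleuth class.
final class LazySketch<T> {

    private final Supplier<T> lookup; // stands in for a BeanFactory lookup
    private T resolved;

    LazySketch(Supplier<T> lookup) {
        this.lookup = lookup;
    }

    // Resolves the delegate on first use and caches it afterwards.
    synchronized T get() {
        if (resolved == null) {
            resolved = lookup.get();
        }
        return resolved;
    }

    public static void main(String[] args) {
        LazySketch<String> tracing = new LazySketch<>(() -> {
            System.out.println("looking up the tracing bean");
            return "tracing";
        });
        System.out.println("wrapper created, nothing looked up yet");
        System.out.println(tracing.get());
        System.out.println(tracing.get()); // served from the cached value
    }
}
```

Deferring the lookup matters for a BeanPostProcessor, which runs while other beans are still being created and should avoid forcing them into existence early.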

BraveMessagingAutoConfiguration

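Let's look at how spring-rabbit is done. The SleuthRabbitBeanPostProcessor bean is annotated "for tests", so the core logic should be SpringRabbitTracing. Unluckily I started from SleuthRabbitBeanPostProcessor, but that actually let me see the implementation approach sooner.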

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(MessagingTracing.class)
@ConditionalOnMessagingEnabled
@ConditionalOnBean(Tracing.class)
@EnableConfigurationProperties(SleuthMessagingProperties.class)
public class BraveMessagingAutoConfiguration {
​
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
    @ConditionalOnClass(RabbitTemplate.class)
    protected static class SleuthRabbitConfiguration {
​
        @Bean
        // for tests
        @ConditionalOnMissingBean
        static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
            return new SleuthRabbitBeanPostProcessor(beanFactory);
        }
​
        @Bean
        @ConditionalOnMissingBean
        SpringRabbitTracing springRabbitTracing(MessagingTracing messagingTracing,
                SleuthMessagingProperties properties) {
            return SpringRabbitTracing.newBuilder(messagingTracing)
                    .remoteServiceName(properties.getRabbit().getRemoteServiceName()).build();
        }
    }
}

SpringRabbitTracing

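Outgoing messages get the information to propagate added to their headers by a wrapping MessagePostProcessor; see TracingMessagePostProcessor. Consumers are intercepted via AOP by adding an Advice to the SimpleRabbitListenerContainerFactory, so TracingRabbitListenerAdvice is the class to follow.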

public final class SpringRabbitTracing {
    
  /** Creates an instrumented {@linkplain RabbitTemplate} */
  public RabbitTemplate newRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    TracingMessagePostProcessor tracingMessagePostProcessor = new TracingMessagePostProcessor(this);
    rabbitTemplate.setBeforePublishPostProcessors(tracingMessagePostProcessor);
    return rabbitTemplate;
  }
​
  /** Instruments an existing {@linkplain RabbitTemplate} */
  public RabbitTemplate decorateRabbitTemplate(RabbitTemplate rabbitTemplate) {
    MessagePostProcessor[] beforePublishPostProcessors =
      appendTracingMessagePostProcessor(rabbitTemplate, beforePublishPostProcessorsField);
    if (beforePublishPostProcessors != null) {
      rabbitTemplate.setBeforePublishPostProcessors(beforePublishPostProcessors);
    }
    return rabbitTemplate;
  }
​
  /** Creates an instrumented {@linkplain SimpleRabbitListenerContainerFactory} */
  public SimpleRabbitListenerContainerFactory newSimpleRabbitListenerContainerFactory(
    ConnectionFactory connectionFactory
  ) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(new TracingRabbitListenerAdvice(this));
    factory.setBeforeSendReplyPostProcessors(new TracingMessagePostProcessor(this));
    return factory;
  }
​
  /** Instruments an existing {@linkplain SimpleRabbitListenerContainerFactory} */
  public SimpleRabbitListenerContainerFactory decorateSimpleRabbitListenerContainerFactory(
    SimpleRabbitListenerContainerFactory factory
  ) {
    Advice[] advice = prependTracingRabbitListenerAdvice(factory);
    if (advice != null) factory.setAdviceChain(advice);
​
    MessagePostProcessor[] beforeSendReplyPostProcessors =
      appendTracingMessagePostProcessor(factory, beforeSendReplyPostProcessorsField);
    if (beforeSendReplyPostProcessors != null) {
      factory.setBeforeSendReplyPostProcessors(beforeSendReplyPostProcessors);
    }
​
    return factory;
  }
}

TracingMessagePostProcessor

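Basic logic: obtain a span and inject its context into the request through the injector. This handles the sending side. span.start(timestamp).finish(timestamp) shows that publishing to the MQ is only recorded as an instant: start and finish happen in a single step.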

final class TracingMessagePostProcessor implements MessagePostProcessor {
​
  @Override public Message postProcessMessage(Message message) {
    MessageProducerRequest request = new MessageProducerRequest(message);
​
    TraceContext maybeParent = currentTraceContext.get();
    // Unlike message consumers, we try current span before trying extraction. This is the proper
    // order because the span in scope should take precedence over a potentially stale header entry.
    //
    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we
    // always clear message headers after reading.
    Span span;
    if (maybeParent == null) {
      TraceContextOrSamplingFlags extracted =
        springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
      span = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
    } else { // If we have a span in scope assume headers were cleared before
      span = tracer.newChild(maybeParent);
    }
​
    if (!span.isNoop()) {
      span.kind(PRODUCER).name("publish");
      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
      // incur timestamp overhead only once
      long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
      span.start(timestamp).finish(timestamp);
    }
​
    injector.inject(span.context(), request);
    return message;
  }
}
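
What "inject into the request" amounts to can be sketched with a plain header map. The sketch assumes the context travels in the single-header B3 format (a header named b3 holding traceId-spanId-samplingState); which header is actually written depends on the propagation that is configured, and B3InjectSketch is a hypothetical name of mine, not brave's injector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of producer-side injection; assumes B3 single-header format.
final class B3InjectSketch {

    // Writes the trace context into the outgoing message headers.
    static void inject(String traceId, String spanId, boolean sampled, Map<String, String> headers) {
        headers.put("b3", traceId + "-" + spanId + "-" + (sampled ? "1" : "0"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        inject("80f198ee56343ba864fe8b2a57d3eff7", "e457b5a2e4d86bd1", true, headers);
        System.out.println(headers.get("b3"));
    }
}
```

The consumer on the other side only needs this one header to continue the same trace.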

TracingRabbitListenerAdvice

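This class handles the consuming side, and it is the one I looked for earlier without finding.

I once needed to run logic before and after MQ consumption. Debugging at the time did not reveal a suitable extension point, so I ended up writing my own AOP aspect around every consumer. This code shows that the consumer side does have such an extension point.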

final class TracingRabbitListenerAdvice implements MethodInterceptor {
​
  /**
   * MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
   * Message)}
   */
  @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
    Message message = null;
    if (methodInvocation.getArguments()[1] instanceof List) {
      message = ((List<? extends Message>) methodInvocation.getArguments()[1]).get(0);
    } else {
      message = (Message) methodInvocation.getArguments()[1];
    }
    MessageConsumerRequest request = new MessageConsumerRequest(message);
​
    TraceContextOrSamplingFlags extracted =
      springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
​
    // named for BlockingQueueConsumer.nextMessage, which we can't currently see
    Span consumerSpan = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
    Span listenerSpan = tracer.newChild(consumerSpan.context());
​
    if (!consumerSpan.isNoop()) {
      setConsumerSpan(consumerSpan, message.getMessageProperties());
​
      // incur timestamp overhead only once
      long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
      consumerSpan.start(timestamp);
      long consumerFinish = timestamp + 1L; // save a clock reading
      consumerSpan.finish(consumerFinish);
​
      // not using scoped span as we want to start with a pre-configured time
      listenerSpan.name("on-message").start(consumerFinish);
    }
​
    Tracer.SpanInScope ws = tracer.withSpanInScope(listenerSpan);
    Throwable error = null;
    try {
      return methodInvocation.proceed();
    } catch (Throwable t) {
      error = t;
      throw t;
    } finally {
      if (error != null) listenerSpan.error(error);
      listenerSpan.finish();
      ws.close();
    }
  }
}
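
The flow of the advice, reduced to plain Java: read and remove the incoming header, record a one-microsecond consumer span, then time the listener itself in a second span that starts where the consumer span ended. ListenerAdviceSketch and its log list are hypothetical stand-ins of mine, and the b3 header name is an assumption about the configured propagation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the consumer-side advice; no brave or Spring AMQP types.
final class ListenerAdviceSketch {

    final List<String> log = new ArrayList<>();

    void handle(Map<String, String> headers, long nowMicros, Runnable listener) {
        // Extract and clear: the header is removed so it cannot be re-propagated stale.
        String parent = headers.remove("b3");

        long consumerFinish = nowMicros + 1L; // one clock reading serves both spans
        log.add("consumer parent=" + parent + " [" + nowMicros + "," + consumerFinish + "]");
        log.add("listener start=" + consumerFinish);
        try {
            listener.run(); // role of methodInvocation.proceed()
        } catch (RuntimeException e) {
            log.add("listener error=" + e.getMessage());
            throw e;
        } finally {
            log.add("listener finish");
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("b3", "80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1");
        ListenerAdviceSketch advice = new ListenerAdviceSketch();
        advice.handle(headers, 100L, () -> System.out.println("message handled"));
        advice.log.forEach(System.out::println);
    }
}
```

Any before-and-after logic for consumers can hang off the same kind of advice instead of a hand-written aspect around every listener.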
