This article series has three parts:
- Getting started with spring-cloud-sleuth (https://cloud.tencent.com/developer/article/1884423)
- A zipkin-brave demo and its source code (https://cloud.tencent.com/developer/article/1884429)
- The source code of spring-cloud's zipkin integration
Preface
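Over the National Day holiday I did nothing but frantically catch up on sleep, and now that I'm back at work I still feel like napping... haha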
Source code of the springcloud/zipkin integration
Goal of this round of source reading
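Different kinds of interaction cannot share one uniform way of carrying trace information. HTTP differs from UDP, and calls to redis, an MQ or a REST endpoint all differ as well. For that reason brave ships a separate integration for each of them: https://github.com/openzipkin/brave/tree/master/brave
This time the goal is to work out how Spring Cloud hooks into MVC requests, OpenFeign, RabbitMQ and Redis.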
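The auto-configuration entry point
With Maven Helper it is easy to spot spring-cloud-sleuth-autoconfigure among the dependencies of spring-cloud-starter-sleuth. The name alone suggests it is almost certainly the auto-configuration entry point, so open it and look for the spring.factories file: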
# Auto Configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncCustomAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncDefaultAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.circuitbreaker.TraceCircuitBreakerAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.rxjava.TraceRxJavaAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.quartz.TraceQuartzAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.scheduling.TraceSchedulingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.reactor.TraceReactorAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceFunctionAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceSpringIntegrationAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceSpringMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceWebSocketAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.BraveAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.web.client.BraveWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.rpc.BraveRpcAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.grpc.BraveGrpcAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveKafkaStreamsAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.opentracing.BraveOpentracingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.redis.BraveRedisAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.mongodb.BraveMongoDbAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.zipkin2.ZipkinAutoConfiguration
# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceGatewayEnvironmentPostProcessor
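The file is an ordinary Java properties file. Each key maps to a comma-separated list of class names, and a trailing `\` continues the value onto the next line. Below is a rough sketch of how such a list can be read with plain `java.util.Properties`. The helper `FactoriesSketch` is hypothetical and is not Spring's own `SpringFactoriesLoader`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reads spring.factories-style text the way any .properties file is read.
final class FactoriesSketch {

    private FactoriesSketch() {
    }

    // Returns the class names listed under `key`, in file order (empty if the key is absent).
    static List<String> namesFor(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            // Properties.load skips '#' comments and joins lines that end with '\'
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        String value = props.getProperty(key);
        if (value == null) {
            return names;
        }
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }
}
```

Without the trailing backslashes, each class name would be parsed as a separate key with an empty value. That is why the line continuations matter in this file.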
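It can look like a lot the first time. Once we understand the brave source, though, we can go by feel and pick out the configuration classes that suit our purpose.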
Choosing which configuration classes to read
First pick a few configuration classes that look interesting and make some guesses:
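- TraceWebAutoConfiguration/TraceWebClientAutoConfiguration/TraceFeignClientAutoConfiguration probably show how web requests and Feign requests are handled
- TraceWebSocketAutoConfiguration probably shows how WebSocket interactions are handled
- BraveRpcAutoConfiguration/BraveGrpcAutoConfiguration probably show how RPC/gRPC calls are handled
- BraveKafkaStreamsAutoConfiguration/BraveMessagingAutoConfiguration probably show how messaging (JMS and the like) is handled
- BraveRedisAutoConfiguration/BraveMongoDbAutoConfiguration probably show how data stores such as Redis and MongoDB are handled
- BraveAutoConfiguration probably sets up the core Tracer and related objects we met in the brave source
- ZipkinAutoConfiguration is the configuration class for talking to the Zipkin server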
Right or wrong, a quick look will tell.
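Log adjustment: TraceEnvironmentPostProcessor
While studying brave, our logs did not print traceId and spanId, simply because we never told the logger what to print. In a Spring Boot project we don't configure anything special either, yet the logs do show this information. So the logging setup must be adjusted at some step. A configuration class is unlikely to do this, but the two EnvironmentPostProcessors registered at the bottom of spring.factories are exactly the right tool. TraceEnvironmentPostProcessor does it like this: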
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
Map<String, Object> map = new HashMap<String, Object>();
// This doesn't work with all logging systems but it's a useful default so you see
// traces in logs without having to configure it.
if (Boolean.parseBoolean(environment.getProperty("spring.sleuth.enabled", "true"))) {
map.put("logging.pattern.level",
"%5p [${spring.zipkin.service.name:" + "${spring.application.name:}},%X{traceId:-},%X{spanId:-}]");
}
addOrReplace(environment.getPropertySources(), map);
}
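The concatenated pattern contains the Spring placeholder `${spring.zipkin.service.name:${spring.application.name:}}`. It uses the Zipkin service name, falls back to the application name, and finally falls back to an empty string. It also contains `%X{traceId:-}` and `%X{spanId:-}`, which is Logback syntax for "read this key from the MDC, default to empty". The pattern therefore only works if something puts traceId and spanId into the MDC while a span is in scope. Here I assume the tracer's scope handling does that; I have not traced which class does it in Sleuth. Below is a hypothetical resolver for just the `%X{...}` part, to show what the pattern produces. It is not Logback's implementation.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for the %X{key:-default} part of a log pattern; not Logback's implementation.
final class MdcPatternSketch {

    // Matches %X{key} or %X{key:-default}
    private static final Pattern MDC_TOKEN = Pattern.compile("%X\\{([^}:]+)(?::-([^}]*))?\\}");

    private MdcPatternSketch() {
    }

    // Replaces each %X{...} token with the MDC value, or with its default (empty when none is given).
    static String resolve(String pattern, Map<String, String> mdc) {
        Matcher m = MDC_TOKEN.matcher(pattern);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = mdc.get(m.group(1));
            if (value == null) {
                value = m.group(2) == null ? "" : m.group(2);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With an empty MDC, for example on a thread that has no span in scope, the brackets simply come out as `[app,,]`. That is the "useful default" the code comment refers to.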
BraveAutoConfiguration
Just as guessed, this class creates Tracing, Tracer, CurrentTraceContext and so on.
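CurrentTraceContext is what the interceptors later use through `newScope(...)`. Conceptually, it holds the current context per thread and hands out scopes that restore the previous context when closed; brave's default implementation is `ThreadLocalCurrentTraceContext`. Below is a hypothetical, minimal model of that idea. The names are made up and are not brave's API.

```java
// Hypothetical model of a "current trace context": one context per thread,
// plus scopes that restore whatever was current before them.
final class SketchCurrentTraceContext {

    // Stand-in for a trace context: just the two IDs that end up in logs and headers.
    static final class Context {
        final String traceId;
        final String spanId;

        Context(String traceId, String spanId) {
            this.traceId = traceId;
            this.spanId = spanId;
        }
    }

    // close() declares no checked exception, so try-with-resources needs no catch block.
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    private final ThreadLocal<Context> current = new ThreadLocal<>();

    Context get() {
        return current.get();
    }

    // Makes `context` current on this thread until the returned scope is closed.
    Scope newScope(Context context) {
        Context previous = current.get();
        current.set(context);
        return () -> {
            if (previous == null) {
                current.remove();
            } else {
                current.set(previous);
            }
        };
    }
}
```

Because closing a scope restores the caller's context, it makes sense to open scopes in a try-with-resources block. The previous context then comes back even when the wrapped call throws.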
ZipkinAutoConfiguration
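It defines the Reporter, whether to integrate with the service registry, whether to collect metrics, and so on. This matches expectations, and I did not read it closely: my aim is to follow the tracing flow through the parts a project actually uses.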
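The Reporter is the piece that ships finished spans to the Zipkin server. As far as I know, zipkin-reporter's `AsyncReporter` batches spans and sends them from a background thread. The sketch below shows only the batching idea, synchronously. A hypothetical `sender` callback stands in for the real transport, and the class name `BatchingReporterSketch` is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching reporter: collects finished spans and hands them to a sender in batches.
final class BatchingReporterSketch<S> {
    private final int maxBatchSize;
    private final Consumer<List<S>> sender; // stands in for e.g. an HTTP POST to the Zipkin server
    private final List<S> pending = new ArrayList<>();

    BatchingReporterSketch(int maxBatchSize, Consumer<List<S>> sender) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.sender = sender;
    }

    // Called once per finished span; sends a batch as soon as it is full.
    synchronized void report(S span) {
        pending.add(span);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    // Sends whatever is pending (the real reporter would also do this on a timer).
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```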
TraceWebClientAutoConfiguration
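It contains several nested configurations: RestTemplateConfig, HttpHeadersFilterConfig, NettyConfiguration, WebClientConfig and TraceOAuthConfiguration. The class names and the beans inside show that they deal with outgoing web requests. RestTemplate is the class I use most, so let's read that code.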
TracingClientHttpRequestInterceptor
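A bean defined in RestTemplateConfig, built on the classic interceptor pattern: the tracing logic runs before and after execution.execute. handleSend creates the span for this call through the tracer's nextSpan (a child of whatever span is currently in scope) and injects its traceId and spanId into the outgoing request headers. handleReceive wraps up by tagging the span from the response or error and finishing it. Our brave demo was only a rough version of this, but having written it makes this code much easier to follow.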
public final class TracingClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
// (fields such as handler, currentTraceContext and log are omitted from this excerpt)
@Override
public ClientHttpResponse intercept(HttpRequest req, byte[] body, ClientHttpRequestExecution execution)
throws IOException {
HttpRequestWrapper request = new HttpRequestWrapper(req);
Span span = handler.handleSend(request);
if (log.isDebugEnabled()) {
log.debug("Wrapping an outbound http call with span [" + span + "]");
}
ClientHttpResponse response = null;
Throwable error = null;
try (CurrentTraceContext.Scope ws = currentTraceContext.newScope(span.context())) {
response = execution.execute(req, body);
return response;
}
catch (Throwable e) {
error = e;
throw e;
}
finally {
handler.handleReceive(new ClientHttpResponseWrapper(request, response, error), span);
}
}
}
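Stripped of Spring and brave types, the interceptor boils down to four steps: create a child span, put its IDs into the request, run the call, and finish the span even if the call fails. Below is a hypothetical sketch of that shape. The class name is made up, and the B3-style header names (`X-B3-TraceId` and friends) are used for illustration only; they are not necessarily what the Sleuth code writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Hypothetical model of the interceptor above, without Spring or brave types.
final class ClientInterceptorSketch {

    // What would eventually be reported: the IDs plus the error, if any.
    static final class FinishedSpan {
        final String traceId;
        final String spanId;
        final String parentId;
        final Throwable error;

        FinishedSpan(String traceId, String spanId, String parentId, Throwable error) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
            this.error = error;
        }
    }

    final List<FinishedSpan> finished = new ArrayList<>();

    // traceId/parentSpanId describe the span in scope; headers are the outgoing request headers.
    <T> T intercept(String traceId, String parentSpanId, Map<String, String> headers, Supplier<T> execution) {
        // "handleSend": a new child span whose IDs are injected into the request
        String spanId = String.format("%016x", ThreadLocalRandom.current().nextLong());
        headers.put("X-B3-TraceId", traceId);
        headers.put("X-B3-SpanId", spanId);
        if (parentSpanId != null) {
            headers.put("X-B3-ParentSpanId", parentSpanId);
        }
        Throwable error = null;
        try {
            return execution.get();
        } catch (RuntimeException | Error e) {
            error = e;
            throw e;
        } finally {
            // "handleReceive": finish the span whether the call succeeded or failed
            finished.add(new FinishedSpan(traceId, spanId, parentSpanId, error));
        }
    }
}
```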
TraceFeignClientAutoConfiguration
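As the name suggests, this class handles Feign. Conclusion first: Feign abstracts the HTTP transport behind a Client. Examples are the built-in Client.Default, OkHttpClient for OkHttp and ApacheHttpClient for Apache HttpClient. Sleuth implements tracing by wrapping that Client in one more layer.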
FeignContextBeanPostProcessor
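A bean defined in FeignBeanPostProcessorConfiguration. It wraps every FeignContext unconditionally, and since every Feign client obtains its components from a FeignContext when it is built, this covers them all. As the code shows, the real work is done by TraceFeignObjectWrapper.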
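A short note on FeignContext. If it helps, great; if it only confuses you, skip this paragraph. FeignContext is the Feign application context: every component Feign assembles is fetched from it. The Feign integration takes a rather lazy shortcut here. My simplified understanding is as follows. Each FeignClient may have its own configuration, such as a custom Decoder or Contract, so spring-cloud-openfeign simply gives every FeignClient its own application context, created as a child of the main application context. Since each Feign client owns a context, it uses its own custom components where they exist. Otherwise it falls back to the components of the parent context, where defaults are available. The post-processor itself: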
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof FeignContext && !(bean instanceof TraceFeignContext)) {
return new TraceFeignContext(traceFeignObjectWrapper(), (FeignContext) bean);
}
return bean;
}
private TraceFeignObjectWrapper traceFeignObjectWrapper() {
return new TraceFeignObjectWrapper(this.beanFactory);
}
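To make the FeignContext note more concrete, here is a hypothetical model. It keeps one "context" (just a map) per client name and falls back to a shared parent. Every lookup result goes through a wrapper, the way TraceFeignContext passes results through TraceFeignObjectWrapper. The real FeignContext uses Spring application contexts and looks components up by type; this sketch uses string keys to stay short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical per-client contexts with parent fallback, plus a wrapper applied to every lookup.
final class NamedContextSketch {

    private final Map<String, Object> parent = new HashMap<>();                // shared defaults
    private final Map<String, Map<String, Object>> children = new HashMap<>(); // one "context" per client
    private final UnaryOperator<Object> wrapper;                               // role of TraceFeignObjectWrapper

    NamedContextSketch(UnaryOperator<Object> wrapper) {
        this.wrapper = wrapper;
    }

    void registerDefault(String component, Object instance) {
        parent.put(component, instance);
    }

    void registerForClient(String client, String component, Object instance) {
        children.computeIfAbsent(client, k -> new HashMap<>()).put(component, instance);
    }

    // Child context first, then the parent; whatever is found passes through the wrapper.
    Object getInstance(String client, String component) {
        Object found = children.getOrDefault(client, Map.of()).get(component);
        if (found == null) {
            found = parent.get(component);
        }
        return found == null ? null : wrapper.apply(found);
    }
}
```

Putting the wrapper at the lookup point is what lets Sleuth instrument every Feign client without touching any client definition.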
TraceFeignObjectWrapper
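This class passes whatever the original FeignContext returns through a wrap method. In practice only Client beans get wrapped; everything else is handed to traceFeignBuilderBeanPostProcessor. So let's dig into the Client implementation.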
Object wrap(Object bean) {
if (bean instanceof Client && !(bean instanceof TracingFeignClient)
&& !(bean instanceof LazyTracingFeignClient)) {
if (loadBalancerPresent && bean instanceof FeignBlockingLoadBalancerClient
&& !(bean instanceof TraceFeignBlockingLoadBalancerClient)) {
return instrumentedFeignLoadBalancerClient(bean);
}
if (loadBalancerPresent && bean instanceof RetryableFeignBlockingLoadBalancerClient
&& !(bean instanceof TraceRetryableFeignBlockingLoadBalancerClient)) {
return instrumentedRetryableFeignLoadBalancerClient(bean);
}
return new LazyTracingFeignClient(this.beanFactory, (Client) bean);
}
return this.traceFeignBuilderBeanPostProcessor.postProcessAfterInitialization(bean, null);
}
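wrap() combines two ideas. The instanceof guards make wrapping idempotent, so an already-traced client is never wrapped twice. The decorator adds logic around the delegate call. Below is a hypothetical sketch of just those two ideas, leaving out the load-balancer special cases and the lazy variant. All names are made up.

```java
import java.util.List;

// Hypothetical stand-in for feign.Client.
interface HttpClientSketch {
    String execute(String request);
}

// Hypothetical decorator in the spirit of TracingFeignClient: logic before and after the delegate call.
final class TracingClientSketch implements HttpClientSketch {
    private final HttpClientSketch delegate;
    private final List<String> events;

    TracingClientSketch(HttpClientSketch delegate, List<String> events) {
        this.delegate = delegate;
        this.events = events;
    }

    @Override
    public String execute(String request) {
        events.add("send " + request);
        try {
            return delegate.execute(request);
        } finally {
            events.add("receive " + request);
        }
    }
}

final class ClientWrapperSketch {
    private ClientWrapperSketch() {
    }

    // Wraps a client exactly once; anything that is not a client comes back unchanged.
    static Object wrap(Object bean, List<String> events) {
        if (bean instanceof HttpClientSketch && !(bean instanceof TracingClientSketch)) {
            return new TracingClientSketch((HttpClientSketch) bean, events);
        }
        return bean;
    }
}
```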
TracingFeignClient
Here is the core code; the logic follows the same pattern as the RestTemplate interceptor above.
@Override
public Response execute(Request req, Request.Options options) throws IOException {
RequestWrapper request = new RequestWrapper(req);
Span span = this.handler.handleSend(request);
if (log.isDebugEnabled()) {
log.debug("Handled send of " + span);
}
Response res = null;
Throwable error = null;
try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(span.context())) {
res = this.delegate.execute(request.build(), options);
if (res == null) { // possibly null on bad implementation or mocks
res = Response.builder().request(req).build();
}
return res;
}
catch (Throwable e) {
error = e;
throw e;
}
finally {
ResponseWrapper response = new ResponseWrapper(request, res, error);
this.handler.handleReceive(response, span);
if (log.isDebugEnabled()) {
log.debug("Handled receive of " + span);
}
}
}
BraveRedisAutoConfiguration
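Clearly Redis-related. It only contains post-processing logic for Lettuce, so we can predict something about older stacks. With Spring Boot 1.5.4, for example, the default Redis client is Jedis rather than Lettuce, so the code below would not take effect when talking to Redis.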
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "spring.sleuth.redis.enabled", matchIfMissing = true)
@ConditionalOnBean({ Tracing.class, ClientResources.class })
@AutoConfigureAfter({ BraveAutoConfiguration.class })
@EnableConfigurationProperties(TraceRedisProperties.class)
@ConditionalOnClass(BraveTracing.class)
public class BraveRedisAutoConfiguration {
@Bean
static TraceLettuceClientResourcesBeanPostProcessor traceLettuceClientResourcesBeanPostProcessor(
BeanFactory beanFactory) {
return new TraceLettuceClientResourcesBeanPostProcessor(beanFactory);
}
}
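BraveRedisAutoConfiguration only kicks in when all of its conditions hold. These include a `ClientResources` bean, which is a Lettuce type normally registered by Spring Boot's Lettuce auto-configuration, and the `spring.sleuth.redis.enabled` property. With `matchIfMissing = true` and no `havingValue`, Spring Boot documents the rule for `@ConditionalOnProperty` as follows: a missing property matches, and otherwise any value except `false` matches. Below is a hypothetical helper that mirrors just that rule; it is not Spring's `OnPropertyCondition`.

```java
import java.util.Map;

// Hypothetical mirror of @ConditionalOnProperty's rule when no havingValue is given.
final class PropertyConditionSketch {

    private PropertyConditionSketch() {
    }

    // Missing property -> matchIfMissing; present -> matches unless it equals "false" (case-insensitive).
    static boolean matches(Map<String, String> env, String name, boolean matchIfMissing) {
        String value = env.get(name);
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }
}
```

So Redis tracing is on by default, and you switch it off with `spring.sleuth.redis.enabled=false`.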
TraceLettuceClientResourcesBeanPostProcessor
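As for Redis: the Lettuce client has tracing support built in, so Sleuth simply plugs into it. I haven't read the Lettuce source yet and am not familiar with it, so I won't say much more.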
public class TraceLettuceClientResourcesBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof ClientResources) {
ClientResources cr = (ClientResources) bean;
if (!cr.tracing().isEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Lettuce ClientResources bean is auto-configured to enable tracing.");
}
return cr.mutate().tracing(new LazyTracing(this.beanFactory)).build();
}
else if (log.isDebugEnabled()) {
log.debug(
"Lettuce ClientResources bean is skipped for auto-configuration because tracing was already enabled.");
}
}
return bean;
}
}
class LazyTracing implements io.lettuce.core.tracing.Tracing {
private final BeanFactory beanFactory;
private final io.lettuce.core.tracing.Tracing noOpTracing = NoOpTracing.INSTANCE;
private BraveTracing braveTracing;
LazyTracing(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
//...
}
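Going by its name and fields, LazyTracing postpones fetching BraveTracing from the BeanFactory until it is first needed. Presumably this keeps the post-processor from forcing the tracing beans to be created too early. The elided body is not reproduced here. Below is a hypothetical sketch of the two ideas visible above: replace tracing only when it is not already enabled, and resolve the real delegate lazily. All `*Sketch` names are made up, and the no-op fallback field is not modeled.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for io.lettuce.core.tracing.Tracing, reduced to one method.
interface TracingSketch {
    boolean isEnabled();
}

// Hypothetical lazy proxy: the real delegate is looked up on first use, not at post-processing time.
final class LazyTracingSketch implements TracingSketch {
    private final Supplier<TracingSketch> lookup; // stands in for beanFactory.getBean(...)
    private TracingSketch delegate;

    LazyTracingSketch(Supplier<TracingSketch> lookup) {
        this.lookup = lookup;
    }

    private synchronized TracingSketch delegate() {
        if (delegate == null) {
            delegate = lookup.get();
        }
        return delegate;
    }

    @Override
    public boolean isEnabled() {
        return delegate().isEnabled();
    }
}

// Hypothetical stand-in for ClientResources, holding only its tracing.
final class ClientResourcesSketch {
    final TracingSketch tracing;

    ClientResourcesSketch(TracingSketch tracing) {
        this.tracing = tracing;
    }

    // Same decision as the post-processor above: only replace tracing that is not already enabled.
    static ClientResourcesSketch postProcess(ClientResourcesSketch cr, Supplier<TracingSketch> lookup) {
        if (!cr.tracing.isEnabled()) {
            return new ClientResourcesSketch(new LazyTracingSketch(lookup));
        }
        return cr;
    }
}
```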
BraveMessagingAutoConfiguration
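Let's look at how Spring Rabbit is handled. The SleuthRabbitBeanPostProcessor bean carries a "for tests" comment, so the core logic should be in SpringRabbitTracing. Unfortunately I started reading the implementation from SleuthRabbitBeanPostProcessor, but luckily that helped me see the approach faster.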
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(MessagingTracing.class)
@ConditionalOnMessagingEnabled
@ConditionalOnBean(Tracing.class)
@EnableConfigurationProperties(SleuthMessagingProperties.class)
public class BraveMessagingAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
@ConditionalOnClass(RabbitTemplate.class)
protected static class SleuthRabbitConfiguration {
@Bean
// for tests
@ConditionalOnMissingBean
static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
return new SleuthRabbitBeanPostProcessor(beanFactory);
}
@Bean
@ConditionalOnMissingBean
SpringRabbitTracing springRabbitTracing(MessagingTracing messagingTracing,
SleuthMessagingProperties properties) {
return SpringRabbitTracing.newBuilder(messagingTracing)
.remoteServiceName(properties.getRabbit().getRemoteServiceName()).build();
}
}
}
SpringRabbitTracing
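On the sending side, it adds a MessagePostProcessor that puts the information to be propagated into the message headers, so we'll look at TracingMessagePostProcessor. On the receiving side, it adds an Advice to SimpleRabbitListenerContainerFactory to intercept listener invocations AOP-style, so we also need to follow TracingRabbitListenerAdvice.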
public final class SpringRabbitTracing {
/** Creates an instrumented {@linkplain RabbitTemplate} */
public RabbitTemplate newRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
TracingMessagePostProcessor tracingMessagePostProcessor = new TracingMessagePostProcessor(this);
rabbitTemplate.setBeforePublishPostProcessors(tracingMessagePostProcessor);
return rabbitTemplate;
}
/** Instruments an existing {@linkplain RabbitTemplate} */
public RabbitTemplate decorateRabbitTemplate(RabbitTemplate rabbitTemplate) {
MessagePostProcessor[] beforePublishPostProcessors =
appendTracingMessagePostProcessor(rabbitTemplate, beforePublishPostProcessorsField);
if (beforePublishPostProcessors != null) {
rabbitTemplate.setBeforePublishPostProcessors(beforePublishPostProcessors);
}
return rabbitTemplate;
}
/** Creates an instrumented {@linkplain SimpleRabbitListenerContainerFactory} */
public SimpleRabbitListenerContainerFactory newSimpleRabbitListenerContainerFactory(
ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(new TracingRabbitListenerAdvice(this));
factory.setBeforeSendReplyPostProcessors(new TracingMessagePostProcessor(this));
return factory;
}
/** Instruments an existing {@linkplain SimpleRabbitListenerContainerFactory} */
public SimpleRabbitListenerContainerFactory decorateSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactory factory
) {
Advice[] advice = prependTracingRabbitListenerAdvice(factory);
if (advice != null) factory.setAdviceChain(advice);
MessagePostProcessor[] beforeSendReplyPostProcessors =
appendTracingMessagePostProcessor(factory, beforeSendReplyPostProcessorsField);
if (beforeSendReplyPostProcessors != null) {
factory.setBeforeSendReplyPostProcessors(beforeSendReplyPostProcessors);
}
return factory;
}
}
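decorateRabbitTemplate and decorateSimpleRabbitListenerContainerFactory only call the setters when the helper returns a non-null array. That suggests the helpers return null when there is nothing to change, for example when tracing is already installed. I have not checked their bodies, so this is an inference. Below is a sketch of that "append unless already present, otherwise null" convention. The name `appendIfAbsent` is hypothetical, and the input array is assumed to be non-null.

```java
import java.util.Arrays;

// Hypothetical helper illustrating "append unless already present; null means leave it alone".
final class AppendIfAbsentSketch {

    private AppendIfAbsentSketch() {
    }

    // Returns a copy of `current` with `extra` appended, or null if an element of the same class exists.
    static <T> T[] appendIfAbsent(T[] current, T extra) {
        for (T existing : current) {
            if (existing != null && existing.getClass() == extra.getClass()) {
                return null;
            }
        }
        T[] result = Arrays.copyOf(current, current.length + 1);
        result[current.length] = extra;
        return result;
    }
}
```

Returning null instead of the unchanged array lets the caller skip the setter entirely, which is exactly the shape of the `if (... != null)` checks in the code above.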
TracingMessagePostProcessor
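The basic logic: get a span, then inject it into the request (i.e. the message headers) through the injector. This class handles the producer side; note the span.start(timestamp).finish(timestamp) call.
So publishing to the MQ is only recorded: the span is started and finished with the same timestamp, which means sending and completion are treated as a single instant.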
final class TracingMessagePostProcessor implements MessagePostProcessor {
@Override public Message postProcessMessage(Message message) {
MessageProducerRequest request = new MessageProducerRequest(message);
TraceContext maybeParent = currentTraceContext.get();
// Unlike message consumers, we try current span before trying extraction. This is the proper
// order because the span in scope should take precedence over a potentially stale header entry.
//
// NOTE: Brave instrumentation used properly does not result in stale header entries, as we
// always clear message headers after reading.
Span span;
if (maybeParent == null) {
TraceContextOrSamplingFlags extracted =
springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
span = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
} else { // If we have a span in scope assume headers were cleared before
span = tracer.newChild(maybeParent);
}
if (!span.isNoop()) {
span.kind(PRODUCER).name("publish");
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
// incur timestamp overhead only once
long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
span.start(timestamp).finish(timestamp);
}
injector.inject(span.context(), request);
return message;
}
}
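Below is a stripped-down model of the producer logic above, using plain maps for message headers. The header names are borrowed from B3 multi-header propagation purely for illustration; I have not checked which header format Brave's messaging instrumentation writes by default. The `Span` class and `ProducerSketch` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of the producer side: choose a parent, record a zero-length span, inject its IDs.
final class ProducerSketch {

    static final class Span {
        final String traceId;
        final String spanId;
        final String parentId;
        long start;
        long finish;

        Span(String traceId, String spanId, String parentId) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
        }
    }

    static final String TRACE_ID = "X-B3-TraceId"; // illustrative header names
    static final String SPAN_ID = "X-B3-SpanId";

    private ProducerSketch() {
    }

    static String newId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }

    // currentParent is the span in scope (may be null); headers are the outgoing message headers.
    static Span publish(Span currentParent, Map<String, String> headers, long nowMicros) {
        String traceId;
        String parentId;
        if (currentParent != null) {
            // a span in scope takes precedence over anything already in the headers
            traceId = currentParent.traceId;
            parentId = currentParent.spanId;
        } else {
            // otherwise continue a trace found in the headers (clearing them), or start a new one
            String extractedTrace = headers.remove(TRACE_ID);
            String extractedSpan = headers.remove(SPAN_ID);
            traceId = extractedTrace != null ? extractedTrace : newId();
            parentId = extractedTrace != null ? extractedSpan : null;
        }
        Span span = new Span(traceId, newId(), parentId);
        // one clock reading: start and finish share it, so the span has zero duration
        span.start = nowMicros;
        span.finish = nowMicros;
        headers.put(TRACE_ID, span.traceId);
        headers.put(SPAN_ID, span.spanId);
        return span;
    }
}
```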
TracingRabbitListenerAdvice
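This class handles the consumer side, and it is exactly what I had been looking for earlier without success.
I once needed to run logic both before and after MQ messages were consumed. When I debugged it at the time, I found no suitable extension point, so I ended up writing an AOP aspect around every consumer. This code shows that the consumer side does have such an extension point: the advice chain of the listener container factory.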
final class TracingRabbitListenerAdvice implements MethodInterceptor {
/**
* MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
* Message)}
*/
@Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
Message message = null;
if (methodInvocation.getArguments()[1] instanceof List) {
message = ((List<? extends Message>) methodInvocation.getArguments()[1]).get(0);
} else {
message = (Message) methodInvocation.getArguments()[1];
}
MessageConsumerRequest request = new MessageConsumerRequest(message);
TraceContextOrSamplingFlags extracted =
springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
// named for BlockingQueueConsumer.nextMessage, which we can't currently see
Span consumerSpan = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
Span listenerSpan = tracer.newChild(consumerSpan.context());
if (!consumerSpan.isNoop()) {
setConsumerSpan(consumerSpan, message.getMessageProperties());
// incur timestamp overhead only once
long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
consumerSpan.start(timestamp);
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);
// not using scoped span as we want to start with a pre-configured time
listenerSpan.name("on-message").start(consumerFinish);
}
Tracer.SpanInScope ws = tracer.withSpanInScope(listenerSpan);
Throwable error = null;
try {
return methodInvocation.proceed();
} catch (Throwable t) {
error = t;
throw t;
} finally {
if (error != null) listenerSpan.error(error);
listenerSpan.finish();
ws.close();
}
}
}
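The consumer side can be modeled the same way. Extract the trace from the headers and clear them, then record a consumer span that lasts exactly 1 microsecond. Next, start the listener span as its child at the consumer span's finish time, and finish it once the listener returns. The consumer span name "next-message" is an assumption based on the code comment above, and the header names are illustrative. Everything here is a hypothetical sketch, not Brave's API.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;

// Hypothetical model of the consumer side: a 1-microsecond consumer span, then a child listener span.
final class ConsumerSketch {

    static final class Span {
        final String name;
        final String traceId;
        final String spanId;
        final String parentId;
        long start;
        long finish;
        Throwable error;

        Span(String name, String traceId, String spanId, String parentId) {
            this.name = name;
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
        }
    }

    private ConsumerSketch() {
    }

    static String newId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }

    // Returns {consumerSpan, listenerSpan}; listener is the actual message handler.
    static Span[] onMessage(Map<String, String> headers, LongSupplier clockMicros, Runnable listener) {
        // extract, then clear, so a stale context cannot be picked up again later
        String traceId = headers.remove("X-B3-TraceId");
        String parentId = headers.remove("X-B3-SpanId");
        if (traceId == null) {
            traceId = newId();
            parentId = null;
        }
        Span consumer = new Span("next-message", traceId, newId(), parentId);
        Span listenerSpan = new Span("on-message", traceId, newId(), consumer.spanId);
        long timestamp = clockMicros.getAsLong(); // one clock reading for both spans
        consumer.start = timestamp;
        consumer.finish = timestamp + 1L;
        listenerSpan.start = consumer.finish;
        try {
            listener.run();
        } catch (RuntimeException | Error e) {
            listenerSpan.error = e;
            throw e;
        } finally {
            listenerSpan.finish = clockMicros.getAsLong();
        }
        return new Span[] {consumer, listenerSpan};
    }
}
```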