Spring Cloud托管的线程池实例会自动封装为带链路信息的线程池,防止链路信息丢失
博文
使用链路包装的线程池,防止链路信息丢失
介绍了线程池环境下如何避免链路信息丢失。
我们今天介绍,Spring Cloud容器托管的线程池实例,是如何自动链路包装的。
ExecutorBeanPostProcessor代理原线程池
ExecutorBeanPostProcessor的处理流程:
1、ExecutorBeanPostProcessor实现BeanPostProcessor
代码语言:javascript复制@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(SleuthAsyncProperties.class)
@ConditionalOnProperty(value = "spring.sleuth.async.enabled", matchIfMissing = true)
@ConditionalOnBean(Tracer.class)
@AutoConfigureAfter(BraveAutoConfiguration.class)
public class TraceAsyncDefaultAutoConfiguration {
@Bean
@ConditionalOnProperty(value = "spring.sleuth.scheduled.enabled", matchIfMissing = true)
static ExecutorBeanPostProcessor executorBeanPostProcessor(BeanFactory beanFactory) {
return new ExecutorBeanPostProcessor(beanFactory);
}
}
2、postProcessAfterInitialization方法判断当前bean实例是否
是线程池Executor,并且非链路包装过的线程池时,创建链路线程池代理
代码语言:javascript复制/**
* Bean post processor that wraps a call to an {@link Executor} either in a JDK or CGLIB
* proxy. Depending on whether the implementation has a final method or is final.
*
* @author Marcin Grzejszczak
* @author Jesus Alonso
* @author Denys Ivano
* @author Vladislav Fefelov
* @since 1.1.4
*/
public class ExecutorBeanPostProcessor implements BeanPostProcessor {
private static final Log log = LogFactory.getLog(ExecutorBeanPostProcessor.class);
private final BeanFactory beanFactory;
private SleuthAsyncProperties sleuthAsyncProperties;
public ExecutorBeanPostProcessor(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!ExecutorInstrumentor.isApplicableForInstrumentation(bean)) {
return bean;
}
return new ExecutorInstrumentor(() -> sleuthAsyncProperties().getIgnoredBeans(), this.beanFactory)
.instrument(bean, beanName);
}
private SleuthAsyncProperties sleuthAsyncProperties() {
if (this.sleuthAsyncProperties == null) {
this.sleuthAsyncProperties = this.beanFactory.getBean(SleuthAsyncProperties.class);
}
return this.sleuthAsyncProperties;
}
}
第30行判断线程池是否需要链路包装:必须是非链路包装过的线程池。
代码语言:javascript复制org.springframework.cloud.sleuth.instrument.async.ExecutorInstrumentor#isApplicableForInstrumentation
代码语言:javascript复制public static boolean isApplicableForInstrumentation(Object bean) {
return bean instanceof Executor && !(bean instanceof LazyTraceThreadPoolTaskExecutor
|| bean instanceof TraceableScheduledExecutorService || bean instanceof TraceableExecutorService
|| bean instanceof LazyTraceAsyncTaskExecutor || bean instanceof LazyTraceExecutor);
}
3、创建链路线程池代理对象
代码语言:javascript复制org.springframework.cloud.sleuth.instrument.async.ExecutorInstrumentor#instrument
代码语言:javascript复制/**
* Wraps an {@link Executor} bean in its trace representation.
* @param bean a bean (might be of {@link Executor} type
* @param beanName name of the bean
* @return wrapped bean or just bean if not {@link Executor} or already instrumented
*/
public Object instrument(Object bean, String beanName) {
if (!isApplicableForInstrumentation(bean)) {
log.info("Bean is already instrumented or is not applicable for instrumentation " beanName);
return bean;
}
if (bean instanceof ThreadPoolTaskExecutor) {
if (isProxyNeeded(beanName)) {
return wrapThreadPoolTaskExecutor(bean, beanName);
}
else {
log.info("Not instrumenting bean " beanName);
}
}
else if (bean instanceof ScheduledExecutorService) {
if (isProxyNeeded(beanName)) {
return wrapScheduledExecutorService(bean, beanName);
}
else {
log.info("Not instrumenting bean " beanName);
}
}
else if (bean instanceof ExecutorService) {
if (isProxyNeeded(beanName)) {
return wrapExecutorService(bean, beanName);
}
else {
log.info("Not instrumenting bean " beanName);
}
}
else if (bean instanceof AsyncTaskExecutor) {
if (isProxyNeeded(beanName)) {
return wrapAsyncTaskExecutor(bean, beanName);
}
else {
log.info("Not instrumenting bean " beanName);
}
}
else if (bean instanceof Executor) {
return wrapExecutor(bean, beanName);
}
return bean;
}
private Object wrapExecutor(Object bean, String beanName) {
Executor executor = (Executor) bean;
boolean methodFinal = anyFinalMethods(executor);
boolean classFinal = Modifier.isFinal(bean.getClass().getModifiers());
boolean cglibProxy = !methodFinal && !classFinal;
try {
return createProxy(bean, cglibProxy, new ExecutorMethodInterceptor<>(executor, this.beanFactory, beanName));
}
catch (AopConfigException ex) {
if (cglibProxy) {
if (log.isDebugEnabled()) {
log.debug("Exception occurred while trying to create a proxy, falling back to JDK proxy", ex);
}
return createProxy(bean, false, new ExecutorMethodInterceptor<>(executor, this.beanFactory, beanName));
}
throw ex;
}
}
根据不同的具体线程池类别,创建代理对象。如果原线程池类是final的,只能基于字节码方式创建代理了。
博文源码来自:
代码语言:javascript复制spring-cloud-sleuth-autoconfigure
版本:<version>3.1.1</version>
小结
Spring Cloud托管的线程池实例,已被自动化代码封装为了链路线程池。如果是我们自己new的线程池实例,非@Bean方式托或非托管给容器等情况,需要我们手动封装返回带链路信息的线程池实例。