How Spring Cloud-managed thread pool instances are automatically wrapped as trace-aware thread pools to prevent loss of trace information

2023-06-19 15:31:31

Thread pool instances managed by Spring Cloud are automatically wrapped as trace-aware thread pools, so trace information is not lost.


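An earlier post, "Using trace-wrapped thread pools to prevent loss of trace information", described how to avoid losing trace information when work runs on a thread pool.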

Today we look at how thread pool instances managed by the Spring Cloud container get their trace wrapping automatically.

ExecutorBeanPostProcessor proxies the original thread pool


The processing flow of ExecutorBeanPostProcessor:

1. ExecutorBeanPostProcessor implements BeanPostProcessor and is registered as a bean by TraceAsyncDefaultAutoConfiguration

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(SleuthAsyncProperties.class)
@ConditionalOnProperty(value = "spring.sleuth.async.enabled", matchIfMissing = true)
@ConditionalOnBean(Tracer.class)
@AutoConfigureAfter(BraveAutoConfiguration.class)
public class TraceAsyncDefaultAutoConfiguration {

  @Bean
  @ConditionalOnProperty(value = "spring.sleuth.scheduled.enabled", matchIfMissing = true)
  static ExecutorBeanPostProcessor executorBeanPostProcessor(BeanFactory beanFactory) {
    return new ExecutorBeanPostProcessor(beanFactory);
  }
}

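2. postProcessAfterInitialization checks whether the current bean instance is an Executor that has not already been trace-wrapped; if so, it creates a tracing proxy for that thread pool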

/**
 * Bean post processor that wraps a call to an {@link Executor} either in a JDK or CGLIB
 * proxy. Depending on whether the implementation has a final method or is final.
 *
 * @author Marcin Grzejszczak
 * @author Jesus Alonso
 * @author Denys Ivano
 * @author Vladislav Fefelov
 * @since 1.1.4
 */
public class ExecutorBeanPostProcessor implements BeanPostProcessor {

  private static final Log log = LogFactory.getLog(ExecutorBeanPostProcessor.class);

  private final BeanFactory beanFactory;

  private SleuthAsyncProperties sleuthAsyncProperties;

  public ExecutorBeanPostProcessor(BeanFactory beanFactory) {
    this.beanFactory = beanFactory;
  }

  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (!ExecutorInstrumentor.isApplicableForInstrumentation(bean)) {
      return bean;
    }
    return new ExecutorInstrumentor(() -> sleuthAsyncProperties().getIgnoredBeans(), this.beanFactory)
        .instrument(bean, beanName);
  }

  private SleuthAsyncProperties sleuthAsyncProperties() {
    if (this.sleuthAsyncProperties == null) {
      this.sleuthAsyncProperties = this.beanFactory.getBean(SleuthAsyncProperties.class);
    }
    return this.sleuthAsyncProperties;
  }

}

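The first statement of postProcessAfterInitialization, the call to ExecutorInstrumentor.isApplicableForInstrumentation, decides whether the thread pool needs trace wrapping: the bean must be an Executor that is not already one of the trace-wrapped types.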

org.springframework.cloud.sleuth.instrument.async.ExecutorInstrumentor#isApplicableForInstrumentation
public static boolean isApplicableForInstrumentation(Object bean) {
    return bean instanceof Executor && !(bean instanceof LazyTraceThreadPoolTaskExecutor
        || bean instanceof TraceableScheduledExecutorService || bean instanceof TraceableExecutorService
        || bean instanceof LazyTraceAsyncTaskExecutor || bean instanceof LazyTraceExecutor);
  }
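
To make the "wrap only once" idea concrete, here is a minimal plain-JDK sketch of the same kind of check. It does not use Sleuth: TracedExecutor, isApplicable and instrument are hypothetical names invented for this illustration, and TracedExecutor merely stands in for the trace-aware types listed in the method above.

```java
import java.util.concurrent.Executor;

class ApplicabilityCheckSketch {

    /** Hypothetical marker wrapper standing in for Sleuth's trace-aware executor types. */
    static final class TracedExecutor implements Executor {
        private final Executor delegate;

        TracedExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    /** Same shape as the check above: an Executor that is not already wrapped. */
    static boolean isApplicable(Object bean) {
        return bean instanceof Executor && !(bean instanceof TracedExecutor);
    }

    /** Wraps at most once, so passing a bean through twice is harmless. */
    static Object instrument(Object bean) {
        return isApplicable(bean) ? new TracedExecutor((Executor) bean) : bean;
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;                    // trivial same-thread executor
        Object once = instrument(direct);
        Object twice = instrument(once);
        System.out.println(once instanceof TracedExecutor); // true: wrapped on first pass
        System.out.println(once == twice);                  // true: not wrapped again
        System.out.println(instrument("plain bean"));       // non-Executor beans pass through
    }
}
```

Because the check excludes the wrapper type itself, the post processor can safely see the same bean more than once without stacking wrappers.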

3. Create the tracing proxy object for the thread pool

org.springframework.cloud.sleuth.instrument.async.ExecutorInstrumentor#instrument
/**
   * Wraps an {@link Executor} bean in its trace representation.
   * @param bean a bean (might be of {@link Executor} type
   * @param beanName name of the bean
   * @return wrapped bean or just bean if not {@link Executor} or already instrumented
   */
  public Object instrument(Object bean, String beanName) {
    if (!isApplicableForInstrumentation(bean)) {
      log.info("Bean is already instrumented or is not applicable for instrumentation " + beanName);
      return bean;
    }
    if (bean instanceof ThreadPoolTaskExecutor) {
      if (isProxyNeeded(beanName)) {
        return wrapThreadPoolTaskExecutor(bean, beanName);
      }
      else {
        log.info("Not instrumenting bean " + beanName);
      }
    }
    else if (bean instanceof ScheduledExecutorService) {
      if (isProxyNeeded(beanName)) {
        return wrapScheduledExecutorService(bean, beanName);
      }
      else {
        log.info("Not instrumenting bean " + beanName);
      }
    }
    else if (bean instanceof ExecutorService) {
      if (isProxyNeeded(beanName)) {
        return wrapExecutorService(bean, beanName);
      }
      else {
        log.info("Not instrumenting bean " + beanName);
      }
    }
    else if (bean instanceof AsyncTaskExecutor) {
      if (isProxyNeeded(beanName)) {
        return wrapAsyncTaskExecutor(bean, beanName);
      }
      else {
        log.info("Not instrumenting bean " + beanName);
      }
    }
    else if (bean instanceof Executor) {
      return wrapExecutor(bean, beanName);
    }
    return bean;
  }

  private Object wrapExecutor(Object bean, String beanName) {
    Executor executor = (Executor) bean;
    boolean methodFinal = anyFinalMethods(executor);
    boolean classFinal = Modifier.isFinal(bean.getClass().getModifiers());
    boolean cglibProxy = !methodFinal && !classFinal;
    try {
      return createProxy(bean, cglibProxy, new ExecutorMethodInterceptor<>(executor, this.beanFactory, beanName));
    }
    catch (AopConfigException ex) {
      if (cglibProxy) {
        if (log.isDebugEnabled()) {
          log.debug("Exception occurred while trying to create a proxy, falling back to JDK proxy", ex);
        }
        return createProxy(bean, false, new ExecutorMethodInterceptor<>(executor, this.beanFactory, beanName));
      }
      throw ex;
    }
  }

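A proxy object is created according to the concrete type of the thread pool. In wrapExecutor, a CGLIB (subclass-based) proxy is attempted only when neither the class nor any of its methods is final; if the original thread pool class is final or has final methods, it cannot be subclassed, so a JDK dynamic proxy over its interfaces is created instead. If CGLIB proxy creation fails with an AopConfigException, the code also falls back to a JDK proxy.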
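
The proxy choice can be illustrated with the JDK alone. The following is a rough sketch, not Sleuth's implementation: TRACE_ID is a hypothetical ThreadLocal standing in for the real trace context, FinalExecutor is an invented final class, and subclassProxyPossible is a simplified version of the classFinal/methodFinal test that ignores the final methods inherited from Object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.util.concurrent.Executor;

class ExecutorProxySketch {

    /** Hypothetical stand-in for the trace context. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** A final Executor: it cannot be subclassed, so only an interface proxy is possible. */
    static final class FinalExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            Thread worker = new Thread(command);
            worker.start();
            try {
                worker.join(); // wait, so the demo output is deterministic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Simplified check: false when the class or one of its public methods is final. */
    static boolean subclassProxyPossible(Class<?> type) {
        if (Modifier.isFinal(type.getModifiers())) {
            return false;
        }
        for (Method m : type.getMethods()) {
            if (m.getDeclaringClass() != Object.class && Modifier.isFinal(m.getModifiers())) {
                return false;
            }
        }
        return true;
    }

    /** JDK dynamic proxy over the Executor interface that carries TRACE_ID into the task. */
    static Executor traceProxy(Executor target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("execute") && args != null
                    && args.length == 1 && args[0] instanceof Runnable) {
                Runnable task = (Runnable) args[0];
                String captured = TRACE_ID.get();     // read on the submitting thread
                args[0] = (Runnable) () -> {
                    String previous = TRACE_ID.get();
                    TRACE_ID.set(captured);           // make it visible on the worker thread
                    try {
                        task.run();
                    } finally {
                        TRACE_ID.set(previous);       // leave the worker thread as we found it
                    }
                };
            }
            return method.invoke(target, args);
        };
        return (Executor) Proxy.newProxyInstance(
                ExecutorProxySketch.class.getClassLoader(), new Class<?>[] {Executor.class}, handler);
    }

    public static void main(String[] args) {
        FinalExecutor target = new FinalExecutor();
        System.out.println(subclassProxyPossible(FinalExecutor.class)); // false
        TRACE_ID.set("trace-1");
        traceProxy(target).execute(() -> System.out.println(TRACE_ID.get())); // trace-1
        target.execute(() -> System.out.println(TRACE_ID.get()));             // null
    }
}
```

The unproxied call prints null because a plain ThreadLocal is not visible on the worker thread; that is the loss of trace information the proxy is there to prevent.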

The source code in this post comes from:

spring-cloud-sleuth-autoconfigure
Version: <version>3.1.1</version>

Summary


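Thread pool instances managed by the Spring container are wrapped as tracing thread pools automatically by the code above. If we create a thread pool ourselves with new and it is not handed over to the container (for example, it is not registered via @Bean), we have to wrap it manually and return a trace-aware thread pool instance.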
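
As an illustration of what manual wrapping means for a self-created pool, here is a plain-JDK sketch. It is only an analogy under stated assumptions: ContextCarryingExecutorService and TRACE_ID are hypothetical names made up for this example, and a real application would use Sleuth's own wrapper types (such as the TraceableExecutorService named in isApplicableForInstrumentation) rather than this class.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ManualWrapSketch {

    /** Hypothetical stand-in for the trace context. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Decorator that copies TRACE_ID from the submitting thread to the worker thread. */
    static final class ContextCarryingExecutorService extends AbstractExecutorService {
        private final ExecutorService delegate;

        ContextCarryingExecutorService(ExecutorService delegate) {
            this.delegate = delegate;
        }

        // submit(...) in AbstractExecutorService ends up here, so one override covers both.
        @Override
        public void execute(Runnable task) {
            String captured = TRACE_ID.get();
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);
                try {
                    task.run();
                } finally {
                    TRACE_ID.set(previous); // do not leak context into later tasks
                }
            });
        }

        @Override public void shutdown() { delegate.shutdown(); }
        @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
        @Override public boolean isShutdown() { return delegate.isShutdown(); }
        @Override public boolean isTerminated() { return delegate.isTerminated(); }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }
    }

    public static void main(String[] args) throws Exception {
        // A pool created with new, outside the container: nothing wraps it for us.
        ExecutorService raw = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        ExecutorService traced = new ContextCarryingExecutorService(raw);

        TRACE_ID.set("trace-7");
        Callable<String> readTrace = () -> TRACE_ID.get();
        System.out.println(traced.submit(readTrace).get()); // trace-7
        System.out.println(raw.submit(readTrace).get());    // null
        traced.shutdown();
    }
}
```

Callers should hold on to the wrapped instance only; any task submitted straight to the raw pool bypasses the wrapper and runs without the context.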
