dynamic-tp动态线程池

2023-02-28 13:29:03 浏览数 (2)

一、如果要设计一个动态线程池,如何实现?

1)如果要实现一个动态线程池,首先需要考虑的是将线程池的相关配置信息外置。这样出现问题的时候,能够基于配置修改,实现热部署。修改配置后,就能生效。因此,可以考虑的配置方式有多种:nacos、apollo、zookeeper、consul、etcd等。

2)如果线程池出现问题或者完成修改后,能够基于监控的信息,进行通知和告警。这样就需要考虑通知和告警的方式的多样性:比如基于钉钉、微信、飞书、电子邮件等渠道进行通知和告警。

二、 dynamic-tp动态线程池的思想思路

1.事件发布

根据引入的dynamic-tp-spring-cloud-starter-nacos或者dynamic-tp-spring-boot-starter-nacos依赖

以nacos为例:

代码语言:javascript复制
 <dependency>
        <groupId>cn.dynamictp</groupId>
        <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
        <version>1.0.9</version>
    </dependency>

可以看到自动装配的文件:DtpAutoConfiguration与NacosRefresher.

在DtpAutoConfiguration中,我们可以看到导入的配置信息:

代码语言:javascript复制
@ImportAutoConfiguration({BaseBeanAutoConfiguration.class})

基于这个注解,关注BaseBeanAutoConfiguration这个类:

这个类主要干了下面这几件事请

代码语言:javascript复制
DtpProperties 线程池相关配置
上下文holder  dtpApplicationContextHolder
dtpBanner打印 dtpBannerPrinter 
dtp后置处理器    dtpPostProcessor
dtp注册   dtpRegistry
dtp监控    dtpMonitor
dtpEndpoint  dtpEndpoint

其中

1)dtpBanner是做控制台启动项目时的banner打印操作。

2)dtpPostProcessor dtp后置处理器 处理所有相关bean

如果bean是执行器,则注册dtp,此时会注册到DTP_REGISTRY 中, 数据结构:Map

否则会基于ApplicationHolder拿到基于DynamicTp注解的class,如果当前基于DynamicTp的methodMetadata 为空,则返回bean,否则拿到dtpAnnotationVal。poolName也即dtpAnnotationVal。

如果当前的bean属于线程池任务执行器,则注册task执行器。包装执行器,放入通知信息notifyItems。registerCommon 执行注册。

3)dtpRegistry 注册dtp DTP_REGISTRY 数据结构:Map

其中最为重要的方法是刷新方法:

获取dtp执行器,对执行器进行转换为DtpMainProp。执行刷新。

4)dtp监控 dtpMonitor会执行监控发布:里面有有2个方法需要注意:

检查监控 checkAlarm

collect 收集监控指标信息

其中:检查监控的时候,会基于当前的发送告警的信息:基于对应的渠道进行消息发送。

此时会发布两个事件:publishAlarmCheckEvent、publishCollectEvent

NacosRefresher中存在的方法Refresh: 刷新当监听到配置发生改变的时候,doNoticeAsync 执行异步通知,通知业务方,此时发生了配置的变更。

刷新完成后,执行RefreshEvent刷新事件发布。

2.事件监听

发布完成后,可以看到对应的监听是在

代码语言:javascript复制
com.dtp.starter.adapter.common.autoconfigure.AdapterCommonAutoConfiguration

适配器公共自动装配中

代码语言:javascript复制
  @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        try {
            if (event instanceof RefreshEvent) {
                doRefresh(((RefreshEvent) event).getDtpProperties());
            } else if (event instanceof CollectEvent) {
                doCollect(((CollectEvent) event).getDtpProperties());
            } else if (event instanceof AlarmCheckEvent) {
                doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
            }
        } catch (Exception e) {
            log.error("DynamicTp adapter, event handle failed.", e);
        }
    }

可以看到我们关心的三个发布事件,都在此进行了监听:

doRefresh、doCollect、doAlarmCheck

其中

刷新事件会执行相关渠道的通知

收集日志会执行对应的打印

告警信息会执行告警

三、使用

使用方式:以nacos为例,可以看到其基于@EnableDynamicTp实现对dtp相关bean的注册。DtpBeanDefinitionRegistrar即是完成注册的类。其主要是创建dtp配置对象DtpProperties,绑定dtp配置,获取执行器。拿到执行器后,遍历执行,绑定对应的信息,构建构造函数,注册bean信息。方便后续对线程池的操作。

代码语言:javascript复制
  @Resource
    private ThreadPoolExecutor dtpExecutor1;

    @GetMapping("/dtp-nacos-example/test")
    public String test() throws InterruptedException {
        task();
        return "success";
    }

    //获取dtp执行器
    public void task() throws InterruptedException {
        //获取dtp执行器
        DtpExecutor dtpExecutor2 = DtpRegistry.getDtpExecutor("dtpExecutor2");
        for (int i = 0; i < 100; i  ) {
            Thread.sleep(100);
            dtpExecutor1.execute(() -> {
                log.info("i am dynamic-tp-test-1 task");
            });
            dtpExecutor2.execute(NamedRunnable.of(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("i am dynamic-tp-test-2 task");
            }, "task-"   i));
        }
    }

由此可以看到实现了两个最为主要的功能:对线程池进行动态变更和对线程池的监控告警。使用了观察者模式、适配器模式。

0 人点赞