02 xxl-job执行器启动流程

2023-03-22 19:06:38 浏览数 (1)

执行器示例

下面是xxl-job提供的不同方式集成执行器示例代码:

最常用的当然是springboot集成方式,下面我们就以该示例研究下xxl-job客户端执行器的启动流程。

启动流程

客户端执行器启动流程入口在XxlJobSpringExecutor类中,通过实现spring扩展SmartInitializingSingleton,当IOC的单例Bean加载完成调用方法afterSingletonsInstantiated()

代码语言:javascript复制
@Override
public void afterSingletonsInstantiated() {

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Bean类型任务解析

首先,我们来分析下initJobHandlerMethodRepository(applicationContext)方法,该方法

代码语言:javascript复制
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {

    // init job handler from method
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean["   beanDefinitionName   "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
           continue;
        }

        ......

    }

}

上面逻辑就是遍历IoC容器中Bean,获取解析带有@XxlJob注解的方法,最后返回Map<Method, XxlJob>结果类型,key就是带有@XxlJob注解的Methodvalue就是该方法上解析的@XxlJob注解信息。

解析获取到带有@XxlJob注解的Method信息后,下面看下如何进一步处理:

代码语言:javascript复制
// 遍历
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
    Method method = methodXxlJobEntry.getKey();
    XxlJob xxlJob = methodXxlJobEntry.getValue();
    if (xxlJob == null) {
     continue;
    }

    // 获取@XxlJob注解value配置
    String name = xxlJob.value();
    if (name.trim().length() == 0) {
     throw new RuntimeException("xxl-job method-jobhandler name invalid, for["   bean.getClass()   "#"   method.getName()   "] .");
    }
    
    // 判断是否同名的已经加载,有则抛异常
    if (loadJobHandler(name) != null) {
     throw new RuntimeException("xxl-job jobhandler["   name   "] naming conflicts.");
    }

    // @XxlJob注解方法参数校验:必须只有一个参数,且参数类型是String,否则抛出异常
    if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
     throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for["   bean.getClass()   "#"   method.getName()   "] , "   "The correct method format like " public ReturnT<String> execute(String param) " .");
    }
    // @XxlJob注解方法返回值校验,必须返回ReturnT类型
    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
     throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for["   bean.getClass()   "#"   method.getName()   "] , "   "The correct method format like " public ReturnT<String> execute(String param) " .");
    }
    method.setAccessible(true);

    // init and destory
    Method initMethod = null;
    Method destroyMethod = null;

    // 解析@XxlJob注解init配置
    if (xxlJob.init().trim().length() > 0) {
        try {
            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
            initMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for["   bean.getClass()   "#"   method.getName()   "] .");
        }
    }
    
    // 解析@XxlJob注解destroy配置
    if (xxlJob.destroy().trim().length() > 0) {
        try {
            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
            destroyMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for["   bean.getClass()   "#"   method.getName()   "] .");
        }
    }

    // 将@XxlJob注解的Method、initMethod和destroyMethod封装成MethodJobHandler,并放入到Map中完成注册,key=@XxlJob.value
    registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}

GlueFactory初始化

GlueFactory主要处理GLUE(Java)类型作业,对GLUE(Java)类型作业源码进行编译、创建实例进行调用,可以支持spring依赖注入,如源码中支持@Autowired@Resource@Qualifier等。

执行器启动流程

super.start()这句才会真正进入执行器启动流程XxlJobExecutor#start

代码语言:javascript复制
public void start() throws Exception {

        // 初始化日志路径
        XxlJobFileAppender.initLogPath(logPath);

        // 初始化AdminBizClient,用于和admin远程交互
        initAdminBizList(adminAddresses, accessToken);

        // 初始化日志清理线程,用于清理日志目录下过期日志文件
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // 初始化回调线程,triggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
}

这里比较主要的是后面两条语句,TriggerCallbackThread.getInstance().start();主要启动用于作业执行完成后回调将结果传递给admin模块,具体见下节客户端执行器作业执行流程分析。initEmbedServer(address, ip, port, appname, accessToken);

代码语言:javascript复制
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

上面一堆逻辑是处理ip:port解析,关键是后面两句,EmbedServer#start内部创建一个线程Thread,并启动:

代码语言:javascript复制
public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {

            @Override
            public void run() {
    ......

            }
        });
        thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }

下面再来看下这个线程内部干了什么:

代码语言:javascript复制
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-"   r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
});


try {
 // start server
 ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                     channel.pipeline()
                               .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                               .addLast(new HttpServerCodec())
                               .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                               .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                    }})
               .childOption(ChannelOption.SO_KEEPALIVE, true);

 // bind
 ChannelFuture future = bootstrap.bind(port).sync();

 logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

 // start registry
 startRegistry(appname, address);

    // wait util stop
    future.channel().closeFuture().sync();

}

上面代码一大堆,其实逻辑很简单,主要干了两件事:

  • 使用netty初始化启动了一个http server,主要用于接收admin模块向执行器发送指令,比如下发执行作业指令、kill作业指令,主要处理逻辑封装在EmbedHttpServerHandler中;
  • startRegistry(appname, address)启动客户端执行器定时向admin模块注册线程,逻辑代码在ExecutorRegistryThread#start方法中,比较简单;

xxl-job客户端执行器注册流程大致如下图:

1、客户端利用adminBiz.registry(registryParam)定时周期向admin模块发送注册信息;

2、admin模块接收到客户端注册信息后,插入|更新xxl_job_registry表的update_time字段时间值;

3、admin模块JobRegistryMonitorHelper中启动线程定时扫描xxl_job_registry表,将超时移除,并将在线实例集合拼接在一起更新到执行器地址为自动注册的执行器address_list字段信息中。

总结

xxl-job客户端执行器启动流程还是比较简单,核心主要有两点:

  • 利用netty启动一个http server容器,并将IP:PORT利用注册信息带到admin模块,这样admin就可以给执行器下发运行作业、杀死作业等指令;
  • 执行器定时(默认30秒)向admin注册一次,当admin超过90秒未收到执行器注册信息,即认为该执行器超时,进行下线移除;

0 人点赞