xxl-job学习

2021-02-03 10:09:35 浏览数 (1)

拉取xxl-job的代码

执行XxlJobAdminApplication启动调度中心。

然后执行:

启动执行器中心之后,然后访问admin界面:

从里面可以看到可以执行任务管理。

一、调度中心:从XxlJobAdminConfig可以看到与spring集成的重要方法:

代码语言:javascript复制
@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;

    //创建xxlJobScheduler对象
    xxlJobScheduler = new XxlJobScheduler();
    //执行调度中心初始化
    xxlJobScheduler.init();
}

初始化操作:

代码语言:javascript复制
//xxl-job定时任务
public void init() throws Exception {
    //初始化i18n 国际化
    initI18n();

    //admin触发池帮助启动
    JobTriggerPoolHelper.toStart();

    //admin注册监控启动
    JobRegistryHelper.getInstance().start();

    //admin失败监控运行
    JobFailMonitorHelper.getInstance().start();

    //job完成监控运行
    JobCompleteHelper.getInstance().start();

    //admin日志report启动
    JobLogReportHelper.getInstance().start();

    // start-schedule  ( depend on JobTriggerPoolHelper )
    //启动定时任务
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

job触发池启动:此时会创建两种线程池,快线程池和慢线程池对象,这里区别在于调度的阻塞队列的容量。执行addTrigger方法时,首先会选择快线程池,如果此时的线程池处理任务出现超时,同时则会采用慢线程池,加大阻塞队列的容量。在run方法中会调用,而在启动定时任务的时候会调用

代码语言:javascript复制
//添加触发器
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);=>
 // do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); =>    
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

触发器触发job,会对参数进行:

代码语言:javascript复制
//加载数据
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
 //加载
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
//处理触发器
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());

处理触发器processTrigger的过程中,必然会触发执行器的执行,也即此时会运行executro,可以看到执行过程:

代码语言:javascript复制
1.保日志id
2.初始化触发器参数
3.初始化地址
4.触发远程执行器
5.收集触发器信息
6.保存日志触发器信息      

而这里我们关心的就是执行器的运行:

代码语言:javascript复制
/**
 * run executor 运行执行器
 * @param triggerParam
 * @param address
 * @return
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        //获取执行器业务
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        //进行运行
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run")   ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

getExecutorBiz可以看到:

代码语言:javascript复制
//获取执行业务
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        // valid
        if (address==null || address.trim().length()==0) {
            return null;
        }

        // load-cache
        address = address.trim();
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {
            return executorBiz;
        }

        // set-cache
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }

run逻辑:

代码语言:javascript复制
 //启动
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler   jobThread
        //加载job线程
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        //判断job线程是否为空
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler   jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        //判断粘合类型是否是bean,如果是bean,则加载jobHandler
        //校验jobHandler
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler ["   triggerParam.getExecutorHandler()   "] not found.");
                }
            }

            //如果是groovy
        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
            //如果是脚本
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        //否者直接返回失败,并进行提示
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType["   triggerParam.getGlueType()   "] is not valid.");
        }

        // executor block strategy
        //执行阻塞策略,匹配策略。同时进行判断是否进行丢弃
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
                //如果是覆盖,则杀掉线程
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:"   ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        //如果job线程为空,则替换线程 创建或者存在无效
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        //将触发器参数放入到触发器队列
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

如果是bean形式:

代码语言:javascript复制
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}

放入之后,我们需要take:

代码语言:javascript复制
 if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // commonm
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                              triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                  );

我们可以看到回调:

代码语言:javascript复制
/**
 * job results callback queue
 */
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
    getInstance().callBackQueue.add(callback);
    logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}

同时可以看到回调take过程:

代码语言:javascript复制
//从回调队列中take任务
HandleCallbackParam callback = getInstance().callBackQueue.take();
//如果callback不为空
if (callback != null) {

    // callback list param
    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
    // drainTo
    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
    //回调参数列表
    callbackParamList.add(callback);

    // callback, will retry if error
    //回调,将会重试如果错误
    if (callbackParamList!=null && callbackParamList.size()>0) {
        //执行回调
        doCallback(callbackParamList);
    }
}

二、执行中心启动:

此时我们来看看执行中心启动的过程:

查看启动:

代码语言:javascript复制
/**
 * @author xuxueli 2018-10-31 19:05:43
 */
public class FramelessApplication {
    public static void main(String[] args) {
        try {
            //启动 重要
            // start
            FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

            // Blocks until interrupted
            // 阻塞直至中断
            while (true) {
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (InterruptedException e) {
                    break;
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // destory  销毁xxl-job执行器
            FrameLessXxlJobConfig.getInstance().destoryXxlJobExecutor();
        }

    }

}

初始化job执行器:FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

加载xxl-job-executor.propeties中的配置,填充xxlJobExecutor,注册XxlJobBeanList中的bean。而这个注解则是

代码语言:javascript复制
/**
 * init 初始化xxlJob执行器
 */
public void initXxlJobExecutor() {

    // load executor prop
    //加载执行器配置
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");

    // init executor
    //初始化执行器
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

    // registry job bean
    //注册jobBean
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

    // start executor
    //启动执行器 重要
    try {
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

可以看到xxlJobExecutor里面的信息:

而这个bean则是通过自定义注解实现的:也即我们如果需要执行一个定时任务,可以通过@XxlJob来实现任务bea注册。

代码语言:javascript复制
/**
 * annotation for method jobhandler
 *  自定义注册job处理器方法
 *
 * @author xuxueli 2019-12-11 20:50:13
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlJob {

    /**
     * jobhandler name
     * 任务处理器的名称
     */
    String value();

    /**
     * init handler, invoked when JobThread init
     * 初始化handler,激活当job线程初始化时
     */
    String init() default "";

    /**
     * destroy handler, invoked when JobThread destroy
     * 销毁处理器,激活job线程销毁
     */
    String destroy() default "";

}

而从里面可以看到xxlJobExecutor里面有7个方法,通过反射拿到这些方法:

拿到方法,并放入到map中:

由于此时只是进行初始化,因此jobHandlerRepository还不存在配置信息,通过name拿不到配置。

同时判断拿到的方法是否存在init()、destory()方法,如果不存在,则执行注册jobHandler方法,放入到jobHandlerRepository中,而可以看到IJobHandler里面有三个方法:execute()、init()、destory()方法

执行注册:

代码语言:javascript复制
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

有下面方式:

代码语言:javascript复制
@XxlJob("demoJobHandler")  //方式一,采用注入需要执行的job
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")  //方式二,带生命周期

同时还可以实现任务分分片、命令行任务和跨平台任务.

代码语言:javascript复制
/**
 * 2、分片广播任务
 */
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

    // 分片参数 分片索引和分片总数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

    // 业务逻辑
    for (int i = 0; i < shardTotal; i  ) {
        if (i == shardIndex) {
            XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
        } else {
            XxlJobHelper.log("第 {} 片, 忽略", i);
        }
    }

}

配置加载完成,属性信息填充完,将bean进行注册。完成之后,便可以执行任务处理器启动。

而执行执行任务处理器启动时,会首先初始化job处理器方法

代码语言:javascript复制
//启动执行器
public void start() {

    // init JobHandler Repository (for method)
    //初始化job处理器reposity
    initJobHandlerMethodRepository(xxlJobBeanList);

    // super start
    //父类启动
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

执行启动:

代码语言:javascript复制
//启动和停止
public void start() throws Exception {

    // init logpath
    //初始化logPath 将logBasePath = logPath,创建文件路径、目录
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    //初始化激活,admin客户端
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    //初始化job日志文件清理线程,使用递归删除文件
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    //初始化触发回调线程
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    //初始化执行服务  重要 服务器启动
    initEmbedServer(address, ip, port, appname, accessToken);
}

初始化服嵌入服务器:executor服务,执行的过程中,会填充相关服务器的配置信息,然后执行启动服务器,可以看到服务器使用的Netty,同时其里面的handler处理与前面的业务队列形成对应。

代码语言:javascript复制
//启动应用服务器
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            //业务线程池
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-"   r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                //启动服务
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                //执行绑定
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // start registry
                //启动注册
                startRegistry(appname, address);
                // wait util stop
                future.channel().closeFuture().sync();
            }  finally {
                // stop 优雅停机
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

关注EmbedHttpServerHandler和 startRegistry(appname, address):

netty中执行业务处理:对读事件进行处理

代码语言:javascript复制
   //执行read操作
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        // request parse
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

        // invoke
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                //执行处理
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                // to json
                String responseJson = GsonTool.toJson(responseObj);
                //写入响应
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }

    //处理
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // services mapping
        //服务映射
        try {
            if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("  uri  ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:"   ThrowableUtil.toString(e));
        }
    }

    /**
     * write response
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        // write response
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

这里的业务处理与前面调度中心中我们执行业务服务的run时,将业务执行的任务放入队列中,形成照应,也即有任务来时,就会触发EmbedHttpServerHandler中的读事件的处理。

注册startRegistry(appname, address):包括注册线程和注册移除

代码语言:javascript复制
//执行注册操作
ReturnT<String> registryResult = adminBiz.registry(registryParam);
//执行注册移除操作
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);

进行adminBizClient:

代码语言:javascript复制
//执行注册操作,类似采用httpClient,api/registry
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl   "api/registry", accessToken, timeout, registryParam, String.class);
}

//注册移除,类似httpClient调用,api/registryRemove
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl   "api/registryRemove", accessToken, timeout, registryParam, String.class);
}

最终调用:

代码语言:javascript复制
 //job注册帮助类
public ReturnT<String> registry(RegistryParam registryParam) {

   // valid
   if (!StringUtils.hasText(registryParam.getRegistryGroup())
         || !StringUtils.hasText(registryParam.getRegistryKey())
         || !StringUtils.hasText(registryParam.getRegistryValue())) {
      return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
   }

   // async execute
   //异步执行
   registryOrRemoveThreadPool.execute(new Runnable() {
      @Override
      public void run() {
         //注册更新
         int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
         //ret<1,则执行注册保存操作
         if (ret < 1) {
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            // fresh
            //执行刷新操作
            freshGroupRegistryInfo(registryParam);
         }
      }
   });

   return ReturnT.SUCCESS;
}

而从Handler可看到处理的请求则是:

代码语言:javascript复制
 if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            }
   }         

而run请求则是:判断不同类型的请求,采用不同的方式接,将触发器参数放入到触发器队列

代码语言:javascript复制
//启动
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler   jobThread
    //加载job线程
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    //判断job线程是否为空
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler   jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    //判断粘合类型是否是bean,如果是bean,则加载jobHandler
    //校验jobHandler,对不同的枚举类型进行适配,从而执行相应的方式完成
    // executor block strategy
    //执行阻塞策略,匹配策略。同时进行判断是否进行丢弃
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
            //如果是覆盖,则杀掉线程
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:"   ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
    //如果job线程为空,则替换线程 创建或者存在无效
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
    //将触发器参数放入到触发器队列
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

将触发器放入队列:

代码语言:javascript复制
 /**
    * new trigger to queue
 * 新触发器到队列
    *
    * @param triggerParam
    * @return
    */
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
   // avoid repeat
   if (triggerLogIdSet.contains(triggerParam.getLogId())) {
      logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
      return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:"   triggerParam.getLogId());
   }

   triggerLogIdSet.add(triggerParam.getLogId());
   //添加到触发队列
   triggerQueue.add(triggerParam);
       return ReturnT.SUCCESS;
}

而写入响应:

代码语言:javascript复制
/**
 * write response
 */
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
    // write response
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
    if (keepAlive) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }
    ctx.writeAndFlush(response);
}

除此之外,xxl-job还有相关的任务分配策略:一致性hash、LRU、LFU、随机、轮询等。

0 人点赞