开源框架分布式任务调度xxl-job

2021-01-18 12:22:50 浏览数 (1)

官网

代码语言:javascript复制
https://www.xuxueli.com/xxl-job/

是什么

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用

特点

弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式

后台界面可视化

去哪下

代码语言:javascript复制
https://gitee.com/xuxueli0323/xxl-job.git

中央仓库地址

代码语言:javascript复制
  1. <!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
  2. <dependency>
  3. <groupId>com.xuxueli</groupId>
  4. <artifactId>xxl-job-core</artifactId>
  5. <version>${最新稳定版本}</version>
  6. </dependency>

怎么玩

1.0初始化“调度数据库”

请下载项目源码并解压,获取 “调度数据库初始化SQL脚本” 并执行即可。

“调度数据库初始化SQL脚本” 位置为:

代码语言:javascript复制
/xxl-job/doc/db/tables_xxl_job.sql

调度中心支持集群部署,集群情况下各节点务必连接同一个mysql实例;

如果mysql做主从,调度中心集群节点务必强制走主库;

官网demo部署非常简单,执行完sql后按着官网给出配置更改。然后用Maven下载组件jar包启动,支持docker部署

后台页面

架构设计

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

架构图

xxl-job vs quartz

Quartz作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:

  • 问题一:调用API的的方式操作任务,不人性化;
  • 问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
  • 问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况下,此时调度系统的性能将大大受限于业务;
  • 问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。

XXL-JOB弥补了quartz的上述不足之处

详细功能请参考官网

源码分析

代码语言:javascript复制
package com.xxl.job.admin.core.trigger;


public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {

                            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);    

                               }

看下processTrigger

代码语言:javascript复制
// 4、trigger remote executor
        ReturnT<String> triggerResult = null;
if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

远程执行方法triggerResult = runExecutor(triggerParam, address);

初始化参数,获取执行策略分片广播

执行参数

代码语言:javascript复制
/**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run")   ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
return runResult;
    }

getExecutorBiz方法

以前版本的同样方法

代码语言:javascript复制
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
//        // valid
//        if (address==null || address.trim().length()==0) {
//            return null;
//        }
//
//        // load-cache
//        address = address.trim();
//        ExecutorBiz executorBiz = executorBizRepository.get(address);
//        if (executorBiz != null) {
//            return executorBiz;
//        }
//
//        // set-cache
//        executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
//                NetEnum.NETTY_HTTP,
//                Serializer.SerializeEnum.HESSIAN.getSerializer(),
//                CallType.SYNC,
//                LoadBalance.ROUND,
//                ExecutorBiz.class,
//                null,
//                5000,
//                address,
//                XxlJobAdminConfig.getAdminConfig().getAccessToken(),
//                null,
//                null).getObject();
//
//        executorBizRepository.put(address, executorBiz);
//        return executorBiz;
//    }

实例化bean时使用序列化协议

代码语言:javascript复制
Serializer.SerializeEnum.HESSIAN.getSerializer()

且使用NetEnum.NETTY_HTTP,NETTY客户端

CallType.SYNC 采用同步的请求方式, LoadBalance.ROUND 负载使用轮询的方式

返回来我们接着看runExecutor执行run方法时

代码语言:javascript复制
/**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run")   ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
return runResult;
    }

不知道你们干没干过代码申诉的活,这让我想起了申诉时被周末加班支配的恐惧

这里封装了Netty的调用方法,把老版本的通过连接池获取 NettyHttpConnectClient 连接,调用其send方法发送请求进行了封装,不过老版本的代码只是注释了,还能看见,所以网上很多资料发现看不到代码,全局搜一下就可以了。

总结

xxl-job 底层就是通过封装quartz netty http封装的rpc框架来完成每一次分布式定时任务的执行

0 人点赞