前言
Elastic-Job是当当基于Zookepper,Quartz开发并且开源的Java分布式定时任务,解决Quartz不支持分布式的弊端。它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
题外话,当当的Sharding-JDBC已经加入Apache管理了,而这个Elastic-Job没有。
还有一句题外话,私下和好友交流,当初要使用elastic 后来也被换了,具体原因也没说,咱也不知道,咱也不敢问。我贴个图大家应该会明白,这是Elastic-Job 在github的地址:https://github.com/elasticjob/elastic-job-lite
来个图更能说明问题。我什么也没说啊。。。。
采用的公司还是蛮多的,像 36氪、国美、唯品会……链接:http://elasticjob.io/docs/elastic-job-lite/00-overview/company/
网上也是对Elastic-Job众多好评,目前国内使用定时作业中间件也就xxl-job能跟他竞争。
再不更新代码都落灰了……
正文
正文我就不扯没用的了,好多人也是冲着标题写着 ‘源码解析’ 来的吧,好,请看下面就是我们老艿艿总结的源码解析专栏
我也是老艿艿的不锈钢粉丝哦!!!
微信公众号Elastic-Job源码解析地址:https://mp.weixin.qq.com/s/m1VRIzeFfa_6Ly_gEDNK-w
老艿艿微信公众号:芋道源码
一定有你想看的
不是有意打广告,而是老艿艿文章确实不错,也从不搞一些吸粉的商业活动,不做盗版。文章怎么样,看一下你就懂了。
友情提醒:看源码前要会使用并理解简单的原理,这样吸收比较好,上来就看源码直接就从入门到放弃。
Elastic-Job作为分布式作业中间件有一个重要的概念就是-分片。
分片概念: 这里的分片是指将一个任务拆分成多个任务执行,有点类似 java里的Fork-Join 框架思想。
举个例子:我现在要对一些数据进行处理,首先把数据筛选出来,为了快速的执行作业,我们用2台服务器,想让每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。
作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直结果就是服务器A遍历ID以0-4结尾的数据,服务器B遍历ID以5-9结尾的数据。
如何使用Elastic-Job
Elastic-Job 提供了3种作业类型
- Simple类型作业
- DataFlow类型作业
- Script类型作业
Elastic-Job 提供了2种配置方式
- JavaCode配置
- Spring命名空间配置
这里我只用代码讲解 用JavaCode配置方式实现Simple作业类型。
首先提供一下官网详细的配置手册:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/
准备工作
本机安装一个 Zookepper ,不搞集群,下载,安装 ,启动 so easy,我的版本是 3.4.14
引入Maven依赖
代码语言:javascript复制
代码语言:javascript复制<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/>
</parent>
<name>springboot-elastic-job</name>
<description>基于 Spring Boot 2.1.5 使用elastic-job</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
代码语言:javascript复制
Simple作业
代码语言:javascript复制public class JavaSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date())
" 分片项 : " shardingContext.getShardingItem()
" 总片数 : " shardingContext.getShardingTotalCount());
switch (shardingContext.getShardingItem()) {
case 0:
System.out.println("do something by sharding item 0");
break;
case 1:
System.out.println("do something by sharding item 1");
break;
case 2:
System.out.println("do something by sharding item 2");
break;
// case n: ...
//动态查询该分片下要执行的用户
//SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
}
}
}
启动类和配置作业
代码语言:javascript复制public class Application {
public static void main(String[] args) throws UnknownHostException {
System.out.println("Start...");
System.out.println(InetAddress.getLocalHost());
new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
//ZookeeperConfiguration构造方法两个参数,serverLists(连接Zookeeper服务器的列表,包括IP地址和端口号,,多个地址用逗号分隔)和namespace(命名空间)
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
new ZookeeperConfiguration("127.0.0.1:2181", "new-elastic-job-demo"));
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createSimpleJobConfiguration() {
//创建简单作业配置构建器,三个参数为:jobName(作业名称),cron(作业启动时间的cron表达式),shardingTotalCount(作业分片总数)
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("SimpleJobDemo", "0/15 * * * * ?", 2).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, JavaSimpleJob.class.getCanonicalName());
// 定义Lite作业根配置
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
return (LiteJobConfiguration) simpleJobRootConfig;
}
//配置DataflowJob
private static void setUpDataflowJob(final CoordinatorRegistryCenter registryCenter) {
JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("JavaDataflowJob", "0/10 * * * * ?", 2).build();
//数据流作业配置,第三个参数为streamingProcess(是否为流式处理)
DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(coreConfiguration, JavaDataflowJob.class.getCanonicalName(), true);
new JobScheduler(registryCenter, LiteJobConfiguration.newBuilder(dataflowJobConfiguration).overwrite(true).build()).init();
}
}
启动测试
我截取了一段控制台日志
github 地址:https://github.com/362460453/springboot-elastic-job
现在搞什么配置如果非必须情况下最好不要用xml形式,好古老,真的,如果有想学Spring命名空间配置 请看下面的链接:
https://www.cnblogs.com/yushangzuiyue/p/9655847.html
现实场景怎么用?
我们假设一个场景是处理一批数据,这批数据,一台机器的话分多个片执行相当于多线程执行,实际情况我们都是多台服务器执行的,使用2台机器,那么就把压力平均分摊到两台服务器上去了,而且也能更快执行完成。 如果,以前只要两台机器在1个小时就能跑完的,现在5个小时也跑不完,怎么办呢?加机器?加机器肯定是必须的,但是我们发现代码里写死了,分2片,我们总不能去改成3、4、5,万一以后还有更多呢,所以我们可以对sql进一步优化。我们把分片数和当前分片项传到sql,这样sql可以动态去查询对应分片后的用户了
代码语言:javascript复制public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardingTotalCount = shardingContext.getShardingTotalCount();
int shardingItem = shardingContext.getShardingItem();
//1 查询改分片下要执行的用户,带参数shardingTotalCount和shardingItem
//筛选出来的数据
//2 处理筛选的数据
}
}
sql改写
//动态查询该分片下要执行的用户
SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
代码语言:javascript复制
当shardingTotalCount 有5片,并且5台机器 机器0:查询用户id能整除5的,如:5、10、15、20…… 机器1:查询用户id除以5余1的,如:1、6、11、16…… 机器2:查询用户id除以5余2的,如:2、7、12、17…… 机器3:查询用户id除以5于余3的,如:3、8、13、18…… 机器4:查询用户id除以5余4的,如:4、9、14、19……
注意事项
这里要说一个我碰到的问题,LiteJobConfiguration里有一个属性是 boolean overwrite; 默认为false,如果为false的话,第一次启动的时候,会在zookeeper中保存了一份作业信息(调度时间、参数等),后面即使修改了作业信息,无论重新启动服务或者zookeeper,还是会使用第一次启动时候的作业信息(根据作业名字)。 因此需要设为true,这样每次启动,作业信息都会覆盖zookeeper中的保存的配置信息,这样可以保证修改了配置信息可以马上使用。
代码启动中,实现类的execute方法中不能使用spring注入的对象:elastic-job是封装的quartz框架,这个特性也存留下来,execute方法中只能用static对象。
如有其他问题请查看:
https://blog.csdn.net/name_z/article/details/81274029
https://blog.csdn.net/tanga842428/article/details/52398982