Job的创建和调用
在成功创建一个job后,Spring Batch 默认在项目启动时候执行配置的job。往往在正常业务处理中,需要我们手动或者定时去触发job,所以这边便引入了jobLauncher、jobOperator两个执行器。
JobLauncher配置
此处我们通过web的API接口去调用 jobLauncher,通过接口传入job的参数。调用的Job 是根据 在创建job时候,Bean name去指定。
代码语言:javascript复制@Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
代码语言:javascript复制 @Autowired
private JobLauncher jobLauncher;
@Autowired
private Job jobLaunchDemoJob;
@GetMapping("/{job1param}")
public String runJob1(@PathVariable("job1param") String job1param) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
System.out.println("Request to run job1 with param: " job1param);
JobParameters jobParameters = new JobParametersBuilder()
.addString("job1param",job1param)
.toJobParameters();
jobLauncher.run(jobLaunchDemoJob,jobParameters);
return "Job1 success.";
}
JobOperator配置
代码语言:javascript复制 @Bean
public JobOperator jobOperator(){
SimpleJobOperator operator = new SimpleJobOperator();
operator.setJobLauncher(jobLauncher);
operator.setJobParametersConverter(new DefaultJobParametersConverter());
operator.setJobRepository(jobRepository);
operator.setJobExplorer(jobExplorer);
operator.setJobRegistry(jobRegistry);
return operator;
}
代码语言:javascript复制 @Autowired
private JobOperator jobOperator;
@GetMapping("/{job2param}")
public String runJob1(@PathVariable("job2param") String job2param) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobInstanceAlreadyExistsException, NoSuchJobException {
System.out.println("Request to run job2 with param: " job2param);
jobOperator.start("jobOperatorDemoJob","job2param=" job2param);
return "Job2 success.";
}
最后,定时任务调用,通过corn表达式,满足条件时候,即执行
代码语言:javascript复制 @Scheduled(fixedDelay = 5000)
public void scheduler() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, JobParametersNotFoundException, NoSuchJobException {
jobOperator().startNextInstance("jobScheduledDemoJob");
}
Job的嵌套
某些情况下,一个job需要嵌套执行另一个job,此时需要借助step。在一个step中嵌套执行一个子job,然后在父job中执行这个嵌套的step即可。
1.job可以嵌套使用,嵌套的Job我们将其称为子job,被嵌套的Job我们将其称为父job;
2.一个父Job可以有多个子Job;
3.子job不能单独运行,需要其父Job去启动;
如图所示
执行上图作业的程序配置如下:
代码语言:javascript复制 /*得到一个step*/
@Bean
public Step childStep(){
return stepBuilderFactory.get("child").tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
System.out.println("子step执行……");
return RepeatStatus.FINISHED;
}).build();
}
/*得到一个job*/
@Bean
public Job childJob(){
return jobBuilderFactory.get("childJob")
.start(childStep())
.build();
}
/*得到一个step,用于嵌套job*/
/*
* new StepBuilder("childStepJob")创建一个Step的工厂类,childStepJob指定一个Step的名字;
* new JobStepBuilder创建一个工厂类,用来在step中嵌入一个job
* repository用于存储job、step的执行信息,如果一个step已经执行完,一个job再次调用该step时,会提示step已经执行完毕,不会再执行
* 该段代码意思:创建一个名字为childStepJob的step,在该step中嵌入执行一个名字为childJob的job
* */
@Bean
public Step nestedStep(){
return new JobStepBuilder(new StepBuilder("childStepJob"))
.job(childJob())
.repository(jobRepository)
.build();
}
/*得到一个step,step的名字是parent*/
@Bean
public Step parentStep(){
return stepBuilderFactory.get("parent").tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
System.out.println("父step执行……");
return RepeatStatus.FINISHED;
}).build();
}
/*得到一个job,job的名字为parent,注意不是parentJob,parentJob只是函数名*/
@Bean
public Job parentJob(){
return jobBuilderFactory.get("parent")
.start(nestedStep())
.next(parentStep())
.build();
}
}
在application.propertis中加入配置: ``` spring.batch.job.names=parentJob ```
程序中定义了childJob和parentJob两个job,childJob中执行的是名为child的step;parentJob中执行的名为parent和childStepJob的step,而childStepJob中执行的是名为childJob的job。
日志打印如下。
代码语言:javascript复制Job: [SimpleJob: [name=parentJob]] launched with the following parameters: [{}]
Executing step: [childStepJob]
Job: [SimpleJob: [name=childJob]] launched with the following parameters: [{}]
Executing step: [child]
子step执行……
Job: [SimpleJob: [name=childJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Executing step: [parent]
父step执行……
Job: [SimpleJob: [name=parentJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
通过日志可以看出,只执行了parentJob,首先执行了parentJob中的名为childStepJob的step,执行childStepJob时执行了内嵌的名为childJob的Job;然后又执行了parentJob中名为parent的step
Job参数
1.JobParameters作用:在Job运行过程中,可以用来传递信息
2.通过”key---->value”键值对的形式传入,在代码中我们通过get(“key”)来获取value值
3.job的参数是在整个job的step的生命周期中都可以使用到,我们可以根据不同业务处理逻辑,传入所需参数。
调用过程,demo如下:
代码语言:javascript复制@Bean
public Job myJobParametersDemoJob(){
return jobBuilderFactory.get("myJobParametersDemoJob")
.start(myJobParametersDemoStep())
.build();
}
@Bean
public Step myJobParametersDemoStep() {
return stepBuilderFactory.get("myJobParametersDemoStep")
.listener(this)
.tasklet(((contribution, chunkContext) -> {
System.out.println("Parameter is : " params.get("info"));
return RepeatStatus.FINISHED;
})).build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
params = stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
Flow的创建和使用
Step 是一个独立的、顺序的处理步骤,包含完整的输入、处理以及输出。但是在企业应用中,我们面对的更多情况是多个步骤按照一定的顺序进行处理。因此如何维护步骤之间的执行顺序是我们需要考虑的。Spring Batch 提供了 Step Flow 来解决这个问题。
Flow有以下特点:
- flow是一个Step的集合,他规定了Step与Step之间的转换关系;
- 创建Flow可以达到复用的效果,让其在不同的Job之间进行复用;
- 使用FlowBuilder去创建一个Flow,他和Job类似,使用start(),next()以及end()来运行flow;
顺序Flow
``` // 创建Flow对象,指明Flow对象包含哪些Step @Bean public Flow jobFlowDemoFlow1(){ return new FlowBuilder("jobFlowDemoFlow1") .start(jobFlowDemoTwoStep1()) .next(jobFlowDemoTwoStep2()) .build(); }
@Bean public Job jobFlowDemo(){ return jobBuilderFactory.get(“jobFlowDemo”) .start(jobFlowDemoFlow1()).end() .build(); }
代码语言:javascript复制 <br/> <br/>
## 决策器
<br/>
动态自定义Decider 决定Flow 执行顺序
1.Decision:为我们提供下一步执行哪一个Step提供条件决策
2.JobExecutionDecider:接口,提供决策条件
<br/>
1、自定义一个MyDecider, 根据调用次数,基数返回“EVEN”,偶数返回“ODD”;
public class MyDecider implements JobExecutionDecider { private int count = 0; @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { count ; if (count % 2 == 0) return new FlowExecutionStatus(“EVEN”); else return new FlowExecutionStatus(“ODD”); }
代码语言:javascript复制 <br/>
2、job中调用MyDecider,当返回“EVEN”时,调用evenStep;当返回“ODD”时,调用oddStep。
代码语言:javascript复制@Bean
public Step oddStep(){
return stepBuilderFactory.get("oddStep")
.tasklet(((stepContribution, chunkContext) -> {
System.out.println("oddStep");
return RepeatStatus.FINISHED;
})).build();
}
@Bean
public Step evenStep(){
return stepBuilderFactory.get("evenStep")
.tasklet(((stepContribution, chunkContext) -> {
System.out.println("evenStep");
return RepeatStatus.FINISHED;
})).build();
}
@Bean
public JobExecutionDecider myDecider(){
return new MyDecider();
}
@Bean
public Job flowDecisonDemoJob(){
return jobBuilderFactory.get("flowDecisonDemoJob").start(firstStep())
.next(myDecider())
.from(myDecider()).on("EVEN").to(evenStep())
.from(myDecider()).on("ODD").to(oddStep())
.from(oddStep()).on("*").to(myDecider())
.end()
.build();
}
代码语言:javascript复制<br/><br/><br/>
# 并行处理Split
<br/>
当遇到数据量比较大或者每个ITEM处理耗时的时候,能够并行处理是很重要的。并行处理的时候一般经历Split-Partitioned-Merge的过程
![file](https://graph.baidu.com/resource/2223b0a952f98c501fd7701583220953.png)
<br/>
// tag::jobstep[] @Bean public Job readandwritedbJob() { Flow flow1 = new FlowBuilder(“asyncFlow1”).start(step2()).build(); Flow flow2 = new FlowBuilder(“asyncFlow2”).start(step3()).build(); return jobBuilderFactory.get(“readandwritedbJob”) .incrementer(new RunIdIncrementer()) .flow(step1()) .split(new SimpleAsyncTaskExecutor()).add(flow1,flow2) .end() .build(); }
代码语言:javascript复制@Bean
public Tasklet tasklet1() {
return new ReadAndWriteDbTask();
}
@Bean
public Tasklet tasklet2() {
return new ReadAndWriteDbTask2();
}
@Bean
public Tasklet tasklet3() {
return new ReadAndWriteDbTask3();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3")
.tasklet(tasklet3())
.build();
}
// end::jobstep[]
代码语言:javascript复制<br/><br/><br/>
# 监听Job Execution
<br/>
1.Listener:控制Job执行的一种方式
2.可以通过接口或者注解实现监听器
3.在spring-batch中提供各个级别的监听器接口,从job级别到item级别都有
(1)JobExecutionListener(before..,after..);
(2)StepExecutionListener(before..,after..);
(3)ChunkListener(before..,after..);
(4)ItemReaderListener;ItemWriterListener;ItemProcessListener(before..,after..,error..);
<br/>
当任务执行完毕或开始执行时,需要执行一些处理工作。这个时候可以使用JobExecutionListener:
public interface JobExecutionListener { void beforeJob(JobExecution jobExecution); void afterJob(JobExecution jobExecution); }
代码语言:javascript复制<br/>
添加方式:
@Bean public Job footballJob() { return this.jobBuilderFactory.get(“footballJob”) .listener(sampleListener()) //JobExecutionListener的实现类 … .build(); }
代码语言:javascript复制<br/>
需要注意的是afterJob方法无论批处理任务成功还是失败都会被执行,所以增加以下判断:
public void afterJob(JobExecution jobExecution){ if( jobExecution.getStatus() == BatchStatus.COMPLETED ){ //job success } else if(jobExecution.getStatus() == BatchStatus.FAILED){ //job failure } }
代码语言:javascript复制<br/>
除了直接实现接口还可以用 @BeforeJob 和 @AfterJob 注解。
参考:
https://blog.csdn.net/wuzhiwei549/category_8573774.html
https://blog.51cto.com/13501268/2177746