在成功创建一个job后,Spring Batch 默认在项目启动时候执行配置的job。往往在正常业务处理中,需要我们手动或者定时去触发job,所以这边便引入了jobLauncher、jobOperator两个执行器。
JobLauncher作业调度
启动一个job
运行一个批处理任务至少有两点要求:一个 JobLauncher 和一个用来运行的 job 。它们都包含了相同或是不同的 context 。
Spring Boot默认支持自动启动已配置好的Job,我们可以通过配置项spring.batch.job.enabled=false来禁止Spring容器自动启动Job。
Spring Launch API它的核心就是 JobLauncher 接口。JobLauncher 需要2个参数:Job , JobParameters。
JobLauncher 的接口:
代码语言:javascript复制public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters) throws (…);
}
正常情况下,当我们通过调度器调用Job时,从命令行来启动job,会为每一个job初始化一个JVM,因此每个job会有一个自己的 JobLauncher,整个流程如下:
从web容器的HttpRequest来启动job,一般只是用一个 JobLauncher 来异步启动job,http请求会调用这个 JobLauncher 来启动它们需要的job。通过web启动job的例子:
前端index.html
代码语言:javascript复制<input type="text" id="msg">
<button onclick="runJob1()">Run Job 1</button>
<script type="text/javascript">
var baseurl="http://localhost:8080";
var xhttp=new XMLHttpRequest();
function runJob1(){
var msg = document.getElementById('msg').value;
var url = baseurl '/job/' msg;
xhttp.open('GET',url,true);
xhttp.send();
xhttp.onreadystatechange = function(){
if(this.readyState == 4 && this.status ==200){
console.log('Job status: ' this.responseText);
}
}
}
</script>
JobLauncherController
代码语言:javascript复制@Controller
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher;
@Autowired
JobLauncherDemoJob jobLauncherDemoJob;
@RequestMapping("/job/{msg}")
public void handle(@PathVariable String msg) throws Exception{
// 把接收到的参数传给任务
JobParameters parameters = new JobParametersBuilder().addString("msg",msg).toJobParameters();
jobLauncher.run(jobLauncherDemoJob, parameters);
return "JobLauncher success.";
}
}
jobLauncher ``` @Bean public JobLauncher jobLauncher() { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务 jobLauncher.afterPropertiesSet(); return jobLauncher; } ```
JobLauncherDemo
代码语言:javascript复制public class JobLauncherDemo implements StepExecutionListener{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Map<String,JobParameters> parameters;
@Bean
public Job jobLauncherDemoJob() {
return jobBuilderFactory.get("jobLauncherDemoJob")
.listener(this)
.start(jobLauncherDemoStep())
.build();
}
@Bean
public Step jobLauncherDemoStep() {
return stepBuilderFactory.get("jobLauncherDemoStep")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("jobLauncherDemoStep,msg: " parameter.get("msg").getValue());
return RepeatStatus.FINISHED;
}
}).build();
}
@Override
public void beforeStep(StepExecution stepExcution){
parameter = stepExcution.getJobParameters().getParameters();
}
运行结果
停止一个job
谁需要停止job
(1)、使用者可能由于某些原因,需要停止job的运行,比如发现job出现数据错误,或者抛出异常,需要停止job的工作。
(2)、开发者在开发程序的过程中,开发者明确的知道一些业务逻辑需要停止job。比如,一个job运行的时间不能超过早上8点,如果超过这个时间需要停止job的运行,等等的情况。
此时job中的程序在运行逻辑代码,只有当这些业务完成之后,程序的管理权交回到spring batch的时候,才会被终止。如果中间的业务运行需要很长的时间,则job不会马上停止。而一旦控制权还给了框架,它会立刻设置当前 StepExecution 为 BachStatus.STOPPED ,意为停止,然后保存,最后在完成前对JobExecution进行相同的操作。
如何停止job
(1)、运行的过程中抛出一个exception,造成job的停止。
(2)、利用StepExecution来设置一个标识,停止job的运行。
最好的办法是,利用StepExecution来设置一个标识,停止job的运行。
a、在Tasklet接口的方法中有StepExecution参数,可以进行调用。
示例:
代码语言:javascript复制public class ProcessItemsTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
if(shouldStop()) {
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}
processItem();
if(moreItemsToProcess()) {
return RepeatStatus.CONTINUABLE;
} else {
return RepeatStatus.FINISHED;
}
}
(...)
}
b、面向“块”的step中,ItemReader, ItemProcessor, 和 ItemWriter 这3个接口中,它们中间没有 StepExecution。此时我们需要用到Listener中的StepExecution。
示例:
代码语言:javascript复制public class StopListener {
private StepExecution stepExecution;
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@AfterRead
public void afterRead() {
if(stopConditionsMet()) {
stepExecution.setTerminateOnly();
}
}
(...)
}
从业务逻辑的需要出发,停止job的最佳的方式,还是设置stepExecution.setTerminateOnly();这个job停止标识,来让job停止运行。
放弃一个job
一个job的执行过程当执行到FAILED状态之后,如果它是可重启的,它将会被重启。
如果任务的执行过程状态是ABANDONED,那么框架就不会重启它。ABANDONED状态也适用于执行步骤,使得它们可以被跳过,即便是在一个可重启的任务执行之中:如果任务执行过程中碰到在上一次执行失败后标记为ABANDONED的步骤,将会跳过该步骤直接到下一步(这是由任务流定义和执行步骤的退出码决定的)。
如果当前的系统进程死掉了(“kill -9”或系统错误),job自然也不会运行,但JobRepository是无法侦测到这个错误的,因为进程死掉之前没有对它进行任何通知。你必须手动的告诉它,你知道任务已经失败了还是说考虑放弃这个任务(设置它的状态为FAILED或ABANDONED)-这是业务逻辑层的事情,无法做到自动决策。
只有在不可重启的任务中才需要设置为FAILED状态,或者你知道重启后数据还是有效的。Spring Batch Admin中有一系列工具JobService,用以取消正在进行执行的任务。
失败一个job
失败的job是可以重新启动的,因为它的状态是FAILED,如果step2失败,则返回一个EARLY TERMINATION的返回码,step3也就不会执行。否则继续执行step3
代码语言:javascript复制<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<fail on="FAILED" exit-code="EARLY TERMINATION"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
结束一个job
已经结束的job是不能重新启动的,因为它的状态是COMPLETED。如果step2失败了,则step3就不执行了,该job也就COMPLETED,结束了。如果step2成功了,则继续往下执行step3。
代码语言:javascript复制<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<end on="FAILED"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
JobOperator
一个JobLauncher使用一个JobRepository创建并运行新的JobExection对象,Job和Step实现随后使用相同的JobRepository在job运行期间去更新相同的JobExecution对象。
这些基本的操作能够满足简单场景的需要,但是对于有着数百个任务和复杂定时流程的大型批处理情况来说,就需要使用更高级的方式访问元数据:
JobRepository 提供了对元数据的 CRUD 操作,JobExplorer 提供了对元数据的只读操作。然而,这些操作最常用于联合使用诸多的批量操作类,来对任务进行监测,并完成相当多的任务控制功能,比如停止、重启或对任务进行汇总。在Spring Batch 中JobOperator 接口提供了这些操作类型:
代码语言:javascript复制public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
JobOperator 最常见的作用莫过于停止某个Job:
代码语言:javascript复制Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭不是立即发生的,因为没有办法将一个任务立刻强制停掉,尤其是当任务进行到开发人员自己的代码段时,框架在此刻是无能为力的,比如某个业务逻辑处理。而一旦控制权还给了框架,它会立刻设置当前 StepExecution 为 BachStatus.STOPPED ,意为停止,然后保存,最后在完成前对JobExecution进行相同的操作。 此处我们通过web的API接口去调用 jobOperator,通过接口传入job的参数。调用的Job 是根据 在创建job时候,Bean name去指定。
示例:
代码语言:javascript复制@Controller
public class JobOperatorController {
@Autowired
private JobOperatorDemo jobOperatorDemo;
//不需要注入Job对象
//@Autowired
//JobLauncherDemoJob jobLauncherDemoJob;
@RequestMapping("/job/{msg}")
public void handle(@PathVariable String msg) throws Exception{
// 把接收到的参数传给任务
System.out.println("Request to run job2 with param: " job2param);
jobOperatorDemo.start("jobOperatorDemoJob","msg=" msg);
return "JobOperator success.";
}
}
``` public class JobOperatorDemo implements StepExecutionListener,ApplicationContextAware{
代码语言:javascript复制@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Map<String,JobParameters> parameters;
@Autowired
private JobRepository jobRepository;
@Autowired
private JobExplorer jobExplorer;
@Autowired
private JobRegistry jobRegistry;
@Autowired
private JobLauncher jobLauncher;
pricate ApplicationContext context;
@Bean
public JobRepositoryBeanPostProcessor jobRegistrar() throws Exception{
JobRepositoryBeanPostProcessor postProcessor = new JobRepositoryBeanPostProcessor;
postProcessor.setJobRegistry(jobRegistry);
postProcessor.setBeanFactory(context.getAutowireCapableBeanFactory());
postProcessor.afterPropertiesSet();
return postProcessor;
}
@Bean
public JobOperator jobOperator(){
SimpleJobOperator operator = new SimpleJobOperator();
operator.setJobLauncher(jobLauncher);
// 参数转换
operator.setJobParametersConverter(new DefaultJobParametersConverter());
// 持久化方式
operator.setJobRepository(jobRepository);
// 任务相关信息
operator.setJobExplorer(jobExplorer);
// 注册:任务名字符串和任务对象关联
operator.setJobRegistry(jobRegistry);
return operator;
}
@Bean
public Job jobOperatorDemo() {
return jobBuilderFactory.get("jobOperatorDemo")
.listener(this)
.start(jobOperatorDemoStep())
.build();
}
@Bean
public Step jobOperatorDemoStep() {
return stepBuilderFactory.get("jobOperatorDemoStep")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("jobOperatorDemoStep,msg: " parameter.get("msg").getValue());
return RepeatStatus.FINISHED;
}
}).build();
}
@Override
public void beforeStep(StepExecution stepExcution){
parameter = stepExcution.getJobParameters().getParameters();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext){
this.context = applicationContext;
}
代码语言:javascript复制<br/><br/>
<br/><br/>
参考:
https://blog.csdn.net/github_36849773/article/details/66968461
https://www.iteye.com/blog/kanpiaoxue-1771208
https://www.cnblogs.com/nizuimeiabc1/p/9409492.html
https://blog.csdn.net/kangkanglou/article/details/82627799
https://blog.csdn.net/wuzhiwei549/article/details/85394406