Spring Batch 批处理(8) - JobLauncher和JobOperator

2020-05-26 17:17:22 浏览数 (1)

在成功创建一个job后,Spring Batch 默认在项目启动时候执行配置的job。往往在正常业务处理中,需要我们手动或者定时去触发job,所以这边便引入了jobLauncher、jobOperator两个执行器。

JobLauncher作业调度

启动一个job

运行一个批处理任务至少有两点要求:一个 JobLauncher 和一个用来运行的 job 。它们都包含了相同或是不同的 context 。

Spring Boot默认支持自动启动已配置好的Job,我们可以通过配置项spring.batch.job.enabled=false来禁止Spring容器自动启动Job。

Spring Launch API它的核心就是 JobLauncher 接口。JobLauncher 需要2个参数:Job , JobParameters。

JobLauncher 的接口:

代码语言:javascript复制
public interface JobLauncher {  
    public JobExecution run(Job job, JobParameters jobParameters) throws (…);  
}  

正常情况下,当我们通过调度器调用Job时,从命令行来启动job,会为每一个job初始化一个JVM,因此每个job会有一个自己的 JobLauncher,整个流程如下:

从web容器的HttpRequest来启动job,一般只是用一个 JobLauncher 来异步启动job,http请求会调用这个 JobLauncher 来启动它们需要的job。通过web启动job的例子:

前端index.html

代码语言:javascript复制
<input type="text" id="msg">
<button onclick="runJob1()">Run Job 1</button>

<script type="text/javascript">
	var baseurl="http://localhost:8080";
	var xhttp=new XMLHttpRequest();
	
	function runJob1(){
		var msg = document.getElementById('msg').value;
		var url = baseurl   '/job/'  msg;
		xhttp.open('GET',url,true);
		xhttp.send();
		
		xhttp.onreadystatechange = function(){
			if(this.readyState == 4 && this.status ==200){
				console.log('Job status: ' this.responseText);
			}
		}
	}
	
</script>

JobLauncherController

代码语言:javascript复制
@Controller
public class JobLauncherController {
    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    JobLauncherDemoJob jobLauncherDemoJob;
    @RequestMapping("/job/{msg}")
    public void handle(@PathVariable String msg) throws Exception{
			// 把接收到的参数传给任务
			JobParameters parameters = new JobParametersBuilder().addString("msg",msg).toJobParameters();
			
			jobLauncher.run(jobLauncherDemoJob, parameters);
        return "JobLauncher success.";
    }
}

jobLauncher ``` @Bean public JobLauncher jobLauncher() { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务 jobLauncher.afterPropertiesSet(); return jobLauncher; } ```

JobLauncherDemo

代码语言:javascript复制
public class JobLauncherDemo implements StepExecutionListener{

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
	private Map<String,JobParameters> parameters;
	
    @Bean
    public Job jobLauncherDemoJob() {
        return jobBuilderFactory.get("jobLauncherDemoJob")
				.listener(this)
                .start(jobLauncherDemoStep())
                .build();
    }
		
	@Bean
    public Step jobLauncherDemoStep() {
        return stepBuilderFactory.get("jobLauncherDemoStep")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("jobLauncherDemoStep,msg: " parameter.get("msg").getValue());
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
	  
	@Override
	public void beforeStep(StepExecution stepExcution){
		parameter = stepExcution.getJobParameters().getParameters();
	}

运行结果

停止一个job

谁需要停止job

(1)、使用者可能由于某些原因,需要停止job的运行,比如发现job出现数据错误,或者抛出异常,需要停止job的工作。

(2)、开发者在开发程序的过程中,开发者明确的知道一些业务逻辑需要停止job。比如,一个job运行的时间不能超过早上8点,如果超过这个时间需要停止job的运行,等等的情况。

此时job中的程序在运行逻辑代码,只有当这些业务完成之后,程序的管理权交回到spring batch的时候,才会被终止。如果中间的业务运行需要很长的时间,则job不会马上停止。而一旦控制权还给了框架,它会立刻设置当前 StepExecution 为 BachStatus.STOPPED ,意为停止,然后保存,最后在完成前对JobExecution进行相同的操作。

如何停止job

(1)、运行的过程中抛出一个exception,造成job的停止。

(2)、利用StepExecution来设置一个标识,停止job的运行。

最好的办法是,利用StepExecution来设置一个标识,停止job的运行。

a、在Tasklet接口的方法中有StepExecution参数,可以进行调用。

示例:

代码语言:javascript复制
public class ProcessItemsTasklet implements Tasklet {  
    @Override  
    public RepeatStatus execute(StepContribution contribution,  
            ChunkContext chunkContext) throws Exception {  
        if(shouldStop()) {  
            chunkContext.getStepContext().getStepExecution().setTerminateOnly();  
        }  
        processItem();  
        if(moreItemsToProcess()) {  
            return RepeatStatus.CONTINUABLE;  
        } else {  
            return RepeatStatus.FINISHED;  
        }  
        }  
        (...)  
}  

b、面向“块”的step中,ItemReader, ItemProcessor, 和 ItemWriter 这3个接口中,它们中间没有 StepExecution。此时我们需要用到Listener中的StepExecution。

示例:

代码语言:javascript复制
public class StopListener {  
    private StepExecution stepExecution;  
    @BeforeStep  
    public void beforeStep(StepExecution stepExecution) {  
        this.stepExecution = stepExecution;  
    }  
      
    @AfterRead  
    public void afterRead() {  
        if(stopConditionsMet()) {  
            stepExecution.setTerminateOnly();  
        }  
    }  
    (...)  
}  

从业务逻辑的需要出发,停止job的最佳的方式,还是设置stepExecution.setTerminateOnly();这个job停止标识,来让job停止运行。

放弃一个job

一个job的执行过程当执行到FAILED状态之后,如果它是可重启的,它将会被重启。

如果任务的执行过程状态是ABANDONED,那么框架就不会重启它。ABANDONED状态也适用于执行步骤,使得它们可以被跳过,即便是在一个可重启的任务执行之中:如果任务执行过程中碰到在上一次执行失败后标记为ABANDONED的步骤,将会跳过该步骤直接到下一步(这是由任务流定义和执行步骤的退出码决定的)。

如果当前的系统进程死掉了(“kill -9”或系统错误),job自然也不会运行,但JobRepository是无法侦测到这个错误的,因为进程死掉之前没有对它进行任何通知。你必须手动的告诉它,你知道任务已经失败了还是说考虑放弃这个任务(设置它的状态为FAILED或ABANDONED)-这是业务逻辑层的事情,无法做到自动决策。

只有在不可重启的任务中才需要设置为FAILED状态,或者你知道重启后数据还是有效的。Spring Batch Admin中有一系列工具JobService,用以取消正在进行执行的任务。

失败一个job

失败的job是可以重新启动的,因为它的状态是FAILED,如果step2失败,则返回一个EARLY TERMINATION的返回码,step3也就不会执行。否则继续执行step3

代码语言:javascript复制
<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

结束一个job

已经结束的job是不能重新启动的,因为它的状态是COMPLETED。如果step2失败了,则step3就不执行了,该job也就COMPLETED,结束了。如果step2成功了,则继续往下执行step3。

代码语言:javascript复制
<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

JobOperator

一个JobLauncher使用一个JobRepository创建并运行新的JobExection对象,Job和Step实现随后使用相同的JobRepository在job运行期间去更新相同的JobExecution对象。

这些基本的操作能够满足简单场景的需要,但是对于有着数百个任务和复杂定时流程的大型批处理情况来说,就需要使用更高级的方式访问元数据:

JobRepository 提供了对元数据的 CRUD 操作,JobExplorer 提供了对元数据的只读操作。然而,这些操作最常用于联合使用诸多的批量操作类,来对任务进行监测,并完成相当多的任务控制功能,比如停止、重启或对任务进行汇总。在Spring Batch 中JobOperator 接口提供了这些操作类型:

代码语言:javascript复制
public interface JobOperator {
      List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
      List<Long> getJobInstances(String jobName, int start, int count)throws NoSuchJobException;
      Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
      String getParameters(long executionId) throws NoSuchJobExecutionException;
      Long start(String jobName, String parameters)throws NoSuchJobException, JobInstanceAlreadyExistsException;
      Long restart(long executionId)throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                                 NoSuchJobException, JobRestartException;
      Long startNextInstance(String jobName)throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
      boolean stop(long executionId)throws NoSuchJobExecutionException, JobExecutionNotRunningException;
      String getSummary(long executionId) throws NoSuchJobExecutionException;Map<Long, String> getStepExecutionSummaries(long executionId)
                                 throws NoSuchJobExecutionException;
      Set<String> getJobNames();
}

JobOperator 最常见的作用莫过于停止某个Job:

代码语言:javascript复制
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即发生的,因为没有办法将一个任务立刻强制停掉,尤其是当任务进行到开发人员自己的代码段时,框架在此刻是无能为力的,比如某个业务逻辑处理。而一旦控制权还给了框架,它会立刻设置当前 StepExecution 为 BachStatus.STOPPED ,意为停止,然后保存,最后在完成前对JobExecution进行相同的操作。 此处我们通过web的API接口去调用 jobOperator,通过接口传入job的参数。调用的Job 是根据 在创建job时候,Bean name去指定。

示例:

代码语言:javascript复制
@Controller
public class JobOperatorController {
	@Autowired
    private JobOperatorDemo jobOperatorDemo;

	//不需要注入Job对象
    //@Autowired
    //JobLauncherDemoJob jobLauncherDemoJob;
		
    @RequestMapping("/job/{msg}")
    public void handle(@PathVariable String msg) throws Exception{
			// 把接收到的参数传给任务
			System.out.println("Request to run job2 with param: "   job2param);
 
        jobOperatorDemo.start("jobOperatorDemoJob","msg=" msg);
 
        return "JobOperator success.";
    }
}

``` public class JobOperatorDemo implements StepExecutionListener,ApplicationContextAware{

代码语言:javascript复制
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Map<String,JobParameters> parameters;

@Autowired
private JobRepository jobRepository;

@Autowired
private JobExplorer jobExplorer;

@Autowired
private JobRegistry jobRegistry;

@Autowired
private JobLauncher jobLauncher;
	
	pricate ApplicationContext context;
	
	 
@Bean
public JobRepositoryBeanPostProcessor jobRegistrar() throws Exception{
	JobRepositoryBeanPostProcessor postProcessor = new JobRepositoryBeanPostProcessor;
	
	postProcessor.setJobRegistry(jobRegistry);
	postProcessor.setBeanFactory(context.getAutowireCapableBeanFactory());
	postProcessor.afterPropertiesSet();
	
	return postProcessor;
}
	
@Bean
public JobOperator jobOperator(){
    SimpleJobOperator operator = new SimpleJobOperator();

    operator.setJobLauncher(jobLauncher);
			// 参数转换
    operator.setJobParametersConverter(new DefaultJobParametersConverter());
			// 持久化方式
    operator.setJobRepository(jobRepository);
			// 任务相关信息
    operator.setJobExplorer(jobExplorer);
			// 注册:任务名字符串和任务对象关联
    operator.setJobRegistry(jobRegistry);
    return operator;
}

@Bean
public Job jobOperatorDemo() {
    return jobBuilderFactory.get("jobOperatorDemo")
			.listener(this)
            .start(jobOperatorDemoStep())
            .build();
}
	
@Bean
public Step jobOperatorDemoStep() {
    return stepBuilderFactory.get("jobOperatorDemoStep")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                    System.out.println("jobOperatorDemoStep,msg: " parameter.get("msg").getValue());
                    return RepeatStatus.FINISHED;
                }
            }).build();
}
  
@Override
public void beforeStep(StepExecution stepExcution){
	parameter = stepExcution.getJobParameters().getParameters();
}



@Override
public void setApplicationContext(ApplicationContext applicationContext){
	this.context = applicationContext;
}
代码语言:javascript复制
<br/><br/>


<br/><br/>
 
 
 参考:
 
 https://blog.csdn.net/github_36849773/article/details/66968461
 
 https://www.iteye.com/blog/kanpiaoxue-1771208
 
 https://www.cnblogs.com/nizuimeiabc1/p/9409492.html
 
 https://blog.csdn.net/kangkanglou/article/details/82627799
 
 https://blog.csdn.net/wuzhiwei549/article/details/85394406

0 人点赞