Spring Batch 详解

2022-08-24 14:02:33 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

Job Launcher 和Job Repository

对应着的Java接口分别是:JobLauncher和 JobRepository

JobLauncher.

代码语言:javascript复制
packageorg.springframework.batch.core.launch;
(...)
publicinterface JobLauncher {
publicJobExecution run(Job job, JobParameters jobParameters)
throwsJobExecutionAlreadyRunningException,
JobRestartException,JobInstanceAlreadyCompleteException,
JobParametersInvalidException;
}

JobLauncher接口接受两个参数:Job和JobParameters

调用这个Job Launcher方法:可以通过java程序来通过JobLauncher来启动,也可以通过定时任务例如Quartz scheduler来启动.

JobRepository

JobRepository保持着所有Job执行的相关元数据,JobRepository的java接口如下:

代码语言:javascript复制
packageorg.springframework.batch.core.repository;
(...)
publicinterface JobRepository {
booleanisJobInstanceExists(String jobName, JobParametersjobParameters);
JobExecutioncreateJobExecution(
StringjobName, JobParameters jobParameters)
throwsJobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException;
voidupdate(JobExecution jobExecution);
voidadd(StepExecution stepExecution);
voidupdate(StepExecution stepExecution);
voidupdateExecutionContext(StepExecution stepExecution);
voidupdateExecutionContext(JobExecution jobExecution);
StepExecutiongetLastStepExecution(JobInstance jobInstance,
StringstepName);
intgetStepExecutionCount(JobInstance jobInstance, StringstepName);
JobExecutiongetLastJobExecution(String jobName,
JobParametersjobParameters);
}

JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。

JobRepository用于存储任务执行的状态信息,比如什么时间点执行了什么任务、任务执行结果如何等等。框架提供了2种实现,一种是通过Map形式保存在内存中,当Java程序重启后任务信息也就丢失了,并且在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,并且将数据保存在下面6张表里:

BATCH_JOB_INSTANCE BATCH_JOB_EXECUTION_PARAMS BATCH_JOB_EXECUTION BATCH_STEP_EXECUTION BATCH_JOB_EXECUTION_CONTEXT BATCH_STEP_EXECUTION_CONTEXT

Spring Batch框架的JobRepository支持主流的数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。可爱的是,我司的Gauss数据库也是支持的,只不过需要稍加配置。

每个表的解析:http://blog.csdn.net/u011659172/article/details/50749534

一个job的启动通常是一个事件来进行触发的,我们会经常用到JobLauncher接口和JobParameters类,但是这个事件是可以来自任何地方的,例如一个系统的定时器;一个http请求到web控制器来启动job等这就是Spring Batch 和外界的介入点.

另外批量Job程序基本上都是用来处理各种数据的,所以上图中我们可以看到SpringBatch的出口都是在跟数据源进行打交道,这些数据源可以是任何类型,但是文件系统和数据库是最常用的,当然它也可以支持写消息给JMS.

JobExecution

代码语言:javascript复制
importorg.springframework.batch.core.JobExecution;
 
                                             jobParameters= new JobParametersBuilder()
                                                .addLong("time",System.currentTimeMillis())
                                                .addString("filePath",smsFilePath)
                                                .toJobParameters();
                                                JobExecutionjobExecution =jobLauncher.run(sms2DBJob,jobParameters);
                                                 if(jobExecution.getStatus() ==BatchStatus.COMPLETED) {
                                                                        returntrue;
                                                }

Job Execution表示Job执行的句柄,一次Job执行可能成功也可能失败。只有Job Execution执行成功之后,对应的Job Instance才能被完成。

Job Execution对应的数据库表是:BATCH_JOB_EXECUTION,

代码语言:javascript复制
CREATE TABLEBATCH_JOB_EXECUTION  (
            JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
            VERSION BIGINT  ,
            JOB_INSTANCE_ID BIGINT NOT NULL,
            CREATE_TIME DATETIME NOT NULL,
            START_TIME DATETIME DEFAULT NULL ,
            END_TIME DATETIME DEFAULT NULL ,
            STATUS VARCHAR(10) ,
            EXIT_CODE VARCHAR(2500) ,
            EXIT_MESSAGE VARCHAR(2500) ,
            LAST_UPDATED DATETIME,
            JOB_CONFIGURATION_LOCATIONVARCHAR(2500) NULL,
            constraint JOB_INST_EXEC_FK foreignkey (JOB_INSTANCE_ID)
            referencesBATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
)ENGINE=InnoDB;

对应的java类是:org.springframework.batch.core.JobExecution。

status :BatchStatus对象表示执行状态。BatchStatus.STARTED表示运行时,BatchStatus.FAILED表示执行失败,BatchStatus. COMPLETED表示任务成功结束

JobParametersIncrementer主要用于JobOperator接口的startNextInstance等方法启动job的情况下。同一个Job在batch启动后被多次调用的场合,

startNextInstance方法将会非常有用,因为它将使用JobParametersIncrementer与Job绑定,创建一个新实例。因为JobParametersIncrementer有一个getNext方法

可以在此方法中为parameters添加一个自增的值,以区分不同的Job实例

,当然,这个值在job的其他的地方并不会用到,仅仅是为了标示不同JobInstance。当然SpringBatch框架也为我们提供了一个JobParametersIncrementer的实现类RunIdIncrementer

代码语言:javascript复制
                      @Bean
                publicJob product2DBJob() {
                logger.info("product2DBJob");
               return productJobBuilderFactory.get("product2DBJob") //create ajobbuilder instance
                                      .incrementer(newRunIdIncrementer())
                                      .repository(jobRepository)                                                                                                       .listener(productJobCompletionNotificationListener())
                                       .flow(productStep())
                                       .end()
                                       .build();
                                 ------------------------------------------
                                  public Job build() {
                                FlowJobjob = new FlowJob();
                                job.setName(getName());
                                job.setFlow(flow);
                                super.enhance(job);
                                try{
                                                job.afterPropertiesSet();
                                }
                                catch(Exception e) {
                                                thrownew StepBuilderException(e);
                                }
                                returnjob;
                }
                                 
                                                                               
                }

@StepScope

主要用于 Spring Batch。 它本质上是一个lazy scope,告诉Spring在首次访问时才创建bean。 在本例中, 我们需要使用

step scope 是因为使用了job参数的 ” filePath“值,这个值在应用程序启动时是不存在的。 使用 stepscope 使Spring

Batch在创建这个bean时能够找到“filePath“值。

弹性:

代码语言:javascript复制
@Bean
          
           public Step userStep(){
              System.out.println("userStep!!!!!");
           return    stepBuilderFactory.get("userStep")
              .<User,Message>chunk(2)
              .reader(userReader(filePath))
              .processor(MessageProcessor())
              .writer(MessageWriter())
//                .faultTolerant() 
//                .retry(Exception.class)  不能跳过的错误
//               .noRetry(ParseException.class) 
//                .retryLimit(5)  最多可以让一条数据重试5次
//                .listener(newRetryFailuireItemListener())   
//                .skip(Exception.class)
//                .skipLimit(10)        
//                .taskExecutor(newSimpleAsyncTaskExecutor())
//                .throttleLimit(10)       
//               .transactionManager(transactionManager) 
                   .build();
           }
          
           /**skip(Exception.class)
            * skipLimit(10):跳过十次记录报错为Exception的记录,多余十次则Job失败
            * ----
            * retry(Exception.class)
            * noRetry(ParseException.class) 
            * retryLimit(5):一条记录有可能过几毫秒就可以运行成功,所以一条记录可以重试5次,但是如果一条记录包ParseException错误也就是数据本身的字段数目不对或者其他的原因则
            * 不会重试就相当于skiplimit 1,当报Exception时则重试最多五次,不成功然后跳过,这个需要和skiplimit联合使用不然job会失败
            */

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/140199.html原文链接:https://javaforall.cn

0 人点赞