Spring Batch Documentation

2022-09-01 10:41:48

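Chapter 1: Getting Started with Spring Batch

Section 1: Spring Batch Overview

Spring Batch is a lightweight, comprehensive batch-processing framework designed to help enterprises build robust, efficient batch applications.

Spring Batch provides many reusable components, covering logging, tracing, transactions, job statistics, job restart, skip, repeat, and resource management. For high-volume, high-performance batch jobs it also provides partitioning and remoting features.

Spring Batch is a batch application framework, not a scheduling framework. It deals only with batch-processing concerns such as transactions, concurrency, monitoring, and execution, and provides no scheduling of its own, so a complete batch solution usually pairs it with a scheduler such as Quartz.

The framework's main features are:

Transaction management
Chunk-based processing
Declarative I/O
Start/Stop/Restart
Retry/Skip

The framework has four main roles:

JobLauncher starts jobs and can be seen as the program's entry point.
Job represents one concrete batch task.
Step represents one concrete step; a Job can contain several Steps (think of how many steps it takes to put an elephant into a refrigerator).
JobRepository is where data is stored; it can be seen as an interface to a database, and job status and related information are recorded through it while a job runs.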

Section 2: Setting Up a Spring Batch Project

https://start.spring.io/

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
</dependency>

Section 3: Switching to a MySQL Database

<dependency>	   
    <groupId>mysql</groupId>    
    <artifactId>mysql-connector-java</artifactId> 
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

application.properties

spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/springbatch
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always

Section 4: A First Spring Batch Program

package com.example.demo.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class JobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job helloWorldJob() {
        return jobBuilderFactory.get("helloWorldJob").start(step1()).build();
    }

    // Needs to be public
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("Hello World!");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

}

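Section 5: Core API

JobInstance: this concept relates to Job the way an instance relates to a class in Java. A Job defines a workflow, and a JobInstance is one concrete instance of that workflow. One Job can have many JobInstances, which are told apart by another domain concept, JobParameters.

JobParameters: a set of runtime configuration parameters that apply to the whole Job. Different parameters produce different JobInstances; if you run the same Job with the same JobParameters, the run reuses the JobInstance created previously. Spring Batch also lets you exclude some parameters from the comparison, so that they do not take part in distinguishing JobInstances.

JobExecution: one run of a JobInstance, which may succeed or fail. Every run of a JobInstance produces a JobExecution. The same JobInstance (same JobParameters) can be run several times, in which case it has several JobExecutions. A JobExecution records everything that happened during one run, so it carries many properties and must be persisted in order to support Spring Batch features such as restart.

StepExecution: like JobExecution, this object represents one run of a Step. A Step is part of a Job, so a StepExecution is associated with a JobExecution. It also stores the data related to that run of the Step, so it too has many properties and must be persisted to support certain Spring Batch features.

ExecutionContext: a property of both JobExecution and StepExecution. Put simply, it is a container that the framework manages and persists; developers can store data in it to share that data across a whole Job or a whole Step.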
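The relationship between JobParameters, JobInstance, and JobExecution can be sketched in plain Java. This is only a model of the idea, not the Spring Batch API: the class JobInstanceSketch, its methods, and the job name "importJob" are hypothetical. It also ignores a rule of the real framework, which refuses to rerun a JobInstance that has already completed successfully.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not the Spring Batch API): a JobInstance is identified by
// job name + identifying parameters; every run of it adds one JobExecution.
final class JobInstanceSketch {

    // instance key -> execution ids recorded for that instance
    private final Map<String, List<Integer>> executionsByInstance = new HashMap<>();
    private int nextExecutionId = 1;

    // Builds the identity key; the TreeMap makes it independent of parameter order.
    static String instanceKey(String jobName, Map<String, String> identifyingParams) {
        return jobName + new TreeMap<>(identifyingParams);
    }

    // Records one execution and returns the key of the instance it belongs to.
    String run(String jobName, Map<String, String> identifyingParams) {
        String key = instanceKey(jobName, identifyingParams);
        executionsByInstance.computeIfAbsent(key, k -> new ArrayList<>()).add(nextExecutionId++);
        return key;
    }

    int instanceCount() {
        return executionsByInstance.size();
    }

    int executionCount(String key) {
        List<Integer> executions = executionsByInstance.get(key);
        return executions == null ? 0 : executions.size();
    }

    public static void main(String[] args) {
        JobInstanceSketch repo = new JobInstanceSketch();
        Map<String, String> monday = new HashMap<>();
        monday.put("date", "2019-07-01");
        Map<String, String> tuesday = new HashMap<>();
        tuesday.put("date", "2019-07-02");

        String first = repo.run("importJob", monday);
        repo.run("importJob", monday);   // same parameters: same instance, second execution
        repo.run("importJob", tuesday);  // different parameters: new instance

        System.out.println("instances = " + repo.instanceCount());
        System.out.println("executions of first instance = " + repo.executionCount(first));
    }
}
```

Two distinct parameter sets yield two instances, while the repeated run only adds a second execution to the first instance.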

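Chapter 2: Job Flow

Job, Step, and Flow beans must not share names. I wrote these demos one at a time, so each time I wrote a new one I had to comment out the previous one.

Section 1: Creating and Using a Job

Job: the core concept of batch processing and the basic unit of a batch operation. Every Job has one or more Steps.
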
package com.example.demo.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class JobFlowDemo1 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Named jobFlowDemo1Job so the method does not read like a constructor of the class
    @Bean
    public Job jobFlowDemo1Job() {
        /*
         * on("COMPLETED"): run step2 only after step1 has completed successfully
         * next(...): run the following step directly
         * stopAndRestart(...): stop the job and restart from the given step
         */
        return jobBuilderFactory.get("jobFlowDemo1")
            // .start(step1())
            // .next(step2())
            // .next(step3())
            // .build();
            .start(step1()).on("COMPLETED").to(step2())
            .from(step2()).on("COMPLETED").to(step3()) // alternatives: fail(), stopAndRestart(...)
            .from(step3()).end().build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("==================================step1");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("==================================step2");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step step3() {
        return stepBuilderFactory.get("step3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("==================================step3");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

}

Section 2: Creating and Using a Flow

1. A Flow is a group of Steps.
2. A Flow can be reused by several Jobs.
3. A Flow is created with FlowBuilder.

package com.example.demo.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class JobFlowDemo2 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step jobFlowStep1() {
        return stepBuilderFactory.get("jobFlowStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("jobFlowStep1");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step jobFlowStep2() {
        return stepBuilderFactory.get("jobFlowStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("jobFlowStep2");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step jobFlowStep3() {
        return stepBuilderFactory.get("jobFlowStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("jobFlowStep3");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    // Create the Flow: a Flow is a group of Steps; when the same sequence of steps
    // is needed in several places, put it in a Flow and reuse it
    @Bean
    public Flow jobFlowDemo2Flow() {
        return new FlowBuilder<Flow>("jobFlowDemo2Flow")
                .start(jobFlowStep1())
                .next(jobFlowStep2())
                .build();
    }

    // Create the Job
    @Bean
    public Job jobFlowDemo2Job() {
        return jobBuilderFactory.get("jobFlowDemo2Job")
                .start(jobFlowDemo2Flow())
                // continue with a plain Step after the Flow
                .next(jobFlowStep3())
                .end()
                .build();
    }

}

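Section 3: Concurrent Execution with split

split lets several Steps or several Flows of a job run concurrently:

1. Create several Steps.
2. Create two Flows.
3. Create a Job that contains both Flows and runs them concurrently.
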
package com.example.demo.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class JobSplitDemo3 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step jobSplitStep1() {
        return stepBuilderFactory.get("jobSplitStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {
                // Print the step name and the thread that runs it
                System.out
                    .println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step jobSplitStep2() {
        return stepBuilderFactory.get("jobSplitStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {
                System.out
                    .println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step jobSplitStep3() {
        return stepBuilderFactory.get("jobSplitStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {
                System.out
                    .println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Flow jobSplitFLow1() { 
   
        return new FlowBuilder<Flow>("jobSplitFLow1").start(jobSplitStep1()).build();
    }

    @Bean
    public Flow jobSplitFLow2() { 
   
        return new FlowBuilder<Flow>("jobSplitFLow2")
                .start(jobSplitStep2())
                .next(jobSplitStep3())
                .build();
    }

    @Bean
    public Job jobSplitJob() {
        return jobBuilderFactory.get("jobSplitJob")
                .start(jobSplitFLow1())
                // split makes jobSplitFLow1 and jobSplitFLow2 run concurrently
                .split(new SimpleAsyncTaskExecutor())
                .add(jobSplitFLow2())
                .end()
                .build(); // each flow runs asynchronously on its own thread
    }
}
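What split does can be modelled without the framework. The sketch below is plain Java, not the Spring Batch API: SplitSketch and runSplit are hypothetical names, and a fixed thread pool stands in for the SimpleAsyncTaskExecutor. Each flow keeps its own steps in order, the flows run on separate threads, and the caller waits for all of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java model (not the Spring Batch API) of a split: each flow runs its
// steps in order on its own thread, and the job waits for all flows to finish.
final class SplitSketch {

    // Runs every flow concurrently and returns the step names in completion order.
    static List<String> runSplit(List<List<String>> flows) {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, flows.size()));
        try {
            List<CompletableFuture<Void>> running = new ArrayList<>();
            for (List<String> flow : flows) {
                running.add(CompletableFuture.runAsync(() -> {
                    for (String step : flow) {
                        log.add(step); // steps inside one flow stay sequential
                    }
                }, executor));
            }
            // The job continues only after every branch of the split has finished.
            CompletableFuture.allOf(running.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        List<List<String>> flows = Arrays.asList(
                Arrays.asList("jobSplitStep1"),
                Arrays.asList("jobSplitStep2", "jobSplitStep3"));
        // The interleaving between the two flows may differ from run to run.
        System.out.println(runSplit(flows));
    }
}
```

The only ordering guaranteed is the one inside a flow: jobSplitStep2 always precedes jobSplitStep3, while jobSplitStep1 may appear anywhere.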

Section 4: Using a Decider

Interface: JobExecutionDecider

package com.example.demo.batch;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
public class MyDecider implements JobExecutionDecider {

    // Number of decisions made so far
    private int count;

    @Override
    public FlowExecutionStatus decide(JobExecution arg0, StepExecution arg1) {
        count++;
        if (count % 2 == 0)
            return new FlowExecutionStatus("even");
        else
            return new FlowExecutionStatus("odd");
    }

}

Job configuration that uses the decider:

package com.example.demo.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class DecisionDemo4 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("step1");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("step2");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step step3() {
        return stepBuilderFactory.get("step3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
                System.out.println("step3");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    // Create the decider
    @Bean
    public JobExecutionDecider myDecider() {
        return new MyDecider();
    }

    // Create the Job
    @Bean
    public Job DecisionDemoJob() {
        return jobBuilderFactory.get("DecisionDemoJob")
                .start(step1())
                // after step1, consult the decider
                .next(myDecider())
                // the decider's return value selects the next step
                .from(myDecider()).on("even").to(step3())
                .from(myDecider()).on("odd").to(step2())
                // after step2, whatever its status, go back to the decider
                .from(step2()).on("*").to(myDecider())
                .end().build();
    }
}
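Tracing the transitions by hand shows which steps this job runs. The sketch below is a plain-Java model of the flow above, not the Spring Batch API; DeciderSketch and runJob are hypothetical names. The counter behaves like the one in MyDecider: the first decision is "odd", the second "even".

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not the Spring Batch API) of the decision flow above:
// step1 -> decider; "odd" -> step2 -> decider again; "even" -> step3 -> end.
final class DeciderSketch {

    private int count; // same counter as MyDecider: incremented on every decision

    String decide() {
        count++;
        return count % 2 == 0 ? "even" : "odd";
    }

    // Walks the transitions and returns the steps in the order they run.
    static List<String> runJob() {
        DeciderSketch decider = new DeciderSketch();
        List<String> executed = new ArrayList<>();
        executed.add("step1");
        while (true) {
            String status = decider.decide();
            if ("even".equals(status)) {
                executed.add("step3");
                return executed;
            }
            executed.add("step2"); // "odd": run step2, then consult the decider again
        }
    }

    public static void main(String[] args) {
        System.out.println(runJob()); // prints [step1, step2, step3]
    }
}
```

The first decision returns "odd", so step2 runs and control returns to the decider; the second decision returns "even", so step3 runs and the job ends.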

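Section 5: Nested Jobs

A Job can be nested inside another Job. The nested Job is called the child Job and the enclosing Job the parent Job. A child Job is not run on its own; the parent Job starts it.

Example: create two Jobs as child Jobs, then create one Job as the parent.

@Configuration
@EnableBatchProcessing
class NestedJobDemo5 {
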
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private Job childJobOne;

@Autowired
private Job childJobTwo;

@Autowired
private JobLauncher launcher;

@Bean
public Job parentJob(JobRepository repository,PlatformTransactionManager transactionManager)
{
	return jobBuilderFactory.get("parentJob")
			.start(childJob1(repository,transactionManager))
			.next(childJob2(repository,transactionManager))
			.build();
}

// Returns a Step that wraps a Job
private Step childJob2(JobRepository repository,PlatformTransactionManager transactionManager) {
	return new JobStepBuilder(new StepBuilder("childJobTwo"))
			   .job(childJobTwo).launcher(launcher)
			   .repository(repository)
			   .transactionManager(transactionManager)
			   .build();
}

private Step childJob1(JobRepository repository,PlatformTransactionManager transactionManager) {
	return new JobStepBuilder(new StepBuilder("childJobOne"))
			   .job(childJobOne).launcher(launcher)
			   .repository(repository)
			   .transactionManager(transactionManager)
			   .build();
}

}

application.properties (run only the parent job at startup):

spring.batch.job.names=parentJob

Section 6: Using Listeners

Listeners observe the execution of a batch job.

A listener can be created by implementing an interface or by using annotations:

JobExecutionListener (before, after)

StepExecutionListener (before, after)

ChunkListener (before, after, error)

ItemReadListener, ItemProcessListener, ItemWriteListener (before, after, error)

public class MyJobListener implements JobExecutionListener {

@Override
public void beforeJob(JobExecution jobExecution) {
	System.out.println(jobExecution.getJobInstance().getJobName() + "before...");
}

@Override
public void afterJob(JobExecution jobExecution) {
	System.out.println(jobExecution.getJobInstance().getJobName() + "after...");
}

}

public class MyChunkListener {

@BeforeChunk
public void beforeChunk(ChunkContext context){
	System.out.println(context.getStepContext().getStepName() + "before...");
}

@AfterChunk
public void afterChunk(ChunkContext context){
	System.out.println(context.getStepContext().getStepName() + "after...");
}

}

Section 7: Job Parameters

@Configuration
@EnableBatchProcessing
public class JobParametersDemo7 implements StepExecutionListener {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;


private Map<String, JobParameter> parameters;
 

@Bean
public Job parametersJob()
{
	return jobBuilderFactory.get("parametersJob")
			.start(parameterStep())
			.build();
}

// The parameters are used inside the step, so a listener captures them before the step runs
@Bean
public Step parameterStep() {
	return stepBuilderFactory.get("parameterStep")
			.listener(this)
			.tasklet(new Tasklet() {
				@Override
				public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
					System.out.println(parameters.get("name"));
					return RepeatStatus.FINISHED;
				}
			}).build();
}


@Override
public void beforeStep(StepExecution stepExecution) {
	parameters = stepExecution.getJobParameters().getParameters();
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
	return null;
}

}

Chapter 3: Reading Data

Section 1: ItemReader Overview

@Configuration
@EnableBatchProcessing
public class ItemReaderDemo8 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job itemReaderJob()
{
	return jobBuilderFactory.get("itemReaderJob")
		   .start(itemReaderStep())
		   .build();
}

@Bean
public Step itemReaderStep() {
	return stepBuilderFactory.get("itemReaderStep")
		   .<String,String>chunk(2)
		   .reader(itemReaderDemoRead())
		   .writer(list->{
		         for(String item:list)
		         {
		        	 System.out.println(item + "...");
		         }
		   }).build();

}

@Bean
public MyReader itemReaderDemoRead() {
	List<String> data=Arrays.asList("cat","dog","pig","duck");
	return new MyReader(data);
}

}
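The MyReader class is not shown in this post. The contract it has to honour is simple: each call returns the next item, and null signals the end of the data. The plain-Java sketch below models that contract together with the chunk size of 2 used above. It is not the Spring Batch API; ChunkSketch, ListReader, and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model (not the Spring Batch API) of chunk-oriented processing:
// the reader hands out one item per call and null at the end; the writer
// receives the items one chunk at a time.
final class ChunkSketch {

    // Hypothetical stand-in for the MyReader class used above.
    static final class ListReader {
        private final Iterator<String> iterator;

        ListReader(List<String> data) {
            this.iterator = data.iterator();
        }

        // Returns the next item, or null when the data is exhausted.
        String read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    // Reads until null and returns the chunks that would be passed to the writer.
    static List<List<String>> run(List<String> data, int chunkSize) {
        ListReader reader = new ListReader(data);
        List<List<String>> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                written.add(chunk);          // "write" a full chunk
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk);              // final, possibly shorter chunk
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("cat", "dog", "pig", "duck"), 2));
        // prints [[cat, dog], [pig, duck]]
    }
}
```

With four items and a chunk size of 2, the writer is called twice, once per pair.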

Section 2: Reading from a Database

JdbcPagingItemReader

@Configuration
@EnableBatchProcessing
class DbJdbcDemo9 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("dbJdbcWriter")
private ItemWriter<? super User> dbJdbcWriter;

@Autowired
private DataSource dataSource;

@Bean
public Job dbJdbcJob()
{
	return jobBuilderFactory.get("dbJdbcJob")
		  .start(dbJdbcStep())
		  .build();
}

@Bean
public Step dbJdbcStep() {
	return stepBuilderFactory.get("dbJdbcStep")
		   .<User,User>chunk(2)
	       .reader(dbJdbcReader())
	       .writer(dbJdbcWriter)
	       .build();
}

@Bean
@StepScope
public JdbcPagingItemReader<User> dbJdbcReader() {
	
	JdbcPagingItemReader<User> reader=new JdbcPagingItemReader<>();
	// Use a JdbcPagingItemReader to read the data from the database
	reader.setDataSource(dataSource);
	reader.setFetchSize(2);
	reader.setRowMapper(new RowMapper<User>() {
		@Override
		public User mapRow(ResultSet rs, int rowNum) throws SQLException {
			User user=new User();
			user.setId(rs.getInt(1));
			user.setUsername(rs.getString(2));
			user.setPassword(rs.getString(3));
			user.setAge(rs.getInt(4));
			return user;
		}
	});
	// Define the SQL query
	MySqlPagingQueryProvider provider =new MySqlPagingQueryProvider();
	provider.setSelectClause("id,username,password,age");
	provider.setFromClause("from User");
	
	Map<String,Order> sort=new HashMap<>(1);
	sort.put("id", Order.ASCENDING);
	provider.setSortKeys(sort);
	
	reader.setQueryProvider(provider);
	return reader;
}

}

Section 3: Reading from a Flat File

FlatFileItemReader

@Configuration
@EnableBatchProcessing
class FileDemo10 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("fileWriter")
private ItemWriter<? super Customer> fileWriter;


@Bean
public Job fileJob()
{
	return jobBuilderFactory.get("fileJob")
		  .start(fileStep())
		  .build();
}

@Bean
public Step fileStep() {
	return stepBuilderFactory.get("fileStep")
		   .<Customer,Customer>chunk(100)
	       .reader(fileReader())
	       .writer(fileWriter)
	       .build();
}

@Bean
@StepScope
public FlatFileItemReader<Customer> fileReader() {
	FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
	reader.setResource(new ClassPathResource("customer.csv"));
	reader.setLinesToSkip(1); // skip the header line

	// How to parse each line
	DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
	// Name the four columns
	tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
	// Map one line to a Customer object
	DefaultLineMapper<Customer> mapper=new DefaultLineMapper<>();
	mapper.setLineTokenizer(tokenizer);
	mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
		@Override
		public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
			Customer customer =new Customer();
			customer.setId(fieldSet.readLong("id"));
			customer.setFirstName(fieldSet.readString("firstName"));
			customer.setLastName(fieldSet.readString("lastName"));
			customer.setBirthday(fieldSet.readString("birthday"));
			return customer;
		}
	});
	
	mapper.afterPropertiesSet();
	reader.setLineMapper(mapper);
	
	return reader;
}

}

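Section 4: Reading from an XML File

StaxEventItemReader

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.7</version>
</dependency>

@Configuration
@EnableBatchProcessing
public class XmlDemo11 {
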
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("xmlWriter")
private ItemWriter<? super Customer> xmlWriter;



@Bean
public Job xmlJob()
{
	return jobBuilderFactory.get("xmlJob")
		  .start(xmlStep())
		  .build();
}

@Bean
public Step xmlStep() {
	return stepBuilderFactory.get("xmlStep")
		   .<Customer,Customer>chunk(20)
	       .reader(xmlReader())
	       .writer(xmlWriter)
	       .build();
}


@Bean
@StepScope
public StaxEventItemReader<Customer> xmlReader() {
	StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
	reader.setResource(new ClassPathResource("customer.xml"));
	// The root element of each fragment to process
	reader.setFragmentRootElementName("customer");
	// Requires the spring-oxm dependency; converts XML into objects
	XStreamMarshaller unmarshaller=new XStreamMarshaller();
	Map<String,Class<?>> map = new HashMap<>();
	map.put("customer", Customer.class);
	unmarshaller.setAliases(map);
	reader.setUnmarshaller(unmarshaller);
	return reader;
}

}

Section 5: Reading from Multiple Files

MultiResourceItemReader

@Configuration
@EnableBatchProcessing
class MultiFileDemo12 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("multiFileWriter")
private ItemWriter<? super Customer> multiFileWriter;

@Value("classpath:/file*.csv")
private Resource[] fileResources;


@Bean
public Job multiFileJob()
{
	return jobBuilderFactory.get("multiFileJob")
		  .start(multiFileStep())
		  .build();
}

@Bean
public Step multiFileStep() {
	return stepBuilderFactory.get("multiFileStep")
		   .<Customer,Customer>chunk(100)
	       .reader(multiFileReader())
	       .writer(multiFileWriter)
	       .build();
}

@Bean
@StepScope
public MultiResourceItemReader<Customer> multiFileReader() {
	MultiResourceItemReader<Customer> reader =new MultiResourceItemReader<Customer>();
	
	reader.setDelegate(fileReader());
	reader.setResources(fileResources);
	return reader;
}

@Bean
@StepScope
public FlatFileItemReader<Customer> fileReader() {
	FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
	reader.setResource(new ClassPathResource("customer.csv"));
	reader.setLinesToSkip(1); // skip the header line

	// How to parse each line
	DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
	// Name the four columns
	tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
	// Map one line to a Customer object
	DefaultLineMapper<Customer> mapper=new DefaultLineMapper<>();
	mapper.setLineTokenizer(tokenizer);
	mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
		@Override
		public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
			Customer customer =new Customer();
			customer.setId(fieldSet.readLong("id"));
			customer.setFirstName(fieldSet.readString("firstName"));
			customer.setLastName(fieldSet.readString("lastName"));
			customer.setBirthday(fieldSet.readString("birthday"));
			return customer;
		}
	});
	
	mapper.afterPropertiesSet();
	reader.setLineMapper(mapper);
	
	return reader;
}

}

Section 6: ItemReader Error Handling and Restart

@Component("restartReader")
public class RestartReader implements ItemStreamReader<Customer> {

private FlatFileItemReader<Customer> customerFlatFileItemReader=new FlatFileItemReader<>();
// Number of records consumed so far; persisted in the ExecutionContext under "curLine"
private Long curLine = 0L;

public RestartReader()
{
	customerFlatFileItemReader.setResource(new ClassPathResource("restartDemo.csv"));
	customerFlatFileItemReader.setLinesToSkip(1); // skip the header line

	// How to parse each line
	DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
	// Name the four columns
	tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
	// Map one line to a Customer object
	DefaultLineMapper<Customer> mapper=new DefaultLineMapper<>();
	mapper.setLineTokenizer(tokenizer);
	mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
		@Override
		public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
			Customer customer =new Customer();
			customer.setId(fieldSet.readLong("id"));
			customer.setFirstName(fieldSet.readString("firstName"));
			customer.setLastName(fieldSet.readString("lastName"));
			customer.setBirthday(fieldSet.readString("birthday"));
			return customer;
		}
	});

	mapper.afterPropertiesSet();
	customerFlatFileItemReader.setLineMapper(mapper);
}

@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException,
		NonTransientResourceException {

	// Count this read so that update() can persist how far the step has got
	this.curLine++;

	Customer customer = customerFlatFileItemReader.read();

	// Simulate a failure on a bad record
	if (customer != null && "WrongName".equals(customer.getFirstName())) {
		throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
	}
	return customer;
}

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
	if (executionContext.containsKey("curLine")) {
		// Restart: skip the header line plus the records committed by the previous run
		this.curLine = executionContext.getLong("curLine");
		customerFlatFileItemReader.setLinesToSkip(this.curLine.intValue() + 1);
	} else {
		this.curLine = 0L;
		executionContext.put("curLine", this.curLine);
	}
	System.out.println("Start reading from line: " + (this.curLine + 1));
	// Open the delegate once per step execution; reopening it on every read()
	// would rewind the file to the first unskipped line each time
	customerFlatFileItemReader.open(executionContext);
}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
	// Called when a chunk is committed: save the current position
	executionContext.put("curLine", this.curLine);
	System.out.println("currentLine:" + this.curLine);
}

@Override
public void close() throws ItemStreamException {
	customerFlatFileItemReader.close();
}

}
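The restart mechanism boils down to a checkpoint saved after every committed chunk and consulted when the step is opened again. The sketch below models that in plain Java; it is not the Spring Batch API. RestartSketch and run are hypothetical names, a Map stands in for the ExecutionContext, and the record "WrongName" plays the role of the bad row in restartDemo.csv.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Spring Batch API) of restarting from a checkpoint.
final class RestartSketch {

    // Processes records in chunks, appending each committed chunk to 'committed'.
    // After every successful chunk the number of consumed records is saved under
    // "curLine". A record equal to "WrongName" aborts the run, so the uncommitted
    // chunk is lost and the last checkpoint stays in the context.
    static void run(List<String> records, int chunkSize,
                    Map<String, Long> context, List<String> committed) {
        long start = context.containsKey("curLine") ? context.get("curLine") : 0L;
        List<String> chunk = new ArrayList<>();
        for (int i = (int) start; i < records.size(); i++) {
            String record = records.get(i);
            if ("WrongName".equals(record)) {
                throw new IllegalStateException("Something wrong at record " + (i + 1));
            }
            chunk.add(record);
            if (chunk.size() == chunkSize || i == records.size() - 1) {
                committed.addAll(chunk);                 // the chunk "commits"
                chunk.clear();
                context.put("curLine", (long) (i + 1));  // checkpoint, like update()
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> context = new HashMap<>();
        List<String> committed = new ArrayList<>();

        try {
            run(Arrays.asList("a", "b", "c", "WrongName", "e"), 2, context, committed);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
        System.out.println("checkpoint = " + context.get("curLine")); // prints checkpoint = 2

        // After the data is corrected, the second run resumes at the checkpoint.
        run(Arrays.asList("a", "b", "c", "d", "e"), 2, context, committed);
        System.out.println(committed); // prints [a, b, c, d, e]
    }
}
```

The first run commits only the chunk [a, b] before failing, so the second run starts at the third record and nothing is processed twice.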

Chapter 4: Writing Data

Section 1: ItemWriter Overview

An ItemReader reads one item at a time, whereas an ItemWriter writes items one chunk at a time.

@Component("myWrite")
public class MyWrite implements ItemWriter<String> {

@Override
public void write(List<? extends String> items) throws Exception {
	System.out.println(items.size());
	for(String ss:items){
		System.out.println(ss);
	}
}

}

@Configuration
public class ItemWriterDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("myWrite")
private ItemWriter<? super String> myWrite;

@Bean
public Job itemWriterDemoJob()
{
	return jobBuilderFactory.get("itemWriterDemoJob")
		   .start(itemWriterDemoStep())
		   .build();
}

@Bean
public Step itemWriterDemoStep() {
	
	return stepBuilderFactory.get("itemWriterDemoStep")
		   .<String,String>chunk(5)
		   .reader(myRead())
		   .writer(myWrite)
		   .build();
}

@Bean
public ItemReader<String> myRead() {
	List<String> items=new ArrayList<>();
	for(int i=1;i<=50;i++){
		items.add("java" + i);
	}
	return new ListItemReader<String>(items);
}

}

Section 2: Writing to a Database

Neo4jItemWriter

MongoItemWriter

RepositoryItemWriter

HibernateItemWriter

JdbcBatchItemWriter

JpaItemWriter

GemfireItemWriter

@Configuration
public class ItemWriterDbConfig {

@Autowired
private DataSource dataSource;

@Bean
public JdbcBatchItemWriter<Customer> itemWriterDb()
{
	JdbcBatchItemWriter<Customer> writer=new JdbcBatchItemWriter<Customer>();
	writer.setDataSource(dataSource);
	writer.setSql("insert into customer(id,firstName,lastName,birthday) values " +
			"(:id,:firstName,:lastName,:birthday)");
	writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    return writer;
}

}

Section 3 Writing Data to a Flat File

FlatFileItemWriter

Example: read data from the database and write it to a file.

@Configuration
public class DbFileWriterConfig {

@Bean
public FlatFileItemWriter<Customer> dbFileWriter() throws Exception
{
	FlatFileItemWriter<Customer> writer=new FlatFileItemWriter<Customer>();
	String path = "e:\\customer.txt";
	writer.setResource(new FileSystemResource(path));
	
	// Convert a Customer object into a single line of text
	writer.setLineAggregator(new LineAggregator<Customer>() {
		
		ObjectMapper mapper = new ObjectMapper();
		
		@Override
		public String aggregate(Customer item) {
			String str = null;
			try {
				str=mapper.writeValueAsString(item);
			} catch (JsonProcessingException e) {
				e.printStackTrace();
			}
			return str;
		}
	});
	writer.afterPropertiesSet();
	return writer;
}

}

Section 4 Writing Data to an XML File

StaxEventItemWriter

@Configuration
public class XmlFileWriterConfig {

@Bean
public StaxEventItemWriter<Customer>  xmlFileWriter() throws Exception
{
	StaxEventItemWriter<Customer> writer=new StaxEventItemWriter<Customer>();
	
	XStreamMarshaller marshaller = new XStreamMarshaller();
	Map<String,Class> aliases = new HashMap<>();
	aliases.put("customer", Customer.class);
	marshaller.setAliases(aliases);
	
	writer.setRootTagName("customers");
	writer.setMarshaller(marshaller);
	
	String path="e:\\cus.xml";
	writer.setResource(new FileSystemResource(path));
	writer.afterPropertiesSet();
	
	return writer;
}

}

Section 5 Writing Data to Multiple Files

CompositeItemWriter

ClassifierCompositeItemWriter

@Bean
public CompositeItemWriter<Customer> multiFileItemWriter() throws Exception
{
	CompositeItemWriter<Customer> writer=new CompositeItemWriter<Customer>();
	writer.setDelegates(Arrays.asList(jsonFileWriter(),xmlFileWriter()));
	
	writer.afterPropertiesSet();
	return writer;
}


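// Alternative to the bean above (define only one of the two, since they share a method name):
// route each item to exactly one writer via a classifier.
// Note: ClassifierCompositeItemWriter is not an ItemStream, so file-based delegates
// must also be registered on the step with .stream(...) so that they are opened and closed.

@Bean
public ClassifierCompositeItemWriter<Customer> multiFileItemWriter() throws Exception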
{
	ClassifierCompositeItemWriter<Customer> writer=new ClassifierCompositeItemWriter<Customer>();
	
	writer.setClassifier(new Classifier<Customer, ItemWriter<? super Customer>>() {
		@Override
		public ItemWriter<? super Customer> classify(Customer customer) {
			ItemWriter<Customer> write=null;
			try {
				write=customer.getId()%2==0?jsonFileWriter():xmlFileWriter();
			} catch (Exception e) {
				e.printStackTrace();
			} 
			return write;
		}
	});
	return writer;
}
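The two beans above differ only in how items are routed: the composite writer hands every item to all of its delegates, while the classifier writer picks exactly one delegate per item. The following plain-Java sketch illustrates that difference under simplifying assumptions. `RoutingSketch` and its methods are hypothetical, the "writers" are plain lists, and the even/odd rule mirrors the classifier shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the two routing styles; not Spring Batch API.
class RoutingSketch {

    // Composite style: every item goes to every delegate.
    static void writeToAll(List<Integer> ids, List<List<Integer>> delegates) {
        for (List<Integer> delegate : delegates) {
            delegate.addAll(ids);
        }
    }

    // Classifier style: each item goes to exactly one delegate,
    // chosen here by whether the id is even or odd.
    static void writeByParity(List<Integer> ids, List<Integer> evenSink, List<Integer> oddSink) {
        for (Integer id : ids) {
            (id % 2 == 0 ? evenSink : oddSink).add(id);
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4);

        List<Integer> jsonSink = new ArrayList<>();
        List<Integer> xmlSink = new ArrayList<>();
        writeByParity(ids, jsonSink, xmlSink);
        System.out.println("classifier: even ids " + jsonSink + ", odd ids " + xmlSink);

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        writeToAll(ids, Arrays.asList(first, second));
        System.out.println("composite: " + first + " and " + second);
    }
}
```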

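Section 6 Using ItemProcessor

ItemProcessor<I,O> is where business logic, validation, filtering, and similar per-item work is done.

CompositeItemProcessor

Example: read data from the database, process it, and write the result to a flat file.

@Bean
public CompositeItemProcessor<Customer,Customer> process()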
{
	CompositeItemProcessor<Customer,Customer> processor=new CompositeItemProcessor<>();
	
	List<ItemProcessor<Customer,Customer>> delegates=new ArrayList<>();
	delegates.add(firstNameUpperProcessor);
	delegates.add(idFilterProcessor);
	processor.setDelegates(delegates);
	
	return processor;
}
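The delegates `firstNameUpperProcessor` and `idFilterProcessor` are not shown in this article. The plain-Java sketch below guesses at what they might do (upper-case the first name, then drop odd ids) purely for illustration; the class `ProcessorChainSketch`, the nested `Customer` type, and both functions are assumptions. It relies on the Spring Batch convention that a processor returning null filters the item out, so it never reaches the writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Hypothetical stand-in for a composite processor; not Spring Batch API.
class ProcessorChainSketch {

    // Minimal customer type used only for this sketch.
    static final class Customer {
        final long id;
        final String firstName;

        Customer(long id, String firstName) {
            this.id = id;
            this.firstName = firstName;
        }
    }

    // Assumed behaviour of firstNameUpperProcessor: upper-case the first name.
    static final Function<Customer, Customer> FIRST_NAME_UPPER =
            c -> new Customer(c.id, c.firstName.toUpperCase(Locale.ROOT));

    // Assumed behaviour of idFilterProcessor: keep even ids, return null otherwise.
    static final Function<Customer, Customer> ID_FILTER =
            c -> c.id % 2 == 0 ? c : null;

    // Applies the delegates in order; a null result filters the item out.
    static List<Customer> process(List<Customer> input,
                                  List<Function<Customer, Customer>> delegates) {
        List<Customer> output = new ArrayList<>();
        for (Customer item : input) {
            Customer current = item;
            for (Function<Customer, Customer> delegate : delegates) {
                current = delegate.apply(current);
                if (current == null) {
                    break; // filtered: later delegates never see this item
                }
            }
            if (current != null) {
                output.add(current);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Customer> in = Arrays.asList(new Customer(1, "ann"), new Customer(2, "bob"));
        for (Customer c : process(in, Arrays.asList(FIRST_NAME_UPPER, ID_FILTER))) {
            System.out.println(c.id + " " + c.firstName);
        }
    }
}
```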

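Chapter 5 Error Handling

Section 1 Error Handling Overview

By default, when a job hits an exception, Spring Batch ends the job. When the job is restarted with the same parameters, Spring Batch picks up the remaining work that has not yet been executed.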

@Bean
@StepScope
public Tasklet errorHandling() {
	return new Tasklet() {
		@Override
		public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
			Map<String, Object> stepExecutionContext = chunkContext.getStepContext().getStepExecutionContext();
			if(stepExecutionContext.containsKey("qianfeng")){
				System.out.println("The second run will succeed");
				return RepeatStatus.FINISHED;
			}
			else{
				System.out.println("The first run will fail");
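				// getStepExecutionContext() returns a read-only view, so write through the StepExecution instead
				chunkContext.getStepContext().getStepExecution().getExecutionContext().put("qianfeng", true);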
				throw new RuntimeException("error ...");
			}
		}
	};
}
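The tasklet above depends on state surviving between two runs of the same job instance. As a rough plain-Java analogy, the sketch below passes a map from one launch to the next to play the role of the persisted execution context. `RestartSketch`, `runOnce`, and `launch` are hypothetical names, and the status strings are only illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for restart behaviour; not Spring Batch API.
class RestartSketch {

    // Fails on the first run and succeeds once the flag has been saved.
    static String runOnce(Map<String, Object> persistedContext) {
        if (persistedContext.containsKey("qianfeng")) {
            return "COMPLETED";
        }
        persistedContext.put("qianfeng", true); // saved before the failure
        throw new RuntimeException("error ...");
    }

    // Runs the step and maps an exception to a FAILED status.
    static String launch(Map<String, Object> persistedContext) {
        try {
            return runOnce(persistedContext);
        } catch (RuntimeException e) {
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        // The same map is reused, as the repository would reuse the stored context.
        Map<String, Object> context = new HashMap<>();
        System.out.println("first run:  " + launch(context));
        System.out.println("second run: " + launch(context));
    }
}
```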

Section 2 Retry

@Bean
public Step retryDemoStep() {
    return stepBuilderFactory.get("retryDemoStep")
            .<String,String>chunk(10)
            .reader(reader())
            .processor(retryItemProcessor)
            .writer(retryItemWriter)
            .faultTolerant()
            .retry(CustomRetryException.class)
            .retryLimit(10)
            .build();
}
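The idea behind `retry(...)` and `retryLimit(...)` can be sketched in plain Java as a bounded loop that retries only one exception type. This is an illustration under assumptions, not the framework's implementation: `RetrySketch` and `withRetry` are hypothetical, and `maxAttempts` here counts every attempt including the first, which may not match the exact semantics of `retryLimit`.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a bounded retry; not Spring Batch API.
class RetrySketch {

    // Exception type that is considered retryable in this sketch.
    static class CustomRetryException extends RuntimeException {
        CustomRetryException(String message) {
            super(message);
        }
    }

    // Calls the action until it succeeds or maxAttempts is exhausted.
    // Only CustomRetryException is retried; anything else propagates at once.
    static <T> T withRetry(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        CustomRetryException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (CustomRetryException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new CustomRetryException("attempt " + calls[0] + " failed");
            }
            return "processed after " + calls[0] + " attempts";
        }, 10);
        System.out.println(result);
    }
}
```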

Section 3 Skip

@Bean
public Step skipDemoStep() {
    return stepBuilderFactory.get("skipDemoStep")
            .<String,String>chunk(10)
            .reader(reader())
            .processor(skipItemProcessor)
            .writer(skipItemWriter)
            .faultTolerant()
            .skip(CustomRetryException.class)
            .skipLimit(10)
            .build();
}   
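Skipping differs from retrying in that a failing item is dropped and processing continues with the next one, up to a limit. The plain-Java sketch below shows that control flow under simplifying assumptions. `SkipSketch` and `processWithSkip` are hypothetical, and rethrowing the original exception once the limit is exceeded is a simplification of what the framework does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for skip handling; not Spring Batch API.
class SkipSketch {

    // Exception type that is considered skippable in this sketch.
    static class CustomRetryException extends RuntimeException {
        CustomRetryException(String message) {
            super(message);
        }
    }

    // Processes each item. Items whose processing throws CustomRetryException
    // are skipped until more than skipLimit items have failed, at which
    // point the exception is rethrown and the whole run fails.
    static List<String> processWithSkip(List<String> items,
                                        Function<String, String> processor,
                                        int skipLimit) {
        List<String> output = new ArrayList<>();
        int skipped = 0;
        for (String item : items) {
            try {
                output.add(processor.apply(item));
            } catch (CustomRetryException e) {
                skipped++;
                if (skipped > skipLimit) {
                    throw e; // limit exceeded: fail instead of skipping
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("1", "2", "3", "4", "5");
        List<String> result = processWithSkip(items, s -> {
            if (s.equals("3")) {
                throw new CustomRetryException("bad item " + s);
            }
            return s + "!";
        }, 10);
        System.out.println(result);
    }
}
```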

Section 4 Skip Listener

@Bean
public Step skipListenerDemoStep1() {
    return stepBuilderFactory.get("skipListenerDemoStep1")
            .<String,String>chunk(10)
            .reader(reader())
            .processor(skipItemProcessor)
            .writer(skipItemWriter)
            .faultTolerant()
            .skip(CustomRetryException.class)
            .skipLimit(10)
            .listener(mySkipListener)
            .build();
}

@Component
public class MySkipListener implements SkipListener<String, String> {

@Override
public void onSkipInRead(Throwable t) {

}

@Override
public void onSkipInWrite(String item, Throwable t) {

}

@Override
public void onSkipInProcess(String item, Throwable t) {
    System.out.println(item + " caused an exception: " + t);
}

}

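Chapter 6 Job Scheduling

Section 1 Using JobLauncher

JobLauncher lets the application control when a job is started. The example below triggers the job from a web endpoint, which requires the web starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

To keep jobs from running automatically at application startup:

spring.batch.job.enabled=false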

@RestController
@RequestMapping("/job")
public class JobLauncherController {

@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job jobLauncherDemoJob;

@GetMapping("/{job1}")
public String job1Run(@PathVariable String job1) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException
{
	System.out.println(job1);
	JobParameters jobParameters=new JobParametersBuilder()
			                    .addString("job1", job1)
			                    .toJobParameters();
	jobLauncher.run(jobLauncherDemoJob, jobParameters);
	
	return "job1 success";
}

}
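The controller passes the path variable to the job as a parameter, and its method signature declares JobInstanceAlreadyCompleteException. A job instance is identified by the job name together with its identifying parameters, so requesting the same path value again after a successful run would be rejected. The toy model below illustrates that bookkeeping in plain Java. `LaunchSketch` is hypothetical, and it uses IllegalStateException as a stand-in for the framework exception.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for job-instance bookkeeping; not Spring Batch API.
class LaunchSketch {

    // Keys of job instances that have already completed successfully.
    private final Set<String> completed = new HashSet<>();

    // Launches a job identified by its name plus one string parameter.
    // A second launch with the same name and parameter value is rejected.
    String run(String jobName, String job1) {
        String instanceKey = jobName + "?job1=" + job1;
        if (!completed.add(instanceKey)) {
            throw new IllegalStateException("job instance already complete: " + instanceKey);
        }
        return "job1 success";
    }

    public static void main(String[] args) {
        LaunchSketch launcher = new LaunchSketch();
        System.out.println(launcher.run("jobLauncherDemoJob", "a"));
        System.out.println(launcher.run("jobLauncherDemoJob", "b"));
        try {
            launcher.run("jobLauncherDemoJob", "a"); // same parameters again
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One common way around this in practice is to add a parameter that changes on every request, such as a timestamp, so that each launch creates a new job instance.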

Section 2 Using JobOperator

@Configuration
public class JobOperatorDemo implements StepExecutionListener,ApplicationContextAware{

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;


private Map<String, JobParameter> parameter;

@Autowired
private JobLauncher jobLauncher;

@Autowired
private JobRepository jobRepository;

@Autowired
private JobExplorer jobExplorer;

@Autowired
private JobRegistry jobRegistry;

private ApplicationContext context;


@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();

    postProcessor.setJobRegistry(jobRegistry);
    postProcessor.setBeanFactory(context.getAutowireCapableBeanFactory());
    postProcessor.afterPropertiesSet();

    return postProcessor;
}


@Bean
public JobOperator jobOperator(){
    SimpleJobOperator operator = new SimpleJobOperator();

    operator.setJobLauncher(jobLauncher);
    operator.setJobParametersConverter(new DefaultJobParametersConverter());
    operator.setJobRepository(jobRepository);
    operator.setJobExplorer(jobExplorer);
    operator.setJobRegistry(jobRegistry);
    
    return operator;
}


@Bean
public Job jobOperatorDemoJob()
{
	return jobBuilderFactory.get("jobOperatorDemoJob")
		   .start(jobOperatorDemoStep())
		   .build();
}

@Bean
public Step jobOperatorDemoStep() {
	return stepBuilderFactory.get("jobOperatorDemoStep")
           .listener(this)
		   .tasklet(new Tasklet() {
			@Override
			public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
				System.out.println(parameter.get("msg").getValue());
				return RepeatStatus.FINISHED;
			}
		}).build();
}



@Override
public void beforeStep(StepExecution stepExecution) {
	parameter = stepExecution.getJobParameters().getParameters();
	
}

@Override
public ExitStatus afterStep(StepExecution arg0) {
	
	return null;
}


@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
	this.context=applicationContext;
}

}

Published by 全栈程序员栈长. Please credit the source when reposting: https://javaforall.cn/140118.html Original link: https://javaforall.cn
