Batch Processing with Spring Batch (6) - ItemProcessor

2020-05-26 17:17:52

ItemProcessor

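During development we often need to read data, run it through a series of business-logic operations, and then write the result to a designated persistent store. Spring Batch provides the ItemProcessor interface for this processing step.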

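1. ItemProcessor: the data-processing stage in Spring Batch

2. ItemProcessor is mainly used to implement business logic, validation, filtering, and so on

3. Spring Batch provides the ItemProcessor<I,O> interface: it receives an item of type I, and the processor converts it into type O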

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}
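
As a minimal sketch of the I to O conversion, the example below turns a raw text line into a typed value and rejects malformed input. So that it runs without Spring Batch on the classpath, it declares a local ItemProcessor interface with the same shape as the one above; ParsedCustomer and LineToCustomerProcessor are hypothetical names introduced only for illustration.

```java
import java.util.Objects;

// Local stand-in with the same shape as Spring Batch's ItemProcessor,
// declared here only so the sketch compiles on its own.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical output type for this sketch.
final class ParsedCustomer {
    final long id;
    final String firstName;

    ParsedCustomer(long id, String firstName) {
        this.id = id;
        this.firstName = firstName;
    }
}

// Converts a line such as "7,alice" (I = String) into a ParsedCustomer (O).
final class LineToCustomerProcessor implements ItemProcessor<String, ParsedCustomer> {
    @Override
    public ParsedCustomer process(String line) throws Exception {
        Objects.requireNonNull(line, "line must not be null");
        String[] parts = line.split(",", -1);
        // Validation: exactly two fields and a non-blank name are required.
        if (parts.length != 2 || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException("Malformed line: " + line);
        }
        // Long.parseLong throws NumberFormatException for a non-numeric id.
        return new ParsedCustomer(Long.parseLong(parts[0].trim()), parts[1].trim());
    }
}
```

Throwing an exception and returning a value are two different outcomes here: the exception signals invalid input, while the return value is the converted item that moves on to the writer.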

We can build a CompositeItemProcessor bean that holds several processors and bind it to a Step, so that multiple processors run in order within a single Step.

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    private DataSource dataSource;
 
    @Autowired
    private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor;
 
    @Autowired
    private ItemProcessor<Customer, Customer> idFilterProcessor;
 
    @Bean
    public Job processorDemoJob() throws Exception {
        return jobBuilderFactory.get("processorDemoJob")
                .start(processorDemoStep())
                .build();
 
    }
 
    @Bean
    public Step processorDemoStep() throws Exception {
        return stepBuilderFactory.get("processorDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .processor(processorDemoProcessor())
                .writer(flatFileDemoFlatFileWriter())
                .build();
    }
 
    @Bean
    public CompositeItemProcessor<Customer,Customer> processorDemoProcessor(){
        CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>();
        // Register the delegates; they run in list order
        List<ItemProcessor<Customer,Customer>> list = new ArrayList<>();
        list.add(firstNameUpperCaseProcessor);
        list.add(idFilterProcessor);
        processor.setDelegates(list);
        
        return processor;
    }
 
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
 
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100);
        reader.setRowMapper((rs,rowNum)->{
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
 
        });
 
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
 
        reader.setQueryProvider(queryProvider);
 
        return reader;
 
    }
 
    @Bean
    public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
        FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
        String path = File.createTempFile("customerInfo",".data").getAbsolutePath();
        System.out.println(">> file is created in: " + path);
        itemWriter.setResource(new FileSystemResource(path));
 
        itemWriter.setLineAggregator(new MyCustomerLineAggregator());
        itemWriter.afterPropertiesSet();
 
        return itemWriter;
 
    }
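
The delegate chain built in processorDemoProcessor() can be pictured with the sketch below: each delegate receives the previous delegate's result, and a null result ends the chain so the item is filtered. This is a simplified illustration, not the Spring Batch source; SimpleChainProcessor is a hypothetical name, and the local ItemProcessor interface is a stand-in so the sketch compiles without Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as Spring Batch's ItemProcessor.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical, simplified chain: runs same-typed delegates in list order.
final class SimpleChainProcessor<T> implements ItemProcessor<T, T> {
    private final List<ItemProcessor<T, T>> delegates;

    SimpleChainProcessor(List<ItemProcessor<T, T>> delegates) {
        // Defensive copy so later changes to the caller's list have no effect.
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public T process(T item) throws Exception {
        T current = item;
        for (ItemProcessor<T, T> delegate : delegates) {
            current = delegate.process(current);
            if (current == null) {
                // A delegate filtered the item; remaining delegates are skipped.
                return null;
            }
        }
        return current;
    }
}
```

Order matters: in the configuration above, firstNameUpperCaseProcessor runs before idFilterProcessor, so items that end up filtered are still upper-cased first. Listing the filter first would avoid that extra work.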

Concrete ItemProcessor implementations

@Component
public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
        // Convert firstName to upper case
        return new Customer(item.getId(),item.getFirstName().toUpperCase(),item.getLastName(),
                item.getBirthdate());
    }
}
 

@Component
public class IdFilterProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
        // Filter out records whose id is odd (returning null drops the item)
        if (item.getId() % 2 == 0){
            return item;
        } else {
            return null;
        }
    }
}
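
To see the combined effect of the two processors without a database or a running job, the following sketch applies the same two rules to a small in-memory list. It rests on several assumptions: SketchCustomer is a trimmed-down, hypothetical stand-in for Customer, the local ItemProcessor interface replaces the Spring Batch one, and the comma-separated line layout is invented here because MyCustomerLineAggregator is not shown in the article.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as Spring Batch's ItemProcessor.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical, reduced Customer; the article's class also has lastName and birthdate.
final class SketchCustomer {
    final long id;
    final String firstName;

    SketchCustomer(long id, String firstName) {
        this.id = id;
        this.firstName = firstName;
    }
}

final class FilterDemo {
    // Applies both rules in the configured order and keeps only non-null results,
    // mirroring how filtered items never reach the writer.
    static List<String> run(List<SketchCustomer> input) throws Exception {
        ItemProcessor<SketchCustomer, SketchCustomer> upper =
                c -> new SketchCustomer(c.id, c.firstName.toUpperCase());
        ItemProcessor<SketchCustomer, SketchCustomer> evenIdOnly =
                c -> c.id % 2 == 0 ? c : null;

        List<String> lines = new ArrayList<>();
        for (SketchCustomer customer : input) {
            SketchCustomer result = evenIdOnly.process(upper.process(customer));
            if (result != null) {
                // Assumed output layout: "id,firstName".
                lines.add(result.id + "," + result.firstName);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        List<SketchCustomer> input = new ArrayList<>();
        input.add(new SketchCustomer(1, "ann"));
        input.add(new SketchCustomer(2, "bob"));
        input.add(new SketchCustomer(3, "cal"));
        input.add(new SketchCustomer(4, "dee"));
        // Prints "2,BOB" and "4,DEE"; the odd ids 1 and 3 are filtered out.
        for (String line : run(input)) {
            System.out.println(line);
        }
    }
}
```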

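Output: the generated file contains only the customers with an even id, each with firstName converted to upper case.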

Reference:

https://blog.csdn.net/wuzhiwei549/article/details/88622281
