Spring Batch分析(一)

2022-07-12 14:27:59 浏览数 (1)

前面两篇文章,对于SpringBatch这个批处理框架做了一个大概的学习和了解,通过前两篇文章,你可以了解到SpringBatch是什么?应用场景有哪些?怎么去写一个SpringBatch的demo?以及SpringBatch的架构设计和核心组件的简单介绍。

今天这篇文章我们会找其中一些源码来做一下分析,让你对于SpringBatch更加了解,更好的去做技术选型和场景化方案落地。

今天来分析一个之前demo中用到的类JdbcPagingItemReader<T>

1、JdbcPagingItemReader类的继承层次

可以看到,该类的顶层是ItemReader接口和ItemStream接口。

2、JdbcPagingItemReader的作用是什么呢?

  • 用于使用JDBC以分页方式读取数据库记录。
  • 它执行由PagingQueryProvider构建的SQL来检索请求的数据。
  • 使用setPageSize(int)指定大小的分页请求执行查询。
  • 需要时,将通过调用read()方法请求其他页面,并返回与当前位置相对应的对象。
  • 在重新启动时,它将使用最后一个排序键值来定位要读取的第一页。
  • 重要的是对排序键具有唯一的键约束,以确保在两次执行之间不会丢失任何数据。
  • 分页的性能取决于可用于限制返回的行数的数据库特定功能。设置相当大的页面大小并使用与页面大小匹配的提交间隔应可提供更好的性能。
  • 在两次调用open(ExecutionContext)之间,该实现都是线程安全的,但是如果在多线程客户端中使用,请记住使用saveState=false (无重启功能)

3、JdbcPagingItemReader的属性有哪些?

代码语言:javascript复制
private static final String START_AFTER_VALUE = "start.after";
public static final int VALUE_NOT_SET = -1;
  // 数据源,在从数据库的数据源读取数据的时候,你可以在不同的reader中进行
  // 设置的时候,设置不同的数据源,可以参考我之前的多数据源的demo
private DataSource dataSource;
  // 如果需要使用分页查询的话,那么你可以使用该方式来做分页查询
private PagingQueryProvider queryProvider;
  // 顾名思义,该属性是用来组装你的SQL Where的参数的
  private Map<String, Object> parameterValues;
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
  // 对于你Jdbc查询的返回结果集就是RowMapper
  private RowMapper<T> rowMapper;
  private String firstPageSql;
  private String remainingPagesSql;
  private Map<String, Object> startAfterValues;
  private Map<String, Object> previousStartAfterValues;
private int fetchSize = VALUE_NOT_SET;

关于PagingQueryProvider接口,这里需要做一下说明,SpringBatch它根据不同的数据库类型,都封装了对应的PagingQueryProvider实现类,比如MySqlPagingQueryProvider、OraclePagingQueryProvider等,具体见下图:

如果你了解过阿里巴巴开源的DataX这个开源作品,那么你会感觉它在一定程度上的设计思想和SpringBatch是类似的,都是对接不同的数据源通过Reader,写入数据源叫Writer,只是DataX做到了更细粒度可控,能插能拔,你只需要对你需要的做一些组装就可以使用起来,而SpringBatch它是提供了基本上我们常使用的一些数据源的封装。

4、JdbcPagingItemReader也实现了InitializingBean接口的afterProperies方法

代码语言:javascript复制
public void afterPropertiesSet() throws Exception {
    super.afterPropertiesSet();
    Assert.notNull(dataSource, "DataSource may not be null");
    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
    if (fetchSize != VALUE_NOT_SET) {
      jdbcTemplate.setFetchSize(fetchSize);
    }
    jdbcTemplate.setMaxRows(getPageSize());
    namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
    Assert.notNull(queryProvider, "QueryProvider may not be null");
    queryProvider.init(dataSource);
    this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize());
    this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize());
  }

从这里我们可以看到,SpringBatch实质还是使用JdbcTemplate来进行的SQL查询,默认的pageSize为10,然后queryProvider调用了init方法,将对应的DataSource当作参数传入。

下面是DataSource当作init参数传入以后的代码逻辑,从下面代码我们可以看出几点:

  • datasource必须指定,否则异常
  • select查询列必须是明确的,不可以使用select *的方式来做查询
  • fromClause也必须有,否则不知道从哪个表查询数据,如果不传,就会异常
  • sortKey也是必须传的,前面也说过SpringBatch必须传一个sortKey,而且这个sortKey必须可以确定数据唯一性,否则它进行批量处理的时候会遗漏数据(此处吐槽一下,如果分页查询,必须指定sortkey,对于查询来说还是有很大性能损耗的,但是如果你要用SpringBatch的批处理分页,又不想指定sortKey,那么直接会提示异常;如果你指定了唯一key来做sortKey,但是你select里面又没有sortKey出现,SpringBatch又会给你报一个列名无效,而且该异常也不会告诉你是哪个列无效,你只能根据异常堆栈去判断,我当时是在rowMapper进行mapRow的时候发现的。。。。很坑啊)
代码语言:javascript复制
public void init(DataSource dataSource) throws Exception {
    Assert.notNull(dataSource, "A DataSource is required");
    Assert.hasLength(selectClause, "selectClause must be specified");
    Assert.hasLength(fromClause, "fromClause must be specified");
    Assert.notEmpty(sortKeys, "sortKey must be specified");
    StringBuilder sql = new StringBuilder(64);
    sql.append("SELECT ").append(selectClause);
    sql.append(" FROM ").append(fromClause);
    if (whereClause != null) {
      sql.append(" WHERE ").append(whereClause);
    }
    if(groupClause != null) {
      sql.append(" GROUP BY ").append(groupClause);
    }
    List<String> namedParameters = new ArrayList<>();
    parameterCount = JdbcParameterUtils.countParameterPlaceholders(sql.toString(), namedParameters);
    if (namedParameters.size() > 0) {
      if (parameterCount != namedParameters.size()) {
        throw new InvalidDataAccessApiUsageException(
            "You can't use both named parameters and classic "?" placeholders: "   sql);
      }
      usingNamedParameters = true;
    }
  }

此处,你也可以看出来SpringBatch的PagingQueryProvider是只支持单表查询的,如果你想存在一些join类型的查询,那么它是在这种情况下不支持的。

5、SortedKey是怎么样的一个结构?

代码语言:javascript复制
public void setSortKeys(Map<String, Order> sortKeys) {
    this.sortKeys = sortKeys;
  }

可以看出来,SortedKey是一个Map对象,其中key就是你数据库表的唯一key的字段名称,value就是一个Order对象,目前该对象只有两个属性,升序或者降序,Order是一个枚举类型:

代码语言:javascript复制
public enum Order {
  ASCENDING, DESCENDING
}

其中,我没有具体分析怎么去拼接Where条件的参数,有兴趣的可以自己去分析一下。

今天主要分享了一下SpringBatch批处理的中从数据库数据源读取数据的方式PagingQueryProvider。对于开源的东西我们不说好坏,吸收他的设计思想,发现他的不足,如果有余力,可以自行研发。

如果你数据源有数据库、消息类、文件类、那么你可以选择SpringBatch,最好建议是每一个reader读取单表数据,然后在processor中进行多个结果集的处理,最后做一个目标数据源数据的insert。如果是database类型,希望你可以在SpringBatch使用Reader读取数据的时候可以提高性能,必须索引之类,不要全表扫描之类等等

当然对于数据的抽取、清洗和转换你业可以考虑其他的技术方案、比如kettle、DataX(商业版是DataWorks),还有大数据类型,当然你也需要考虑你的资源问题,比如时间、人力等等

0 人点赞