Spark离线导出Mysql数据优化之路

2022-05-24 12:13:43 浏览数 (2)

在业务离线数据分析场景下,往往需要将Mysql中的数据先导出到分布式存储中,如Hive、Iceburg。这个功能实现的方式有很多,但每种方式都会遇到一些问题(包括阿里开源的DataX)。本文就介绍下这个功能的优化之路,并最终给出一个笔者实现的终极方案。

阶段0

笔者第一次接触到这个场景是维护历史遗留的任务,这也是笔者看到的最初实现方案(后续称作方案0)。这个版本的核心逻辑可以简化为如下的shell脚本。考虑到业务上有分库分表的场景,所以认为库表名都满足一个正则表达式。这段逻辑就是遍历Mysql实例上的库表,对所有满足正则表达式的库表执行一个SQL,查出需要的数据,保存到本地文件中,然后将文件上传到HDFS。

代码语言:shell复制
#!/bin/bash

MYSQL_CMD="mysql -hxxx -Pxxx -uxxx -pxxx"
DATABASE_REG="^xdb_test_([0-9]|[1-9][0-9])$" # db_test_0-db_test_99分库
TABLE_REG="^xtb_test_[0-9]{10}$"  # 按租户分表
file="data.txt"

$MYSQL_CMD -NB -e "SHOW DATABASES" | while read DATABASE
do
    if [[ "x$DATABASE" =~ $DATABASE_REG ]] # 遍历符合库名正则表达式的数据库
    then
        $MYSQL_CMD -NB -e "SHOW TABLES FROM ${DATABASE}" | while read TABLE
        do
            if [[ "x$TABLE" =~ $TABLE_REG ]] # 遍历符合表名正则表达式的数据表
            then
                SELECT_SQL="select ...... from ${DATABASE}.${TABLE}"
                $MYSQL_CMD -NB -e "${SELECT_SQL}" >> $file
            fi
        done
    fi
done

hadoop fs -put $file hdfs://xxxxx
rm $file

这个实现看起来可以满足需求,但其实隐藏着以下几个问题:

1. 机器性能要求高:表读取是一个SQL查出所有数据,在单表数据量比较大时,需要大内存来承载这些数据;同时这些数据需要写入本地文件,若写入处理速度较慢,会导致查询执行失败(受mysql net_read_timeout参数控制)。

2. 慢查询:SQL扫描表中全部数据,通常会导致慢查询,可能会影响其他线上业务。

3. 执行效率低:在分库分表的场景下,这些库表数据的读取只能顺序执行,在库表数据量大的情况下,整个任务无法通过并发缩短执行时间。

4. 运维困难:每次新增一个数据源的同步,都要复制一份shell,然后改里面的库表信息、查询语句;要新增一些优化逻辑,需要每个脚本都改一遍;shell脚本在日常业务开发中使用不多,实现逻辑、定位问题都很不方便。

阶段1:解决查询执行失败

方案0最严重的问题就是查询执行失败。随着业务数据量的增大,由于数据无法及时写入磁盘,有些表的SQL查询必然会执行超时(net_read_timeout);同时大数据量的查询也导致脚本运行会占用大量内存。既然是读取数据量的问题,可以先加上分页读取的逻辑,让查询可以执行成功。阶段1的实现(后续称作方案1)就是在方案0的基础上,增加了单表读取分页逻辑:先查出该表的总行数,然后按照固定的PAGE_SIZE进行循环分页查询。

代码语言:shell复制
#!/bin/bash

MYSQL_CMD="mysql -hxxx -Pxxx -uxxx -pxxx"
DATABASE_REG="^xdb_test_([0-9]|[1-9][0-9])$" # db_test_0-db_test_99分库
TABLE_REG="^xtb_test_[0-9]{10}$"  # 按租户分表
file="data.txt"

$MYSQL_CMD -NB -e "SHOW DATABASES" | while read DATABASE
do
    if [[ "x$DATABASE" =~ $DATABASE_REG ]] # 遍历符合库名正则表达式的数据库
    then
        $MYSQL_CMD -NB -e "SHOW TABLES FROM ${DATABASE}" | while read TABLE
        do
            if [[ "x$TABLE" =~ $TABLE_REG ]] # 遍历符合表名正则表达式的数据表
            then
                count=`$MYSQL_CMD -NB -e "select count(1) from ${DATABASE}.${TABLE}"`
                page_size=100000 # 经验值,不同表可能不一样
                total_page=`echo "$count/$page_size" | bc`
                for page in `seq 0 $total_page`
                do
                    offset=`echo "$page*$page_size" | bc`
                    SELECT_SQL="select ...... from ${DATABASE}.${TABLE} limit $offset, $page_size"
                    $MYSQL_CMD -NB -e "${SELECT_SQL}" >> $file
                done
            fi
        done
    fi
done

hadoop fs -put $file hdfs://xxxxx
rm $file

方案1虽然解决了查询失败的问题,但这种查询方式也有明显的问题:这个查询是典型的深翻页查询,随着数据量的增大,这个查询SQL会执行的越来越慢,一方面会拖慢整个任务的执行时间,另一方面对Mysql的压力也比较大。

阶段2:解决运维问题

方案1上线之后,除了任务执行慢一些,很长一段时间并没有遇到其他问题。但随着同步的库表越来越多,每个表都要抄一份类似的代码,改库名表名,改查询语句;部署脚本的机器替换、mysql实例迁移的时候,需要重新部署所有脚本、每个脚本里改一些配置。长期下来,这就变成了一个机械重复的工作。

为了降低运维成本,我们考虑重新实现一个同步工具,把库表信息、查询语句这些逻辑信息以配置文件的方式抽象出来。这样再增加需要同步的表,就只需要指定业务字段,而不需要关心数据读取的实现。考虑到以下几个方面,决定用Spark重新实现这个工具:

1. 执行效率:Spark支持并发处理数据,可以提升任务执行速度。

2. 可扩展性:Spark SQL可以在数据导出的同时完成一些简单ETL的工作,同时也可以支持多数据源的关联处理。

3. 稳定性:Spark执行基于Yarn调度平台,提供了容错、重试等机制,也不用考虑机器裁撤、迁移的问题。

阶段3:解决慢查询问题

方案1解决了单次查询数据量大的问题,但仍然存在深翻页慢查询的问题。为此我们查了开源工具DataX[1]的实现方式,其核心实现逻辑如下:首先getPkRange方法查出数据表中主键字段的最小值和最大值,然后将主键的取值在最大值和最小值之间划分成用户指定的adviceNum个区间(整数类型区间的划分比较直接,字符串类型的划分就复杂一点,DataX是将字符串转成128进制的大整数,然后再当做整数切分),最后将区间范围转化为SQL中的where条件进行数据读取。

这种实现方式优势是:

1. 划分出的多个查询区间可以并发执行。

2. 除查询数据本身外,额外的开销几乎可以忽略不计(只需要一个查询查出主键字段的最小值和最大值)。

同时这种方式也存在问题:

1. 在SplitPK分布不均匀时,多个SQL执行的耗时可能差距很大。

2. 当SplitPK是字符串的时,区间划分的逻辑相对复杂,且对于主键是随机字符串的场景(如雪花算法生成主键),主键分布不均匀的问题会更严重。

代码语言:java复制
public static List<Configuration> splitSingleTable(Configuration configuration, int adviceNum) {
        ......
        Pair<Object, Object> minMaxPK = getPkRange(configuration);

        boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration
                .getString(Constant.PK_TYPE));
        boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration
                .getString(Constant.PK_TYPE));

        if (isStringType) {
            rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                    String.valueOf(minMaxPK.getLeft()),
                    String.valueOf(minMaxPK.getRight()), adviceNum,
                    splitPkName, "'", DATABASE_TYPE);
        } else if (isLongType) {
            rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                    new BigInteger(minMaxPK.getLeft().toString()),
                    new BigInteger(minMaxPK.getRight().toString()),
                    adviceNum, splitPkName);
        } else {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                    "您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理.");
        }
        
        String tempQuerySql;
        if (null != rangeList && !rangeList.isEmpty()) {
            for (String range : rangeList) {
                Configuration tempConfig = configuration.clone();

                tempQuerySql = buildQuerySql(column, table, where)
                          (hasWhere ? " and " : " where ")   range;

                allQuerySql.add(tempQuerySql);
                tempConfig.set(Key.QUERY_SQL, tempQuerySql);
                pluginParams.add(tempConfig);
            }
        } 

        // deal pk is null
        Configuration tempConfig = configuration.clone();
        tempQuerySql = buildQuerySql(column, table, where)
                  (hasWhere ? " and " : " where ")
                  String.format(" %s IS NULL", splitPkName);

        tempConfig.set(Key.QUERY_SQL, tempQuerySql);
        pluginParams.add(tempConfig);
        
        return pluginParams;
    }

DataX划分区间是为了通过并发提升查询效率,因此区间划分是否均匀相对并不是很重要。而我们的目的是减少对数据表的慢查询,如果划分区间不均匀,那么不同区间的查询执行时间很可能差别很大,并且查询的执行时间会和实际数据的分布强相关,这样就很难通过参数设定控制慢查询是否产生。

于是,我们借鉴了DataX划分区间查询的思路,但是分区策略做了调整:每次查询按主键升序排序,读取N行,并记录下本次查询主键的最大值X,下次查询的查询语句中加上“> X”的条件判断。简单来讲就是每次查询记录游标,下次查询带上游标条件,这其实是一个优化深翻页的标准方法。

基于游标查询的思路实现了Spark版本数据离线导出方案(后续称作方案3),核心逻辑如下:首先通过加载配置的方式获取数据库表的信息,然后遍历所有满足正则表达式的库表,用游标查询的方式导出数据表中的完整数据。

代码语言:scala复制
val sqlDBPattern = config.sqlDBPattern //read from config
val sqlTablePattern = config.sqlTablePattern
val query = config.query // eg. select a, b, c, d
val splitPK = config.splitPK // 分区主键
val splitRowNum = config.splitRowNum // 分区读取批次大小

var resultDataFrame: DataFrame = null
for (db <- getDBList() if (checkDB(db, sqlDBPattern))) { //遍历筛选出符合正则的数据库
    for(table <- getTableList(db) if (checkTable(table, sqlTablePattern))) { //遍历筛选出符合正则的表
        var sql = query   s" from `$db`.`$table` where 1 = 1"
        if (!splitPK.isEmpty) {
            var minPK = ""
            var count = splitRowNum
            var PKType = "StringType"
            while (count >= splitRowNum) {
                var tmpSql = sql
                if (!minPK.isEmpty) tmpSql  = s" and ${splitPK} > $minPK"
                tmpSql  = s" order by ${splitPK} asc limit ${splitRowNum}"
                
                val customDF = readFromJDBC(database, tmpSql).cache
                resultDataFrame = resultDataFrame.union(customDF)

                count = customDF.count().toInt
                if (count > 0) {
                    PKType = customDF.dtypes.toMap.get(source.splitPK).get
                    minPK = PKType match { // add types check here when meet
                        case "StringType" => s"'${customDF.agg(max(splitPK)).collect()(0).getString(0)}'" // string 类型这里拼在sql里要加引号
                        case _ => s"${customDF.agg(max(splitPK)).collect()(0).getLong(0)}"
                    }
                }
            }
          }
        } else {
            resultDataFrame = resultDataFrame.union(readFromJDBC(db, query))
        }
    }
}
resultDataFrame.xxxAction // save to target table

private def readFromJDBC(database: String, sql: String): DataFrame = {
    df = sparkSession.read.format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("user", "xxxxxx")
          .option("password", "xxxxxx")
          .option("charset", "iso-8859-1")
          .option("url", "jdbc:mysql://${ip}:${port}/${database}")
          .option("dbtable", s"($sql) t")
          .option("queryTimeout", 30)
          .option("pushDownPredicate", true)
          .option("pushDownAggregate", true)
          .load()
}

这个实现中需要注意在sparkSession.read时,设置 "pushDownPredicate"和"pushDownAggregate"这两个参数为true(默认是false)[2],这两个参数分别控制条件过滤、聚合排序是否下推到Mysql执行,若不指定,则Spark会读取数据表中的所有数据,在内存中做过滤和排序。

方案3的分页查询策略,可以保证主键分布不均匀的情况下,每次拉取的数据条数也是一致的,因此可以通过调整批量的大小,保证不会有慢查询的出现。这个策略的最大问题就是,每一次查询执行,依赖上一次查询执行的结果,这样多个分区的查询不能并发执行。由于这种依赖关系,Spark执行时每个查询都会产生一个单独的stage,都要经过driver任务调度的过程,导致程序执行会非常缓慢,并不能发挥spark并行分布式的优势。如某个业务线上分百表,一百张表加起来数据大概1.5亿行,导出任务执行需要2个小时左右。同时,每个查询中,只设置了主键的单边过滤条件,Mysql在执行时还是会扫描满足条件的所有行,在执行上也没有达到最优的效果。

阶段4:任务并发执行

如何既保证查询批次的均匀,又能让不同区间的查询并发执行呢?既然只查询最小值和最大值无法保证均匀的划分数据,那把所有主键都先读取出来,在内存中划分区间是否可行呢?只查主键通常会命中覆盖索引,查询效率会比较高,数据量也不会很大。但考虑到一个SQL读取表的所有主键还是太暴力了,而且也有可能会出现慢查询,因此查询主键这一步选择采用游标分页查询的方式。实现逻辑如下(后续称作方案4):先通过游标的方式循环拉取主键,然后按照配置中的splitRowNum划分区间。

代码语言:text复制
private def getPrimaryKeyPoints(statement: Statement, database: String, table: String): (Seq[(Any, Any)], Int) = {
    statement.executeQuery(s"use $database")
    val resultList = new scala.collection.mutable.ArrayBuffer[Any]
    var resultSize = FETCH_PRIMARY_KEY_BATCH_SIZE
    var minID: Any = null
    var pkType = 0
    while (resultSize >= FETCH_PRIMARY_KEY_BATCH_SIZE) {
      var sql = s"select ${source.splitPK} from `$database`.`$table`"
      if (minID != null) sql = sql   s" where ${source.splitPK} > ${Utils.formatPredicateValue(minID, pkType)}"
      sql = sql   s" order by ${source.splitPK} asc limit $FETCH_PRIMARY_KEY_BATCH_SIZE"

      LOGGER.info("Get primary key query: "   sql)
      val rs = statement.executeQuery(sql)
      pkType = rs.getMetaData.getColumnType(1)
      resultSize = 0
      while (rs.next()) {
        resultSize  = 1
        pkType match {
          case java.sql.Types.VARCHAR => resultList.append(rs.getString(source.splitPK))
          case _ => resultList.append(rs.getLong(source.splitPK))
        }
      }
      rs.close()
      if (!resultList.isEmpty) minID = resultList.last
    }
    if (resultList.isEmpty) (Seq(), pkType)
    else (Utils.partitionArray(resultList, source.splitRowNum), pkType)
}

Spark JDBC本身提供了并发读取数据表的方式[3],可以直接把划分好的区间转换成查询条件传入JDBC接口中,Spark就为每一个区间生成一个SQL查询,并发执行。同时对于Mysql来说,这样每次查询中过滤条件同时指定了上界和下界,可以大大减少Mysql执行时扫描的行数,进一步提升执行效率。

代码语言:text复制
sparkSession.read.jdbc(formatDBUrl(database), 
                       table, 
                       Utils.formatPredicates(pkPartitions, splitPK, pkType).toArray, 
                       prop) // user, password etc.
                       
def formatPredicates(partitions: Seq[(Any, Any)], pkField: String, pkType: Int): Seq[String] = {
    partitions.init.map(partition => formatPredicate(partition, pkField, pkType, false)) : 
      formatPredicate(partitions.last, pkField, pkType, true)
}

def formatPredicate(partition: (Any, Any), pkField: String, pkType: Int, isLast: Boolean): String = {
    val secondOperator = if (isLast) "<=" else "<"
    s"$pkField >= ${formatPredicateValue(partition._1, pkType)} and $pkField $secondOperator ${formatPredicateValue(partition._2, pkType)}"
}

def formatPredicateValue(value: Any, pkType: Int): String = {
    pkType match {
      case java.sql.Types.VARCHAR => s"'${value.asInstanceOf[String]}'"
      case _ => s"${value.asInstanceOf[Long]}"
    }
}

方案4虽然引入了读取表主键并在内存中划分区间的时间开销,但后续读取数据并ETL处理的过程完全可以并发执行,整体任务执行的效率提高了很多。经过优化之后,原本2个小时执行的任务,现在只需要20-30分钟,而其中读取表主键的时间只占用1-2分钟。

总结

对于离线导出mysql数据表写入分布式存储这个场景,本文提供了一种实现方式:首先分批查出表的所有主键,按配置的批量大小划分区间;然后区间转化为SQL的分区条件传入Spark JDBC接口,构建Spark任务读取数据。这种方式有以下几点优势:

1. 用分区查询的方式,避免了Mysql的慢查询,对其他线上业务影响较小。

2. 利用Spark分布式的能力提升任务执行速度。

3. Spark SQL功能强大,可以在数据读取的同时,通过配置做一些简单的ETL操作。

参考文献

[1] https://github.com/alibaba/DataX.

[2] JDBC To Other Databases. https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html.

[3] Using predicates in Spark JDBC read method. https://stackoverflow.com/questions/48677883/using-predicates-in-spark-jdbc-read-method.

0 人点赞