Hive优化器原理与源码解析系列—统计模块内存成本估算

2022-04-25 15:11:53 浏览数 (1)

背景

在上篇文章“Hive优化器原理与源码解析系列--统计信息选择性计算”中,讲到了基于成本优化器和基于规则优化器的区别,这里就不再赘述。基于成本优化器会根据RelSet(等价关系表达式集合,其中元素每个RelNode关系表达式又是SQL中如Select、From、Where、Group的以代数表达式的表现形式)选出综合成本最低的关系表达式,使用动态规划算法构建出成本最优执行计划。那么基于成本优化器CBO有哪些计算指标作为成本函数的输入,除了选择性Selectivity、基数Cardinality,排序信息Collation(排序字段,排序方向等)、是否分布式等物理属性收集之外,还有IO、记录数RowNums、内存Memory都计算在成本内。这些都会作为成本优化器成本函数的输入。此文主要在介绍成本函数估算指标-内存计算。

Operator的内存估算

在Hive基于成本优化器CBO中,成本函数的输入都是基于Operator操作符,如Join、Filter、Project、Aggregate、TableScan、Unoin等Operator来估算的,内存成本估算也不例外。

内存的计算公式大致如下:

内存大小=记录数 * 列数 * 平均列长度或平均列大小

*注:有些列大小估算是根据每列的数据类型进行计算的,下面源码解析会讲到的。

Hive优化器是使用Apache Calcite框架来实现的。其中RelMetadataQuery对象可理解为Hive metaData元数据统计信息的访问媒介,因为下面会用到它,知晓其是用来访问统计信息的,细节就不再展开了。RelMetadataQuery对象访问Hive元数据表有四张:

  • TABLE_PARAMS,表级参数表,收集如下信息:文件数、记录数、原始数据大小、压缩后数据大小、统计信息是否准确标志等
  • PARTITION_PARAMS,分区级参数表,收集如下信息:文件数、记录数、原始数据大小、压缩后数据大小、统计信息是否准确标志等,PART_ID作为每个分区ID
  • TAB_COL_STATS
  • PART_COL_STATS

PART_COL_STATS:基于表分区的列统计信息收集

TAB_COL_STATS:基于表的列统计信息收集

两者张表的表结构大致相同,一个基于表一个基于表的分区级别。

这里讲解一下PART_COL_STATS统计哪些元数据信息(这里基于Mysql存放Hive元数据信息),表结构如下:

代码语言:javascript复制
CREATE TABLE `PART_COL_STATS` (
  `CS_ID` bigint(20) NOT NULL comment 'ID',
  `DB_NAME` varchar(128)  NOT NULL comment '数据库名称',
  `TABLE_NAME` varchar(256)  NOT NULL comment '表名称',
  `PARTITION_NAME` varchar(767)  NOT NULL comment '分区名称',
  `COLUMN_NAME` varchar(767)  NOT NULL comment '列名称',
  `COLUMN_TYPE` varchar(128)  NOT NULL comment '列数据类型',
  `PART_ID` bigint(20) NOT NULL comment '分区ID' ,
  `LONG_LOW_VALUE` bigint(20) DEFAULT NULL comment 'long类型的最小值' ,
  `LONG_HIGH_VALUE` bigint(20) DEFAULT NULL comment 'long类型的最大值' ,
  `DOUBLE_HIGH_VALUE` double(53,4) DEFAULT NULL comment 'double类型的最大值' ,
  `DOUBLE_LOW_VALUE` double(53,4) DEFAULT NULL comment 'double类型的最小值' ,
  `BIG_DECIMAL_LOW_VALUE` varchar(4000)  DEFAULT NULL comment 'bigdecimal类型的最小值' ,
  `BIG_DECIMAL_HIGH_VALUE` varchar(4000)  DEFAULT NULL comment 'bigdecimal类型的最大值' ,
  `NUM_NULLS` bigint(20) NOT NULL comment '为null的记录数' ,
  `NUM_DISTINCTS` bigint(20) DEFAULT NULL comment 'distinct的记录数,基数' ,
  `AVG_COL_LEN` double(53,4) DEFAULT NULL comment '列长度平均值' ,
  `MAX_COL_LEN` bigint(20) DEFAULT NULL comment '列长度最大值' ,
  `NUM_TRUES` bigint(20) DEFAULT NULL comment '为true的记录数' ,
  `NUM_FALSES` bigint(20) DEFAULT NULL comment '为false的记录数' ,
  `LAST_ANALYZED` bigint(20) NOT NULL comment '最后分析的时间' ,
  PRIMARY KEY (`CS_ID`),
  KEY `PART_COL_STATS_FK` (`PART_ID`),
  KEY `PCS_STATS_IDX` (`DB_NAME`,`TABLE_NAME`,`COLUMN_NAME`,`PARTITION_NAME`) USING BTREE,
  CONSTRAINT `PART_COL_STATS_FK` FOREIGN KEY (`PART_ID`) REFERENCES `partitions` (`PART_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf-8

登陆Hive元数据库,可PART_COL_STATS的查询

代码语言:javascript复制
SELECT * FROM PART_COL_STATS where table_name = '表名称' and partition_name = 'day=2018-12-03';

Hive统计信息收集方式有两种:

1. 表统计信息收集:

  • 自动收集参数设置

set hive.stats.autogather=true,在Hive DML操作(但Load data 这种方式除 外)时更新统计信息,

  • 手动统计信息收集命令

ANALYZE TABLE COMPUTE STATISTICS;

ANALYZE TABLE partition (day="2019-01-01") COMPUTE STATISTICS

信息收集同步到Hive元数据库的TABLE_PARAMS和PARTITION_PARAMS表内,包含了rawDataSize(未解压数据集大小)、numRows(记录数)。totalSizenumFiles是对Hive元数据库进行更新时操作的。

2. 列统计信息收集:

  • 自动收集参数设置

set hive.stats.column.autogather=true,这些信息的收集设定Hive参数自动进行收集,

  • 手动统计信息收集命令

ANALYZE TABLE COMPUTE STATISTICS for COLUMNS;

ANALYZE TABLE partition (day="2019-01-01") COMPUTE STATISTICS for COLUMNS;

手动执行命令基于表的命令,或基于表到分区的命令会将收集到信息同步到Hive元数据库的PART_COL_STATS或TAB_COL_STATS表内。包含了关于表各自分区ID或表ID唯一标示的表级别统计信息和列级别统计信息,常用的统计信息有,平均列长度、最大列长度、列数据类型、列的NDV非重复值的个数、为null值个数、为true或false个数等等

统计信息准确与否,直接决定了内存估算的准确性,进而影响成本函数的估算及优化器会构建出错误的执行计划。可见统计信息的重要性。

统计模块-内存估算源码解析

统计stats模块内存估算由HiveRelMdMemory继承了calicite的RelMdMemory实现的,Hive成本基于内存Memory的计算其实还不够完善,有些Operator都默认是0.0的内存大小。

1) HiveTableScan(表读取)、HiveFilter(谓词类似where条件)、HiveProject(投影类似Select 选取的字段操作符)、HiveUnion等这些源码实现时,默认给出0.0内存估算

代码语言:javascript复制
public Double memory(HiveUnion union, RelMetadataQuery mq) {
  return 0.0;
}
public Double memory(HiveProject project, RelMetadataQuery mq) {
    return 0.0;
}
public Double memory(HiveFilter filter, RelMetadataQuery mq) {
    return 0.0;
}
public Double memory(HiveTableScan tableScan, RelMetadataQuery mq) {
//HiveTableScan的RelNode计算内存,默认值0
    return 0.0d;
}

2)HiveAggregate汇总Operator的内存估算实现

先从元数据访问对象获取此HiveAggregate关系表达式总记录rowCount和记录平均大小avgRowSize

如果这两个值中,任意一个值为null,则内存估算的大小为null。否则

内存大小 = rowCount * avgRowSize

代码语言:javascript复制
public Double memory(HiveAggregate aggregate, RelMetadataQuery mq) {
  final Double avgRowSize = mq.getAverageRowSize(aggregate.getInput());
  final Double rowCount = mq.getRowCount(aggregate.getInput());
  if (avgRowSize == null || rowCount == null) {
    return null;
  }
  return avgRowSize * rowCount;
}

3) HiveSortLimit排序Operator的内存估算实现

HiveSortLimit计算方法大致和汇总HiveAggregate类似,唯一区别在于Collation排序信息为null,即没有排序字段和排序方向的信息,此时内存估算大小为0.0.

代码语言:javascript复制
public Double memory(HiveSortLimit sort, RelMetadataQuery mq) {
  if (sort.getCollation() != RelCollations.EMPTY) { //否则,没排序,所需内存为0
    // It sorts
    final Double avgRowSize = mq.getAverageRowSize(sort.getInput()); //平均行大小
    final Double rowCount = mq.getRowCount(sort.getInput()); //行记录数
    if (avgRowSize == null || rowCount == null) {
      return null;
    }
    return avgRowSize * rowCount;
  }
  // It does not sort, memory footprint is zero
  //sort 操作符如果是不需要排序,或者这份数据已经sorted 那内存为0
  return 0.0;
}

4)HiveJoin会根据引擎不同,成本模型实现不同(MR 和 Tez两种引擎的对Join内存估算方法不同),返回结果不同。是由HiveCostModel模型内,对JoinAlgorithm接口具体实现决定的。

MR引擎:HiveDefaultCostModel,getMemory返回null

Tez引擎:HiveTezCostModel,使用HiveAlgorithmUtil.getJoinMemory() 实现的,不做展开

代码语言:javascript复制
public Double memory(HiveJoin join, RelMetadataQuery mq) {
  return join.getMemory();
}

/**
 * Join每个拆分阶段内存计算,参考Join operator的方法
 * @param join
 * @return
 */
public Double cumulativeMemoryWithinPhaseSplit(HiveJoin join) {
  return join.getCumulativeMemoryWithinPhaseSplit();
}

averageColumnSizes平均列大小的估算

Hive平均列大小的估算是由HiveRelMdSize继承calcite的RelMdSize实现的,源码解析如下:

1)HiveTableScan表扫描每列平均大小估算

TableScan列平均大小和其他Operator不同,其可RelOPtHiveTable表对象的形式获取TableScan所需要列的完整的ColStatistics统计信息对象,如果ColStatistics对象为null,使用数据类型方法averageTypeValueSize估算,否则事情getAvgColLen()元数据信息估算

代码语言:javascript复制
public List<Double> averageColumnSizes(HiveTableScan scan, RelMetadataQuery mq) {

  //获取TableScan的投影列的序数列表。
  List<Integer> neededcolsLst = scan.getNeededColIndxsFrmReloptHT();
  //获取scan所需列的统计信息列表,这些统计信息包括:基数、null记录数、平均长度,true记录数、false记录数
  List<ColStatistics> columnStatistics = ((RelOptHiveTable) scan.getTable())
      .getColStat(neededcolsLst, true);

  // Obtain list of col stats, or use default if they are not available
  final ImmutableList.Builder<Double> list = ImmutableList.builder();
  int indxRqdCol = 0;
  int nFields = scan.getRowType().getFieldCount();//获取记录行字段个数  返回的应该table的所有字段的个数  Returns the number of fields in a struct type

  for (int i = 0; i < nFields; i  ) { //遍历总个数
    if (neededcolsLst.contains(i)) { //如果此序号是投影所需的字段,则进行估算平均长度,否则其余字段默认长度为0。

      ColStatistics columnStatistic = columnStatistics.get(indxRqdCol); //根据投影字段索引,获取对应的统计信息
      indxRqdCol  ;
      if (columnStatistic == null) {
        RelDataTypeField field = scan.getRowType().getFieldList().get(i);//如果这一列统计信息为null,则根据字段数据类型进行估算。
        list.add(averageTypeValueSize(field.getType()));//根据数据类型的估算添加到列表
      } else {
        list.add(columnStatistic.getAvgColLen()); //统计信息非空,则获取其平均长度
      }

    } else {
      list.add(new Double(0));//不是投影必须的字段,默认长度为0。
    }
  }

  return list.build();
}

2) SemiJoin每列平均大小估算

SemiJoin只需要获取左侧RelNode关系表达式使用RelMetadataQuery访问收集的元数据信息进行估算大小

代码语言:javascript复制
public List<Double> averageColumnSizes(SemiJoin rel, RelMetadataQuery mq) {
  final RelNode left = rel.getLeft();
  final List<Double> lefts =
      mq.getAverageColumnSizes(left); //返回RelNode表达式每一列的大小列表,单位bytes
  if (lefts == null) {
    return null;
  }
  final int fieldCount = rel.getRowType().getFieldCount();//字段个数
  Double[] sizes = new Double[fieldCount];
  if (lefts != null) {
    lefts.toArray(sizes);//初始化每列大小
  }
  return ImmutableNullableList.copyOf(sizes);
}

3)HiveJoin每列平均大小估算

Join和上述的SemiJoin的平均列大小估算方法大致相同,区别是Join获取左右侧两侧RelNode关系表达式使用RelMetadataQuery访问收集的元数据信息进行估算大小

代码语言:javascript复制
public List<Double> averageColumnSizes(HiveJoin rel, RelMetadataQuery mq) {
  final RelNode left = rel.getLeft();
  final RelNode right = rel.getRight();

  //获取左右两侧列大小
  final List<Double> lefts =
      mq.getAverageColumnSizes(left);
  List<Double> rights = mq.getAverageColumnSizes(right);

  if (lefts == null && rights == null) {
    return null;
  }
  final int fieldCount = rel.getRowType().getFieldCount();
  Double[] sizes = new Double[fieldCount];
  if (lefts != null) {
    lefts.toArray(sizes);
  }
  if (rights != null) {
    final int leftCount = left.getRowType().getFieldCount();
    for (int i = 0; i < rights.size(); i  ) {
      sizes[leftCount   i] = rights.get(i);  //在左侧的基础上右侧添加到数组,返回
    }
  }
  return ImmutableNullableList.copyOf(sizes);
}

4) Hive每种数据类型大小的估算

这里枚举了每种数据类型大小估算,大致30种数据类型的情况

代码语言:javascript复制
/**
 * todo:等支持所有类型时,将会移除这块,但是现在到Hive3.0了 还在
 * 每种数据类型到枚举估算,
 * @param type
 * @return
 */
@Override
public Double averageTypeValueSize(RelDataType type) {
  switch (type.getSqlTypeName()) {
  case BOOLEAN:
  case TINYINT:
    return 1d;
  case SMALLINT:
    return 2d;
  case INTEGER:
  case FLOAT:
  case REAL:
  case DECIMAL:
  case DATE:
  case TIME:
    return 4d;
  case BIGINT:
  case DOUBLE:
  case TIMESTAMP:
  case INTERVAL_DAY:
  case INTERVAL_DAY_HOUR:
  case INTERVAL_DAY_MINUTE:
  case INTERVAL_DAY_SECOND:
  case INTERVAL_HOUR:
  case INTERVAL_HOUR_MINUTE:
  case INTERVAL_HOUR_SECOND:
  case INTERVAL_MINUTE:
  case INTERVAL_MINUTE_SECOND:
  case INTERVAL_MONTH:
  case INTERVAL_SECOND:
  case INTERVAL_YEAR:
  case INTERVAL_YEAR_MONTH:
    return 8d;
  case BINARY:
    return (double) type.getPrecision();   //Gets the JDBC-defined precision for values of this type.
  case VARBINARY:
    return Math.min(type.getPrecision(), 100d);
  case CHAR:
    return (double) type.getPrecision() * BYTES_PER_CHARACTER;  //每个字符字节 * precision for values of this type.
  case VARCHAR:
    // Even in large (say VARCHAR(2000)) columns most strings are small
    return Math.min((double) type.getPrecision() * BYTES_PER_CHARACTER, 100d);
  case ROW:
    Double average = 0.0;
    for (RelDataTypeField field : type.getFieldList()) {
      average  = averageTypeValueSize(field.getType()); //如果是Row类型,进行递归调用求 平均值总和
    }
    return average;
  default:
    return null;
  }
}

总结

内存的估算是根据stats统计模块收集的元数据信息:总记录数、平均列长度、列数、列数据类型,按照一定计算方法得出。内存作为成本函数输入,是成本高低很重要的一部分指标。

以上就是内存计算方法或方式的源码讲解,由于笔者知识及水平有限,因此文中错漏之处在所难免,恳请各位老师、专家不吝赐教。

0 人点赞