Presto统计信息

2020-10-26 15:53:16 浏览数 (1)

表统计

Presto支持基于统计的查询优化。为了使查询能够利用这些优化,Presto必须具有该查询中表的统计信息。

表统计信息通过连接器提供给查询计划者。当前,唯一支持统计信息的连接器是Hive连接器

统计信息通过table layout显示给查询计划者。table layout代表表数据的子集,并包含有关该数据的组织属性的信息(例如排序顺序和存储分区)。

一个表可用的table layout数量以及这些table layout的详细信息特定于每个连接器。以Hive连接器为例:

  • 非分区表只有一种table layout,代表表中的所有数据
  • 分区表具有一系列table layout。每组要扫描的分区代表一个table layout。 Presto会根据查询中的过滤谓词,尝试选择由最少数量的分区组成的table layout

Hive连接器会自动收集有关INSERTCREATE TABLE AS操作的基本统计信息(numFiles,numRows,rawDataSize,totalSize)。

Hive连接器还可以收集列级别的统计信息:

image.png

写入时自动进行列级统计信息收集由collect-column-statistics-on-write会话属性控制。

Hive连接器支持通过ANALYZE语句收集表和分区统计信息。分析分区表时,可以通过可选的partitions属性指定要分析的分区,该属性是一个包含分区键值的数组.

代码语言:javascript复制
ANALYZE hive.sales WITH (
    partitions = ARRAY[
        ARRAY['partition1_value1', 'partition1_value2'],
        ARRAY['partition2_value1', 'partition2_value2']]);

该查询将使用键收集2个分区的统计信息。

Available Statistics

Presto提供以下统计信息:

对于表:

  • 行数:table layout中的总行数

对于表中的每一列:

  • 数据大小:需要读取的数据大小
  • 空值分数:空值的分数
  • 不重复值计数:不重复值的数量
  • 低值:列中的最小值
  • 高值:列中的最大值

可用于特定查询的统计信息集取决于所使用的连接器,并且还可能因表甚至table layout而异。例如,Hive连接器当前不提供有关数据大小的统计信息。

可以使用SHOW STATS for命令通过Presto SQL界面显示表统计信息。

SHOW STATS for.png

Cost in EXPLAIN

代码语言:javascript复制
EXPLAIN [ ( option [, ...] ) ] statement

option:
    FORMAT { TEXT | GRAPHVIZ | JSON }
    TYPE { LOGICAL | DISTRIBUTED | VALIDATE | IO }

在计划过程中,将基于查询中表的表统计信息来计算与计划的每个节点关联的成本。计算出的成本将作为EXPLAIN语句输出的一部分进行打印。

成本信息以{rows: XX (XX), cpu: XX, memory: XX, network: XX}的格式显示在计划树中。rows是指执行期间每个计划节点输出的预期行数。行数后括号中的值是指每个计划节点输出的数据的预期大小(以字节为单位)。其他参数指示计划节点的执行所使用的CPU,内存和网络的估计数量。这些值不代表任何实际单位,而是用于比较计划节点之间的相对成本的数字,从而使优化器可以选择最佳计划来执行查询 。如果不知道任何值,?打印出来。

代码语言:javascript复制
presto:default> EXPLAIN SELECT comment FROM tpch.sf1.nation WHERE nationkey > 3;

- Output[comment] => [[comment]]
        Estimates: {rows: 22 (1.69kB), cpu: 6148.25, memory: 0.00, network: 1734.25}
    - RemoteExchange[GATHER] => [[comment]]
            Estimates: {rows: 22 (1.69kB), cpu: 6148.25, memory: 0.00, network: 1734.25}
        - ScanFilterProject[table = tpch:nation:sf1.0, filterPredicate = ("nationkey" > BIGINT '3')] => [[comment]]
                Estimates: {rows: 25 (1.94kB), cpu: 2207.00, memory: 0.00, network: 0.00}/{rows: 22 (1.69kB), cpu: 4414.00, memory: 0.00, network: 0.00}/{rows: 22 (1.69kB), cpu: 6148.25, memory: 0.00, network: 0.00}
                nationkey := tpch:nationkey
                comment := tpch:comment

通常,每个计划节点仅打印一个成本。但是,当将Scan运算符与Filter和/或Project运算符组合在一起时,将打印出多个成本结构,每个成本结构都对应于组合运算符的单个逻辑部分。例如,将为ScanFilterProject算子打印三个成本结构,分别与Scan, Filter, 与Project部分相对应。

显示SQL语句的逻辑或分布式执行计划,或验证语句。使用TYPE DISTRIBUTED选项显示分段计划。每个计划片段均由单个或多个Presto节点执行。片段分离代表Presto节点之间的数据交换。片段类型指定Presto节点如何执行片段以及片段之间的数据分配方式.

  • SINGLE 片段在单个节点上执行.
  • HASH 片段在固定数量的节点上执行,使用哈希函数分配输入数据.
  • ROUND_ROBIN 片段在固定数量的节点上执行,输入数据以round-robin方式分布.
  • BROADCAST 在固定数量的节点上执行片段,并将输入数据广播到所有节点.
  • SOURCE 在访问输入拆分的节点上执行片段.
  • Distributed plan 例:
代码语言:javascript复制
presto:tiny> EXPLAIN (TYPE DISTRIBUTED) SELECT regionkey, count(*) FROM nation GROUP BY 1;
                                          Query Plan
----------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [regionkey, count]
     Output partitioning: SINGLE []
     - Output[regionkey, _col1] => [regionkey:bigint, count:bigint]
             _col1 := count
         - RemoteSource[1] => [regionkey:bigint, count:bigint]

 Fragment 1 [HASH]
     Output layout: [regionkey, count]
     Output partitioning: SINGLE []
     - Aggregate(FINAL)[regionkey] => [regionkey:bigint, count:bigint]
             count := "count"("count_8")
         - LocalExchange[HASH][$hashvalue] ("regionkey") => regionkey:bigint, count_8:bigint, $hashvalue:bigint
             - RemoteSource[2] => [regionkey:bigint, count_8:bigint, $hashvalue_9:bigint]

 Fragment 2 [SOURCE]
     Output layout: [regionkey, count_8, $hashvalue_10]
     Output partitioning: HASH [regionkey][$hashvalue_10]
     - Project[] => [regionkey:bigint, count_8:bigint, $hashvalue_10:bigint]
             $hashvalue_10 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("regionkey"), 0))
         - Aggregate(PARTIAL)[regionkey] => [regionkey:bigint, count_8:bigint]
                 count_8 := "count"(*)
             - TableScan[tpch:tpch:nation:sf0.1, originalConstraint = true] => [regionkey:bigint]
                     regionkey := tpch:regionkey

EXPLAIN ANALYZE

估计成本还可以使用EXPLAIN ANALYZE [VERBOSE] SQL打印,分布式执行计划以及每个操作的成本。

使用VERBOSE选项时,将提供更详细的信息和低级别的统计信息;要了解这些内容,需要了解Presto内部和实现细节。

可以看到每个阶段花费的CPU时间以及该阶段中每个计划节点的相对成本。然而,计划节点的相对成本基于wall time,该时间可能会或可能不会与CPU时间相关。对于每个计划节点,您可以看到其他统计信息(如:每个节点实例的平均输入,相关计划节点的平均哈希碰撞数)。想要检测查询的数据异常(数据倾斜,异常哈希碰撞)时,此类统计非常有用。

代码语言:javascript复制
presto:dm_db> EXPLAIN ANALYZE select a.pass_id ,a.channel_id from( select m.pass_id ,m.channel_id ,coalesce(a.task_gold,0) as task_gold from( select a.pass_id ,a.channel_id from( select a.pass_id ,a.first_channel_idas channel_id ,row_number() over(partition by a.pass_id order by a.first_channel_id desc) as row_num_desc from dw_db.dw_common_mobile_device_user_mapping as a where a.p_product='browser_app' and a.p_project='browser' and a.p_dt='2020-07-13' and a.last_date between '2020-06-13' and '2020-07-13' and coalesce(a.pass_id,0)<>0 ) as a where a.row_num_desc=1 )as m left join( select a.pass_id ,sum(a.task_gold) as task_gold from dw_db.dw_browser_app_xqlm_task_log as a where a.p_dt between '2020-06-13' and '2020-07-13' and a.status=0 group by a.pass_id ) as a on a.pass_id = m.pass_id ) as a where a.task_gold >0;

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [HASH]
     CPU: 24.60s, Scheduled: 48.18s, Input: 7115227 rows (302.06MB); per task: avg.: 2371742.33 std.dev.: 5189.23, Output: 265662 rows (9.24MB)
     Output layout: [first_channel_id, pass_id]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     InnerJoin[("pass_id" = "pass_id_21")][$hashvalue, $hashvalue_65]
     │   Layout: [first_channel_id:varchar, pass_id:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     │   CPU: 10.89s (10.17%), Scheduled: 30.23s (0.90%), Output: 265662 rows (9.24MB)
     │   Left (probe) Input avg.: 9818.46 rows, Input std.dev.: 1.11%
     │   Right (build) Input avg.: 6401.85 rows, Input std.dev.: 1.08%
     │           Collisions avg.: 690.42 (100.63% est.), Collisions std.dev.: 141.48%
     │   Distribution: PARTITIONED
     ├─ FilterProject[filterPredicate = (("row_number" = BIGINT '1') AND (COALESCE("pass_id", BIGINT '0') <> BIGINT '0'))]
     │  │   Layout: [first_channel_id:varchar, pass_id:bigint, $hashvalue:bigint]
     │  │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}/{rows: ? (?), cpu: ?, memory: ?, network: ?}
     │  │   CPU: 33.00ms (0.03%), Scheduled: 37.00ms (0.00%), Output: 471286 rows (17.06MB)
     │  │   Input avg.: 9818.46 rows, Input std.dev.: 1.11%
     │  └─ TopNRowNumber[partition by (pass_id), order by (first_channel_id DESC_NULLS_LAST) limit 1][$hashvalue]
     │     │   Layout: [first_channel_id:varchar, pass_id:bigint, $hashvalue:bigint, row_number:bigint]
     │     │   CPU: 8.43s (7.87%), Scheduled: 12.03s (0.36%), Output: 471286 rows (21.10MB)
     │     │   Input avg.: 10191.81 rows, Input std.dev.: 1.20%
     │     │   row_number := row_number()
     │     └─ LocalExchange[HASH][$hashvalue] ("pass_id")
     │        │   Layout: [first_channel_id:varchar, pass_id:bigint, $hashvalue:bigint]
     │        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     │        │   CPU: 271.00ms (0.25%), Scheduled: 311.00ms (0.01%), Output: 489207 rows (17.70MB)
     │        │   Input avg.: 10191.81 rows, Input std.dev.: 132.04%
     │        └─ RemoteSource[2]
     │               Layout: [first_channel_id:varchar, pass_id:bigint, $hashvalue_60:bigint]
     │               CPU: 28.00ms (0.03%), Scheduled: 35.00ms (0.00%), Output: 489207 rows (17.70MB)
     │               Input avg.: 10191.81 rows, Input std.dev.: 132.04%
     └─ FilterProject[filterPredicate = (COALESCE("sum", BIGINT '0') > BIGINT '0')]
        │   Layout: [pass_id_21:bigint, $hashvalue_65:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}/{rows: ? (?), cpu: ?, memory: ?, network: ?}
        │   CPU: 381.00ms (0.36%), Scheduled: 384.00ms (0.01%), Output: 307289 rows (5.27MB)
        │   Input avg.: 6401.85 rows, Input std.dev.: 1.08%
        │   $hashvalue_65 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("pass_id_21"), 0))
        └─ Aggregate(FINAL)[pass_id_21]
           │   Layout: [pass_id_21:bigint, sum:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
           │   CPU: 2.37s (2.21%), Scheduled: 2.72s (0.08%), Output: 307289 rows (5.27MB)
           │   Input avg.: 138042.08 rows, Input std.dev.: 1.61%
           │   Collisions avg.: 8646.84 (1263.76% est.), Collisions std.dev.: 141.95%
           │   sum := sum("sum_59")
           └─ LocalExchange[HASH][$hashvalue_62] ("pass_id_21")
              │   Layout: [pass_id_21:bigint, sum_59:row(bigint, boolean, bigint, boolean), $hashvalue_62:bigint]
              │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
              │   CPU: 1.46s (1.37%), Scheduled: 1.55s (0.05%), Output: 6626020 rows (284.36MB)
              │   Input avg.: 138042.08 rows, Input std.dev.: 82.07%
              └─ RemoteSource[3]
                     Layout: [pass_id_21:bigint, sum_59:row(bigint, boolean, bigint, boolean), $hashvalue_63:bigint]
                     CPU: 520.00ms (0.49%), Scheduled: 536.00ms (0.02%), Output: 6626020 rows (284.36MB)
                     Input avg.: 138042.08 rows, Input std.dev.: 82.07%

 Fragment 2 [SOURCE]
     CPU: 39.81s, Scheduled: 18.59m, Input: 131534564 rows (3.68GB); per task: avg.: 65767282.00 std.dev.: 5417075.00, Output: 489207 rows (17.70MB)
     Output layout: [first_channel_id, pass_id, $hashvalue_61]
     Output partitioning: HASH [pass_id][$hashvalue_61]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TopNRowNumber[partition by (pass_id), order by (first_channel_id DESC_NULLS_LAST) limit 1][$hashvalue_61]
     │   Layout: [first_channel_id:varchar, pass_id:bigint, $hashvalue_61:bigint]
     │   CPU: 2.64s (2.46%), Scheduled: 3.04s (0.09%), Output: 489207 rows (17.70MB)
     │   Input avg.: 6894.55 rows, Input std.dev.: 14.84%
     │   row_number := row_number()
     └─ ScanFilterProject[table = hive:dw_db:dw_common_mobile_device_user_mapping, grouped = false, filterPredicate = (("last_date" BETWEEN CAST('2020-06-13' AS varchar) AND CAST('2020-07-13' AS varchar)) AND (COALES
            Layout: [first_channel_id:varchar, pass_id:bigint, $hashvalue_61:bigint]
            Estimates: {rows: 131534564 (3.82GB), cpu: 4.55G, memory: 0B, network: 0B}/{rows: 118381094 (3.43GB), cpu: 9.10G, memory: 0B, network: 0B}/{rows: 118381094 (3.43GB), cpu: 12.54G, memory: 0B, network: 0B}
            CPU: 37.17s (34.71%), Scheduled: 36.34m (64.89%), Output: 489513 rows (17.71MB)
            Input avg.: 1852599.49 rows, Input std.dev.: 14.66%
            $hashvalue_61 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("pass_id"), 0))
            first_channel_id := first_channel_id:string:REGULAR
            pass_id := pass_id:bigint:REGULAR
            last_date := last_date:string:REGULAR
            p_product:string:PARTITION_KEY
                :: [[browser_app]]
            p_dt:string:PARTITION_KEY
                :: [[2020-07-13]]
            p_project:string:PARTITION_KEY
                :: [[browser]]
            Input: 131534564 rows (3.68GB), Filtered: 99.63%

 Fragment 3 [SOURCE]
     CPU: 42.88s, Scheduled: 10.14m, Input: 140221136 rows (2.48GB); per task: avg.: 46740378.67 std.dev.: 25798179.66, Output: 6626020 rows (284.36MB)
     Output layout: [pass_id_21, sum_59, $hashvalue_64]
     Output partitioning: HASH [pass_id_21][$hashvalue_64]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Project[]
     │   Layout: [pass_id_21:bigint, sum_59:row(bigint, boolean, bigint, boolean), $hashvalue_64:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     │   CPU: 4.96s (4.63%), Scheduled: 8.06s (0.24%), Output: 6626020 rows (284.36MB)
     │   Input avg.: 46335.80 rows, Input std.dev.: 65.57%
     │   $hashvalue_64 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("pass_id_21"), 0))
     └─ Aggregate(PARTIAL)[pass_id_21]
        │   Layout: [pass_id_21:bigint, sum_59:row(bigint, boolean, bigint, boolean)]
        │   CPU: 14.09s (13.16%), Scheduled: 18.06s (0.54%), Output: 6626020 rows (227.49MB)
        │   Input avg.: 980565.59 rows, Input std.dev.: 68.56%
        │   Collisions avg.: 237826.72 (2083.20% est.), Collisions std.dev.: 353.84%
        │   sum_59 := sum("expr_28")
        └─ ScanFilterProject[table = hive:dw_db:dw_browser_app_xqlm_task_log, grouped = false, filterPredicate = (("status" = 0) AND (COALESCE("pass_id_21", BIGINT '0') <> BIGINT '0'))]
               Layout: [pass_id_21:bigint, expr_28:bigint]
               Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
               CPU: 23.83s (22.26%), Scheduled: 18.38m (32.82%), Output: 140220879 rows (2.35GB)
               Input avg.: 980567.38 rows, Input std.dev.: 68.56%
               expr_28 := CAST("task_gold" AS bigint)
               pass_id_21 := pass_id:bigint:REGULAR
               task_gold := task_gold:int:REGULAR
               status := status:int:REGULAR
               p_dt:string:PARTITION_KEY
                   :: [[2020-06-13], [2020-06-14], [2020-06-15], [2020-06-16], [2020-06-17], [2020-06-18], [2020-06-19], [2020-06-20], [2020-06-21], [2020-06-22], [2020-06-23], [2020-06-24], [2020-06-25], [2020-06-26], [2020-06-27], [2020-06-28], [
               Input: 140221136 rows (2.48GB), Filtered: 0.00%

0 人点赞