Understanding the Impala Query Profile, Step by Step (Part 3)

2019-12-16 16:57:39

In part 3 of this blog series, I will continue to focus on the Query Plan and the Execution Summary, but this time I will run a more complex query against real data (Kaggle's Flights Delay dataset).

The test database has three tables:

  • flights.csv
  • airlines.csv
  • airports.csv

The query is as follows:

SELECT 
    a.airline as airline_name, 
    COUNT(IF(cancelled = 1, 1, NULL)) AS cancelled, 
    COUNT(1) AS total, 
    CONCAT(CAST(CAST(COUNT(IF(cancelled = 1, 1, NULL)) / COUNT(1) AS DECIMAL(8,4)) * 100 AS STRING), "%") AS cancelled_rate 
FROM flights f 
JOIN airlines a 
  ON (f.airline = a.iata_code) 
GROUP BY a.airline ORDER BY a.airline 

This query joins the flights table with the airlines table to produce a report showing each airline's flight cancellation rate during 2015. The result looks like this:

+------------------------------+-----------+---------+----------------+
| airline_name                 | cancelled | total   | cancelled_rate |
+------------------------------+-----------+---------+----------------+
| Alaska Airlines Inc.         | 669       | 172521  | 0.3800%        |
| American Airlines Inc.       | 10919     | 725984  | 1.5000%        |
| American Eagle Airlines Inc. | 15025     | 294632  | 5.0900%        |
| Atlantic Southeast Airlines  | 15231     | 571977  | 2.6600%        |
| Delta Air Lines Inc.         | 3824      | 875881  | 0.4300%        |
| Frontier Airlines Inc.       | 588       | 90836   | 0.6400%        |
| Hawaiian Airlines Inc.       | 171       | 76272   | 0.2200%        |
| JetBlue Airways              | 4276      | 267048  | 1.6000%        |
| Skywest Airlines Inc.        | 9960      | 588353  | 1.6900%        |
| Southwest Airlines Co.       | 16043     | 1261855 | 1.2700%        |
| Spirit Air Lines             | 2004      | 117379  | 1.7000%        |
| US Airways Inc.              | 4067      | 198715  | 2.0400%        |
| United Air Lines Inc.        | 6573      | 515723  | 1.2700%        |
| Virgin America               | 534       | 61903   | 0.8600%        |
+------------------------------+-----------+---------+----------------+
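
As a quick sanity check on the cancelled_rate expression, the first row of the report can be reproduced from its own numbers (669 cancellations out of 172,521 Alaska Airlines flights). The query below is just an illustrative one-off; whether the inner CAST truncates or rounds the ratio depends on the DECIMAL_V2 setting of your Impala version:

-- Reproduce the Alaska Airlines cancelled_rate from the report above
SELECT CONCAT(
         CAST(CAST(669 / 172521 AS DECIMAL(8,4)) * 100 AS STRING),
         "%") AS cancelled_rate;
-- 669 / 172521 = 0.003877...; cast to DECIMAL(8,4) this is 0.0038 when the cast
-- truncates or 0.0039 when it rounds, so the output is "0.3800%" or "0.3900%";
-- the report above shows the truncated value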

The details of the Query Plan and Execution Summary are as follows:

F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
|  mem-estimate=0B mem-reservation=0B
|
08:MERGING-EXCHANGE [UNPARTITIONED]
|  order by: a.airline ASC
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=3 row-size=52B cardinality=14
|
F02:PLAN FRAGMENT [HASH(a.airline)] hosts=4 instances=4
Per-Host Resources: mem-estimate=22.00MB mem-reservation=13.94MB
04:SORT
|  order by: a.airline ASC
|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB
|  tuple-ids=3 row-size=52B cardinality=14
|
07:AGGREGATE [FINALIZE]
|  output: count:merge(if(cancelled = 1, 1, NULL)), count:merge(*)
|  group by: a.airline
|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
|  tuple-ids=2 row-size=52B cardinality=14
|
06:EXCHANGE [HASH(a.airline)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=2 row-size=52B cardinality=14
|
F00:PLAN FRAGMENT [RANDOM] hosts=4 instances=4
Per-Host Resources: mem-estimate=187.94MB mem-reservation=3.94MB
03:AGGREGATE [STREAMING]
|  output: count(if(cancelled = 1, 1, NULL)), count(*)
|  group by: a.airline
|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
|  tuple-ids=2 row-size=52B cardinality=14
|
02:HASH JOIN [INNER JOIN, BROADCAST]
|  hash predicates: f.airline = a.iata_code
|  fk/pk conjuncts: f.airline = a.iata_code
|  runtime filters: RF000 <- a.iata_code
|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
|  tuple-ids=0,1 row-size=73B cardinality=5819079
|
|--05:EXCHANGE [BROADCAST]
|  |  mem-estimate=0B mem-reservation=0B
|  |  tuple-ids=1 row-size=54B cardinality=14
|  |
|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
|  01:SCAN HDFS [flight_delay.airlines a, RANDOM]
|     partitions=1/1 files=1 size=341B
|     stats-rows=14 extrapolated-rows=disabled
|     table stats: rows=14 size=341B
|     column stats: all
|     mem-estimate=32.00MB mem-reservation=0B
|     tuple-ids=1 row-size=54B cardinality=14
|
00:SCAN HDFS [flight_delay.flights f, RANDOM]
   partitions=1/1 files=1 size=564.96MB
   runtime filters: RF000 -> f.airline
   stats-rows=5819079 extrapolated-rows=disabled
   table stats: rows=5819079 size=564.96MB
   column stats: all
   mem-estimate=176.00MB mem-reservation=0B
   tuple-ids=0 row-size=19B cardinality=5819079
----------------
    Estimated Per-Host Mem: 253689856
    Per Host Min Reservation: host-10-17-100-140.coe.cloudera.com:22000(17.88 MB) host-10-17-100-141.coe.cloudera.com:22000(17.88 MB) host-10-17-100-143.coe.cloudera.com:22000(17.88 MB) host-10-17-100-147.coe.cloudera.com:22000(17.88 MB) 
    Request Pool: root.hive
    Admission result: Admitted immediately
    ExecSummary: 
Operator              #Hosts   Avg Time   Max Time  #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail                  
-------------------------------------------------------------------------------------------------------------------------
08:MERGING-EXCHANGE        1    4s122ms    4s122ms     14          14          0              0  UNPARTITIONED           
04:SORT                    4  249.999us  999.996us     14          14   12.02 MB       12.00 MB                          
07:AGGREGATE               4    2.750ms    4.000ms     14          14    1.99 MB       10.00 MB  FINALIZE                
06:EXCHANGE                4    4s100ms    4s137ms     55          14          0              0  HASH(a.airline)         
03:AGGREGATE               4  280.499ms  339.002ms     55          14   10.11 MB       10.00 MB  STREAMING               
02:HASH JOIN               4  177.749ms  184.999ms  5.82M       5.82M   10.05 MB        1.94 MB  INNER JOIN, BROADCAST   
|--05:EXCHANGE             4    0.000ns    0.000ns     14          14          0              0  BROADCAST               
|  01:SCAN HDFS            1   97.000ms   97.000ms     14          14  177.00 KB       32.00 MB  flight_delay.airlines a 
00:SCAN HDFS               4    2s052ms    3s278ms  5.82M       5.82M   40.06 MB      176.00 MB  flight_delay.flights f  
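
Everything above comes straight out of the query profile. If you want to pull up the same information for your own queries, impala-shell can do it directly. A minimal sketch, where the EXPLAIN_LEVEL value and the stand-in COUNT(*) query are only examples:

-- Raise the plan verbosity (0-3), then show the plan without running the query
SET EXPLAIN_LEVEL=2;
EXPLAIN SELECT COUNT(*) FROM flight_delay.flights;

-- Run a query, then ask impala-shell for its summary and full runtime profile
SELECT COUNT(*) FROM flight_delay.flights;
SUMMARY;   -- execution summary of the most recently run query
PROFILE;   -- full profile of the most recently run query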

This time, let's jump straight to the Execution Summary section, because it is easier to read and is usually the first thing I check when helping CDH users troubleshoot Impala query issues. From the execution summary above, we can see what happened while the query was running:

1. Scanning the data of the flight_delay.flights table from HDFS took about 2 seconds on average (2s052ms).

2. Impala estimated 5.82 million rows for the flight_delay.flights table, which matches the number of rows actually returned; this indicates that the table statistics are up to date (see the COMPUTE STATS sketch after this walkthrough).

3. Impala estimated that scanning the flight_delay.flights table would need 176MB of memory, but only 40MB was actually used. This is expected, since the estimate will never exactly match real usage; the goal is simply to get as close as possible.

4. Because the data volume is large, Impala split the file and ran the scan across 4 hosts to spread the load.

5. Impala also scanned the other table, flight_delay.airlines. Its estimated row count matches the rows actually returned, so this table's statistics are up to date as well. Since the table has only 14 rows, the scan took just 97 milliseconds.

6. Because the table is so small, with only 14 rows, Impala used just 1 host to run the scan.

7. The next step is to broadcast the smaller table, flight_delay.airlines, to all the nodes executing the query (4 hosts in my example, hence BROADCAST).

8. Once the broadcast finished, Impala performed the hash join between the flight_delay.airlines and flight_delay.flights tables, which took 177ms and used about 10MB of memory.

9. Because we called the COUNT aggregate function, Impala had to perform an aggregation. It ran on 4 hosts, took 280ms, used about 10MB of memory, and returned 55 rows.

10. Since the step above ran on 4 worker nodes, Impala needed to merge their results. It does this by exchanging the data internally and then running the final aggregation over the intermediate results.

11. Because the query has an ORDER BY, a sort operation runs after step 10 completes.
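
Points 2 and 5 above depend on the table and column statistics being current. If the Est. #Rows column in the summary starts drifting away from the actual #Rows, refreshing the statistics is usually the first thing to try. A minimal sketch for the two tables in this example:

-- Recompute table and column statistics so the planner's row estimates stay accurate
COMPUTE STATS flight_delay.flights;
COMPUTE STATS flight_delay.airlines;

-- Check what the planner sees ("table stats" / "column stats" in the query plan)
SHOW TABLE STATS flight_delay.flights;
SHOW COLUMN STATS flight_delay.flights;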

You can match the operation numbers in the Summary section (00, 01, 02 and so on) with the numbers in the Query Plan section, which gives you more detail about each operation. I covered those details in part 2 of this series, so please refer back to that article if you need to.

Now, let's look at the Planner Timeline and Query Timeline sections of the Profile:

    Planner Timeline
      Analysis finished: 3ms (3389346)
      Equivalence classes computed: 3ms (3600838)
      Single node plan created: 4ms (4625920)
      Runtime filters computed: 4ms (4734686)
      Distributed plan created: 5ms (5120630)
      Lineage info computed: 13ms (13666462)
      Planning finished: 15ms (15712999)
    Query Timeline
      Query submitted: 0ns (0)
      Planning finished: 16ms (16999947)
      Submit for admission: 17ms (17999944)
      Completed admission: 17ms (17999944)
      Ready to start on 4 backends: 18ms (18999941)
      All 4 execution backends (10 fragment instances) started: 28ms (28999909)
      Rows available: 4.28s (4280986646)
      First row fetched: 4.31s (4308986559)

Each line is fairly self-explanatory. We can see that planning the query took 15ms, the plan was submitted for admission at around 17ms, execution on the worker nodes started at around 28ms, the result rows were available at 4.28 seconds, and the first row was fetched by the client at 4.31 seconds. This gives you a clear picture of how much time each phase took; if any phase is slow, it will stand out immediately, and you can then dig deeper to understand what might have happened.

Because my query ran quickly, there is not much of interest to see here, so let's look at the profile of another, real production Impala query:

    Query Compilation: 16.268ms
       - Metadata of all 1 tables cached: 1.786ms (1.786ms)
       - Analysis finished: 6.162ms (4.376ms)
       - Value transfer graph computed: 6.537ms (374.918us)
       - Single node plan created: 7.955ms (1.417ms)
       - Runtime filters computed: 8.274ms (318.815us)
       - Distributed plan created: 8.430ms (156.307us)
       - Lineage info computed: 9.664ms (1.234ms)
       - Planning finished: 16.268ms (6.603ms)
    Query Timeline: 35m46s
       - Query submitted: 0.000ns (0.000ns)
       - Planning finished: 22.001ms (22.001ms)
       - Submit for admission: 23.001ms (1.000ms)
       - Completed admission: 23.001ms (0.000ns)
       - Ready to start on 2 backends: 24.001ms (1.000ms)
       - All 2 execution backends (2 fragment instances) started: 36.001ms (12.000ms)
       - Rows available: 5m51s (5m51s)
       - First row fetched: 5m52s (950.045ms)
       - Last row fetched: 35m46s (29m53s)
       - Released admission control resources: 35m46s (1.000ms)
       - Unregister query: 35m46s (30.001ms)
     - ComputeScanRangeAssignmentTimer: 0.000ns

This comes from a real case where an Impala query ran for a very long time and the customer wanted to find out why. From the Query Timeline we can clearly see that it took almost 6 minutes (5m51s) from the start of execution (All 2 execution backends) until the data was available (Rows available). Those 6 minutes of execution could well be normal; with joins over large datasets, for example, it is common for a query to run for several minutes.

However, we can also see that Impala spent about 30 minutes delivering the data back to the client: the first row was fetched at around the 6-minute mark, while the last row was not fetched until the 36-minute mark. From this we can suspect a network issue between the Impala coordinator and the client (when data is fetched from a client such as impala-shell or Hue through the coordinator host). Another possibility is that the client was doing other work while fetching the results, such as printing them to the screen; since the result set may be large, that alone can take a long time.
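
One quick way to narrow this down, assuming the client is impala-shell, is to rerun the query with pretty-printing disabled and the output written to a file, then compare the fetch timings in the new profile. This is only a diagnostic sketch; the coordinator host and the query file name are placeholders:

# -B returns plain delimited rows instead of pretty-printed tables, and -o writes
# the results to a file rather than the screen, so a rerun like this shows whether
# client-side rendering is what makes the fetch phase slow
impala-shell -i <coordinator-host> -B --output_delimiter=',' \
             -f my_query.sql -o /tmp/query_result.csv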

So this part of the profile can point us in the right direction when we are looking for the bottleneck.

This wraps up part 3 of the Impala profile series. In the next part, I will go into detail on how to link the operation numbers shown in the Query Plan section with the last section of the profile, which shows detailed metrics for each operation, both averaged across hosts and broken down per host.

Adapted from: IMPALA QUERY PROFILE EXPLAINED – PART 3
