本篇章继续Impala查询机制相关的探索和学习,本篇主要讲解join优化器的优化原理和思路。
连接优化
join操作指的是多个表的连接操作,包括内连接、左连接、右连接和全连接等。从查询语句到最终的查询执行,impala(优化器)主要进行了如下操作:
- 确定连接顺序:decide what's the join order
- 确定连接策略:decide which join strategy introduced
本篇文章,将懂点讲解步骤1和2
使用示例SQL
建表语句:
代码语言:javascript复制create table T1(id int, name string);
create table T2(id int, age int);
create table T3(id int, sex string);
create table T4(id int, address string);
初始化测试数据:
代码语言:javascript复制insert into T1 values(1, 'tencent cloud'),(2, 'Tencent cloud emr'),(3, 'xiaofeng'),(4, 'data center');
insert into T2 values(1, 25),(2, 20),(3, 26),(4, 30);
insert into T3 values(1, 'female'),(2, 'male'),(3, 'female'),(4, 'male');
insert into T4 values(1, 'sichuan'),(2, 'cd'),(3, 'zhongguo'),(4, 'chengdu');
重要:在insert 完成之后,对四张表t1、t2、t3、t4,执行:compute stats 表名 命令, 用来更新Planner的统计信息,使生成的查询计划更准确。如果未执行stats, 在生成查询计划的时候会报以下warning:
连接查询语句:
代码语言:javascript复制select * from T1 inner join T2 inner join T3 on t1.id = t2.id and t2.id = t3.id ;
单节点查询计划(Single node plan)
代码语言:javascript复制Query: explain select * from T1 inner join T2 inner join T3 on t1.id = t2.id and t2.id = t3.id
---------------------------------------------------------------------
| Explain String |
---------------------------------------------------------------------
| Max Per-Host Resource Reservation: Memory=6.06MB Threads=4 |
| Per-Host Resource Estimates: Memory=102MB |
| Codegen disabled by planner |
| |
| PLAN-ROOT SINK |
| | |
| 04:HASH JOIN [INNER JOIN] |
| | hash predicates: t2.id = t3.id |
| | runtime filters: RF000 <- t3.id |
| | row-size=57B cardinality=4 |
| | |
| |--02:SCAN HDFS [impala_test_db_2.t3] |
| | HDFS partitions=1/1 files=1 size=32B |
| | row-size=21B cardinality=4 |
| | |
| 03:HASH JOIN [INNER JOIN] |
| | hash predicates: t1.id = t2.id |
| | runtime filters: RF002 <- t2.id |
| | row-size=36B cardinality=4 |
| | |
| |--01:SCAN HDFS [impala_test_db_2.t2] |
| | HDFS partitions=1/1 files=1 size=20B |
| | runtime filters: RF000 -> t2.id |
| | row-size=8B cardinality=4 |
| | |
| 00:SCAN HDFS [impala_test_db_2.t1] |
| HDFS partitions=1/1 files=1 size=61B |
| runtime filters: RF000 -> impala_test_db_2.t1.id, RF002 -> t1.id |
| row-size=28B cardinality=4 |
---------------------------------------------------------------------
确定连接顺序(join order)
join order, 其实质就是依据single node plan 确定哪张表是left table、哪张表是right table
Impala确定join order 是采用left-deep tree的形式:最大的表在左侧left side、小表在右侧right side。依据单节点查询计划,构造如下图所示的left-tree结构
Impala优化器首先找到容量最大的表T1,与所有的表进行比较,找到最小的表T2,连接之后可以生成最小的中间结果(intermedia result). 将最大的表与最小的表进行组合(join)生成中间的表。然后重复此过程,最终生成left-deep tree.
为什么Impala使用left-deep tree呢? 因为它可以很好的实现并行性,树的所有右节点都可以实现并行执行
确定连接策略(join strategy)
在确定完成join order 之后,接下来的第二步就是在distributed plan阶段确定哪个执行策略?
代价(cost)最低的 是broadcast join还是partition join呢?
先看下,Impala的两种连接策略:
- Broadcast join:Impala 默认的的连接策略,当left-deep tree 右表足够小的时候,会通过广播的形将右表(小表)广播到其它节点,并同大表(每个节点上都有大表的部分数据)进行连接。
- Partitioned join : 这个策略通常适用于两个表都比较大,且两个表大小基本一致的情况,每一个表都会依据join列进行哈希,将数据分散到各个节点上。
评价join strategy的代价衡量主要包括了两个指标:
- network cost : 由于表连接而产生的网络流量的多少来作为网络代价
- memory cost:由连接产生的内存消耗作为内存代价的评估(连接操作主要是哈希表的建立)
为什么是network cost 和 memory cost呢?
想一下left - deep tree, 在连接过程中,我们需要为right side 的表在内存中建立hash table, hash table的大小取决于表的大小,这个时候memory 容量就成了一个评估的指标。 同时如果采用broadcast的侧略,需要广播同一个hash table到集群的每一个host上。对于partition join ,把数据打散成slice, 并把每个slice 放到一台host 上面去做join 操作, right side仍需要在memory中构建hash table。
所以,网络network 和 内存memory时两个重要的参考指标。
Network cost& Memory cost 怎样度量
network cost和memory cost度量如下所示
Join strategy cost | Network Traffic | Memory Usage (Hash table) |
---|---|---|
Broadcast join | RHS table size * number of node | RHS table size * number of node |
Partitioned join | LHS RHS table size | RHS table size |
如上表所示,通常table size是由投影列( 连接列,非重复行) * 表的行数计算得到。
连接策略的选择就依靠总代价
代码语言:javascript复制Total cost = Network traffic Memory Usage
选择总代价Total cost 较小的join strategy 来作为连接策略。
Hash join算法
本小节将以伪代码的形式描述Hash join算法的实现
代码语言:javascript复制 // T1 inner join T2 on T1.id = T2.id
//phase 1: 构建right-hand table in memory
for each row in T2 table
hash-value = hash(T2.id)
map[hash-value].add(row)
//phase 2: Probe phase:The left-hand table is scanned row by row
row = read row from T1 table
while row
hash-value = hash(geoip.country_code)
if map[hash-value] != null
//found matching row, send both rows to output stream
else
//do nothing
row = read next row from T1 table
// map 数据结构只支持在key上的等值连接
以上算法实现是等值连接,Impala从版本1.2.2之后,也支持非等值连接,支持持cross join 操作符。