背景
近日有用户反馈在使用Impala的过程中,SQL执行的很慢,我们抓取到相关的SQL,简化之后,如下所示(其中相关的敏感信息都已经做了替换):
代码语言:javascript复制select count(1) from user_table
where day ='2019-03-19'
and url rlike 'http://www.example.com/'
and user_udf(info, 'type') = 'IOS'
and from_unixtime(cast(time/1000 as int), 'HH') <= '23';
其中,user_udf是用户自己用java编写的一个UDF,主要就是通过对info列进行一系列的处理,然后使用第二个参数进行匹配,返回匹配之后的结果。表的schema如下所示:
代码语言:javascript复制CREATE EXTERNAL TABLE user_table (
time BIGINT,
url STRING,
info STRING
)
PARTITIONED BY (
day STRING
)
STORED AS PARQUET
LOCATION 'hdfs://xxx/user_table';
经过排查发现,由于数据量比较大(千万甚至上亿的级别),而且user_udf的执行效率很低,导致了整个sql跑的很慢。在测试过程中发现,如果只有url rlike 'http://www.example.com/'和from_unixtime(cast(time/1000 as int), 'HH') <= ’23’这两个过滤条件,那么得到的结果就只剩下几千条,可以说非常小了。我们将这个结果写入一个临时表,然后再使用user_udf(info, 'type') = 'IOS'这个过滤条件对测试表进行过滤,发现结果很快(因为测试表的大小只有几千条)。
基于以上测试结果,我萌生了这样一个想法,能不能通过手动调整where中的过滤条件顺序,来让url和time的过滤先执行,最后再对info使用udf进行判断?我们使用explain查看sql的执行计划,如下所示:
通过执行计划,我们可以看到,where中的一系列过滤条件都被转换成了相应的predicates,由于day是时间分区列,可以直接进行过滤,因此不在这个predicates里面。从图中我们可以看到,三个过滤条件的执行顺序依次是:info->time->url,使用udf的过滤条件被放到了第一个位置,这不是我们想要的结果,因此,我们修改SQL中的where条件顺序,如下所示:
代码语言:javascript复制select count(1) from user_table
where day ='2019-03-19'
and url rlike 'http://www.example.com/'
and from_unixtime(cast(time/1000 as int), 'HH') <= ’23'
and user_udf(info, 'type') = 'IOS';
再次通过explain执行sql,发现执行计划中的predicates与上述的一样,并没有发生改变。对于这个结果,我们猜测应该是impala在内部做了相应的处理,对于不同的predicate,有一个衡量标准来判断每个predicate的执行顺序。
代码追溯
为了验证上面的猜测,需要查看相应的源代码来进行验证。Impala的SQL语法解析部分都是在FE端执行的,由java编写,并且使用了开源的ANTLR来进行语法分析,因此我们可以使用IDEA来进行远程调试,跟踪相关的代码。
当SQL请求提交到了impalad节点,BE端(由C 编写)通过JNI来调用FE的相关接口来进行SQL解析,其中的这些细节就不再过多赘述,这里主要介绍关于这些predicate的生成及优化过程。Impala首先会通过ANTLR的相关API来进行相应的语法解析,主要代码如下:
代码语言:javascript复制// Frontend.java
public StatementBase parse(String stmt) throws AnalysisException {
SqlScanner input = new SqlScanner(new StringReader(stmt));
SqlParser parser = new SqlParser(input);
try {
return (StatementBase) parser.parse().value;
} catch (Exception e) {
throw new AnalysisException(parser.getErrorMsg(stmt), e);
}
}
其中StatementBase就是Impala定义的所有statement的base class,相关的UML图如下所示:
所有提交到Impala的SQL都会通过上面的parse方法转换成相应的xxxStmt类,上面的UML图中只列举出了其中的一部分,还有其他的Stmt没有列出。我们上面的SQL最终就会被转换成SelectStmt,所以我们主要来看这个类。其中有一个whereClause_成员,就是where条件中的各个过滤条件经过语法解析之后生成的结果,是一个Expr类,其UML图如下所示:
最终,where中的各个过滤条件就会被转换成对应的Predicate类,然后以树状结构存储在whereClause_的children_成员变量中,可以通过遍历的方式来获取到所有的predicate成员。我们可以通过断点调试跟踪到如下所示的成员变量内容:
从图中我们可以看到,whereClause_包含了两个字节点,分别是CompoundPredicate和BinaryPredicate。我们手动将这些predicates转换成相应的树状信息,结果如下图所示:
图中的灰色节点和蓝色节点代表的是叶子节点,它们的children_为空。其中,灰色的SlotRef代表了一个column的相关信息,而蓝色的节点一般代表常量成员。从图中我们看到这个树按照从左到右,从上到下的顺序,与SQL语句中的where过滤条件顺序是一致,所以说相关的predicate顺序调整不在这个地方。我们需要继续往后跟踪代码。
细心的同学可能已经发现了,在上面的Expr类的UML图中,有一个computeEvalCost方法,而这个方法就是计算每一个expr执行所需要的cost。我们以user_udf(info, 'type') = 'IOS'这个过滤条件为例,这个过滤条件在转换成对应的predicates时,相应的类如下:
代码语言:javascript复制// BinaryPredicate.java
protected float computeEvalCost() {
if (!hasChildCosts()) return UNKNOWN_COST;
if (getChild(0).getType().isFixedLengthType()) {
return getChildCosts() BINARY_PREDICATE_COST;
} else if (getChild(0).getType().isStringType()) {
return getChildCosts()
(float) (getAvgStringLength(getChild(0)) getAvgStringLength(getChild(1))) *
BINARY_PREDICATE_COST;
} else {
return getChildCosts() VAR_LEN_BINARY_PREDICATE_COST;
}
}
// FunctionCallExpr.java
protected float computeEvalCost() {
return hasChildCosts() ? getChildCosts() FUNCTION_CALL_COST : UNKNOWN_COST;
}
// LiterExpr.java
protected float computeEvalCost() {
return LITERAL_COST;
}
每一个predicate的cost都等于子predicate的cost和再加上一个常量,我们根据以上的计算方法,可以得出user_udf(info, 'type') = 'IOS'这个过滤条件对应的cost为:(10 (1 1) 1) (5 3)*1=21。具体的计算过程可以参考impala的源码,上面涉及到的各个常量的值如下所示:
代码语言:javascript复制// Expr.java
// The relative costs of different Exprs. These numbers are not intended as a precise
// reflection of running times, but as simple heuristics for ordering Exprs from cheap
// to expensive.
// TODO(tmwarshall): Get these costs in a more principled way, eg. with a benchmark.
public final static float ARITHMETIC_OP_COST = 1;
public final static float BINARY_PREDICATE_COST = 1;
public final static float VAR_LEN_BINARY_PREDICATE_COST = 5;
public final static float CAST_COST = 1;
public final static float COMPOUND_PREDICATE_COST = 1;
public final static float FUNCTION_CALL_COST = 10;
public final static float IS_NOT_EMPTY_COST = 1;
public final static float IS_NULL_COST = 1;
public final static float LIKE_COST = 10;
public final static float LITERAL_COST = 1;
public final static float SLOT_REF_COST = 1;
public final static float TIMESTAMP_ARITHMETIC_COST = 5;
public final static float UNKNOWN_COST = -1;
在将SQL进行解析之后,Impala会将这些信息转换成对应的执行计划。其中,对表的扫描就转换成了SCAN HDFS节点(如果是kudu表的话,则是SCAN KUDU),在生成HdfsScanNode的时候就会根据对各个predicate按照cost进行重新排序,如下所示:
代码语言:javascript复制// HdfsScanNode.java
/**
* Populate collectionConjuncts_ and scanRanges_.
*/
@Override
public void init(Analyzer analyzer) throws ImpalaException {
conjuncts_ = orderConjunctsByCost(conjuncts_);
// 省略余下代码
……
}
最后在实际进行扫描的时候,是根据这个排序之后的predicates进行比较的。这里的orderConjunctsByCost方法不仅仅用在HdfsScanNode,其他的相关节点也会用到,这里就不再赘述。
小结
通过以上的代码学习,我们终于知道了:为什么最开始的SQL,我们调整了where中过滤条件的顺序,并不能改变执行计划中的predicates顺序。这是Impala本身的一种优化措施,除此之外,Impala还有其他的很多知识需要慢慢学习,这只是其中的冰山一角。后续,有其他的知识,我也会继续跟大家一起分享。