一 基础知识
Impala(be->fe->be)(这里讲解不够细节后面详细展开,这里不赘述)
Be->Coordinator(Fe->be)->Be Executor->Coordinator
1 PlanNode
(ExchangeNode 、 ScanNode、HashJoinNode 、AggregationNode)
代码语言:javascript复制ScanNode
ExchangeNode
HashJoinNode
AggregationNode
...
2 PlanFragemnt
单个PlanFragement 可独立执行,虽PlanFragement 之间有依赖,Impala并发也是以Fragement 做并发
二 调用栈
代码语言:javascript复制impala fe 接受 be 请求
public byte[] createExecRequest(byte[] thriftQueryContext)
TExecRequest result = frontend_.createExecRequest(planCtx);
TExecRequest result = getTExecRequest(planCtx, timeline);
TExecRequest req = doCreateExecRequest(planCtx, timeline);
TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
TQueryExecRequest queryExecRequest = getPlannedExecRequest(planCtx, analysisResult, timeline);
TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx);
List<PlanFragment> planRoots = planner.createPlans();(这里便是查询计划的生成)
result.setQuery_exec_request(queryExecRequest);
public List<PlanFragment> createPlans() throws ImpalaException {
createPlans
List<PlanFragment> distrPlan = createPlanFragments();
createPlanFragments
SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
createSingleNodePlan
createQueryPlan
createSelectPlan
createCheapestJoinPlan
createJoinPlan(获得最优的SingleNodePlan,CBO)
根据singleNodePlan 生成分布式fragemnts
fragments = distributedPlanner.createPlanFragments(singleNodePlan);
createPlanFragments(createPlanFragments 递归)
createHashJoinFragment/createScanFragment(这里为关联的节点构建ExchangeNode)
rootFragment.verifyTreeCtx(ctx_);
(这里有一些比较关键的逻辑
1. ExchangeNode.size == ChildrenNode.size
2. collectPlanNodes 获取当前PlanNode 及Children的所有Node
ParallelPlanner planner = new ParallelPlanner(ctx_);
List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
createPlans
createBuildPlans(root, null);
createBuildPlans
1. collectJoins()
2. list join join and createBuildPlan (create Join build by ExChangeNode)
3. list children and createBuildPlan(children)(遍历子节点的)
三 重要调用栈
createJoinPlan
创建非并发的PlanNode
代码语言:javascript复制computeParentAndSubplanRefs 统计所有表及表相相关的查询信息,比如join
parentRefPlans 存储了所有表的信息,保存在remainingRefs,createJoinPlan 会遍历所有的表,根据 candidate 排序,选择最大的一个表作为根
// consumption of the materialized hash tables required for the join sequence
Collections.sort(candidates,
new Comparator<Pair<TableRef, Long>>() {
@Override
public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
long diff = b.second - a.second;
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
}
});
遍历跟当前表所有相关的表,计算他们相关的计算比如HashJoin 的代价 Cardinality,选择最小的Cardinality 作为优先的Join 顺序.遍历完成当前表,则进入下一张表,从remainingRefs 剔除已经计算过的表.直至从remainingRefs 为空,死循环结束
while(true)
{
getMinCardinary();
remove(minEntry)
}
createHashJoinFragment
创建 Hash Join Fragement
代码语言:javascript复制
1. broadcast join
2. shuffle join
执行流程
1. 获取左子树 lhsTree getCardinality
2. 获取右子树 rhsTree getCardinality
3. 推测join 算法,构建Exchange Node,
4. 在 lhsTree, rhsTree 选择Cardinality大的问题左子树,即根节点
5. Partiton Hash Join
(推测表是否为Partiton 分区,优化查询计划)
verifyTreeCtx
检查PlanNode 的完整性 ,主要检查ExchangNode 与Children Node 的数量
代码语言:javascript复制1. coll public List<PlanNode> collectPlanNodes() {
List<PlanNode> nodes = new ArrayList<>();
collectPlanNodesHelper(planRoot_, Predicates.alwaysTrue(), nodes);
return nodes;
}
createBuildPlans
根据ExchangeNode 创建Join Builder
代码语言:javascript复制 */
private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) {
List<JoinNode> joins = new ArrayList<>();
collectJoins(fragment.getPlanRoot(), joins);
if (!joins.isEmpty()) {
if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId();
for (JoinNode join: joins) createBuildPlan(join, buildCohortId);
}
for (PlanFragment child: fragment.getChildren()) {
// We already recursed on the join build fragment in createBuildPlan().
if (child.getSink() instanceof JoinBuildSink) continue;
// Propagate the plan and cohort IDs to children that are part of the same plan.
child.setPlanId(fragment.getPlanId());
child.setCohortId(fragment.getCohortId());
createBuildPlans(child, buildCohortId);
}
}
下一篇 讲impala be query plan