知识点 ControlService
QueryState
为特定查询创建的所有后端执行状态的中心类(例如:各个片段实例的FragmentInstanceStates)。此类包含或使可访问状态在片段实例之间共享;相反,片段实例特定的状态收集在FragmentInstanceState中。QueryState的生存期由引用计数决定。代表查询执行并访问其任何状态的任何线程都必须获取对相应QueryState的引用,并至少在该访问期间保持该引用。通过QueryExecMgr::Get-/ReleaseQueryState()或QueryState::ScopedRef(后者用于仅限于单个函数或块范围的引用)获取和释放引用。只要引用计数大于0,查询的所有控制结构(包含在该类中或可通过该类访问,如FragmentInstanceStates)都保证是活动的。
FragmentInstanceState
FragmentInstanceState处理单个计划片段实例执行的所有方面,包括成功和错误情况下的设置和终结。Close()在Exec()结束时自动发生,释放为此片段实例分配的所有内存,并关闭所有数据流。
堆栈
ControlService::ControlService(MetricGroup* metric_group)
this->ExecQueryFInstances(static_cast<const ExecQueryFInstancesRequestPB*>(req),
void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request, ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context)
const Status& fragment_info_sidecar_status = GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, query_ctx, fragment_info);
QueryState* qs = GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
Status status = qs->Init(request, fragment_info);
这里主要为初始化
TExecPlanFragmentInfo by fragment_info
unique_ptr<Thread> t;
status = Thread::Create("query-exec-mgr",Substitute("query-state-$0", PrintId(query_id)), &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
bool QueryState::StartFInstances()
bool QueryState::StartFInstances();
start_finstances_status = FragmentState::CreateFragmentStateMap(fragment_info_, exec_rpc_params_, this, fragment_state_map_)
for(fragment_size)
//根据instance_size 创建分布式fragment, 即单个查询拆分为多个fragment
FragmentState* fragment_state = state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
for(fragment_size)
fragment_state->init();
Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root)
Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root); (递归创建)
Status PlanNode::CreateTreeHelper(FragmentState* state, const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx, PlanNode** root)
const TPlanNode& tnode = tnodes[*node_idx];
int num_children = tnode.num_children;
RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));(创建PlanNode)
*node = pool->Add(new ScanPlanNode());/PartitionedHashJoinPlanNode/
for(num_children)
CreateTreeHelper(state, tnodes, node, node_idx, nullptr); 递归
RETURN_IF_ERROR(node->Init(tnode, state));
Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state)(这里可以了解下impala SharedState 的概念,将同一个be节点不同的 scanode 放到了一个队列来处理)
for (auto& fragment : fragment_state_map_) {
FragmentState* fragment_state = fragment.second;
for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i)
//创建 FragmentInstanceState
FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx, *instance_ctx_pb));
fis_map_.emplace(fis->instance_id(), fis);
unique_ptr<Thread> t;
//执行单个 FragmentInstanceState ExecFInstance
Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,[this, fis]() { this->ExecFInstance(fis); }, &t, true);
void QueryState::ExecFInstance(FragmentInstanceState* fis)
Status status = fis->Exec();(Status FragmentInstanceState::Exec())
Status status = Prepare();
status = Open();
Close();
Status FragmentInstanceState::Prepare()
runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_,instance_ctx_, fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
Init();
resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
instance_mem_tracker_ = obj_pool()->Add(new MemTracker(runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
runtime_state_->resource_pool()->AcquireThreadToken();(获取一个线程资源)
const PlanNode* plan_tree = fragment_state_->plan_tree();
//ExecNode 执行节点,根据 PlanNode 创建ExecNode
RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));(这里举例一个PartitionedHashJoinNode)
ObjectPool* pool = state->obj_pool();
*node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));(Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) ScanNode 也是同理)
for (auto& child : plan_node.children_) { 递归创建子节点的ExecNode
ExecNode* child_node;
RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
DCHECK(child_node != nullptr);
(*root)->children_.push_back(child_node);
}
//当前 Fragment Instance State ExecNode 创建完成
//1 ExchangeNode
// set #senders of exchange nodes before calling Prepare()
vector<ExecNode*> exch_nodes;
exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
//2 scanNode
vector<ExecNode*> scan_nodes;
ScanRangesPB no_scan_ranges;
exec_tree_->CollectScanNodes(&scan_nodes);
static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
//3
RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_)); //Status ExecNode::Prepare(RuntimeState* state)
mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
for (int i = 0; i < children_.size(); ++i) {
RETURN_IF_ERROR(children_[i]->Prepare(state));
}
//Status HdfsScanNodeMt::Prepare(RuntimeState* state)
//4 prepare sink_
const DataSinkConfig* sink_config = fragment_state_->sink_config();
DCHECK(sink_config != nullptr);
sink_ = sink_config->CreateSink(runtime_state_);
RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
//5 row batch 数据
row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),runtime_state_->instance_mem_tracker()));
Status FragmentInstanceState::Open()
RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
return sink_->Open(runtime_state_);
void FragmentInstanceState::Close()
for (int i = 0; i < children_.size(); ++i) {
children_[i]->Close(state);
}