impala be query plan 3 prepare->open->close

2022-09-28 16:54:47 浏览数 (1)

知识点 ControlServerice

QueryState

为特定查询创建的所有后端执行状态的中心类(例如:各个片段实例的FragmentInstanceStates)。此类包含或使可访问状态在片段实例之间共享;相反,片段实例特定的状态收集在FragmentInstanceState中。QueryState的生存期由引用计数决定。代表查询执行并访问其任何状态的任何线程都必须获取对相应QueryState的引用,并至少在该访问期间保持该引用。通过QueryExecMgr::Get-/ReleaseQueryState()或QueryState::ScopedRef(后者用于仅限于单个函数或块范围的引用)获取和发布引用。只要引用计数大于0,查询的所有控制结构(包含在该类中或可通过该类访问,如FragmentInstanceStates)都保证是活动的。

FragmentInstanceState

FragmentInstanceState处理单个计划片段实例执行的所有方面,包括成功和错误情况下的设置和终结。Close()在Exec()结束时自动发生,释放为此片段实例分配的所有内存,并关闭所有数据流。

堆栈

代码语言:javascript复制
ControlService::ControlService(MetricGroup* metric_group)
  this->ExecQueryFInstances(static_cast<const ExecQueryFInstancesRequestPB*>(req),
      void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request, ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context)
          const Status& fragment_info_sidecar_status = GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
          Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, query_ctx, fragment_info);
              QueryState* qs = GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
                        Status status = qs->Init(request, fragment_info);
                            这里主要为初始化
                            TExecPlanFragmentInfo by fragemtn_ifno 
                        unique_ptr<Thread> t;
                        status = Thread::Create("query-exec-mgr",Substitute("query-state-$0", PrintId(query_id)), &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
                            bool QueryState::StartFInstances() 
​
​
​
bool QueryState::StartFInstances();
    start_finstances_status = FragmentState::CreateFragmentStateMap(fragment_info_, exec_rpc_params_, this, fragment_state_map_)
        for(fragment_size)
            //根据instance_size 创建分布式fragment, 即单个查询拆分为多个fragment
            FragmentState* fragment_state = state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
        for(fragment_size)
            fragment_state->init();
                Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root)
                    Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root); (递归创建)
                    Status PlanNode::CreateTreeHelper(FragmentState* state, const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx, PlanNode** root)
                        const TPlanNode& tnode = tnodes[*node_idx];
                        int num_children = tnode.num_children;
                        RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));(创建PlanNode)
                            *node = pool->Add(new ScanPlanNode());/PartitionedHashJoinPlanNode/
                        for(num_children)
                            CreateTreeHelper(state, tnodes, node, node_idx, nullptr)); 递归
                        RETURN_IF_ERROR(node->Init(tnode, state));
              Status  = HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state)(这里可以了解下impala ShardedState 的概念,将同一个be节点不同的 scanode 放到了一个队列来处理)
​
    for (auto& fragment : fragment_state_map_) {
        FragmentState* fragment_state = fragment.second;
        for (int i = 0; i < fragment_state->instance_ctxs().size();   i)
            //创建 FragmentInstanceState
            FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx, *instance_ctx_pb));
            fis_map_.emplace(fis->instance_id(), fis);
            unique_ptr<Thread> t;
            //执行单个 FragmentInstanceState ExecFInstance 
            Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,[this, fis]() { this->ExecFInstance(fis); }, &t, true);
            void QueryState::ExecFInstance(FragmentInstanceState* fis)
                Status status = fis->Exec();(Status FragmentInstanceState::Exec())
                    Status status = Prepare();
                    status = Open();
                    Close();
​
​
Status FragmentInstanceState::Prepare()
    runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_,instance_ctx_, fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
        Init();
            resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
            instance_mem_tracker_ = obj_pool()->Add(new MemTracker(runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
    runtime_state_->resource_pool()->AcquireThreadToken();(获取一个线程资源)
    const PlanNode* plan_tree = fragment_state_->plan_tree();
    //ExecNode 执行节点,根据 PlanNode 创建ExecNode 
    RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
        RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));(这里举例一个PartitionedHashJoinNode)
            ObjectPool* pool = state->obj_pool();
            *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));(Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) ScanNode 也是同理)
        for (auto& child : plan_node.children_) { 递归创建子节点的ExecNode 
            ExecNode* child_node;
            RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
            DCHECK(child_node != nullptr);
            (*root)->children_.push_back(child_node);
        }
    //当前 Fragement Instance State ExecNode 创建完成
    //1 ExchangeNode
    // set #senders of exchange nodes before calling Prepare()
    vector<ExecNode*> exch_nodes;
    exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
    //2  scanNode 
    vector<ExecNode*> scan_nodes;
    ScanRangesPB no_scan_ranges;
    exec_tree_->CollectScanNodes(&scan_nodes);
        static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
    
      
    //3
    RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_)); //Status ExecNode::Prepare(RuntimeState* state) 
        mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
        for (int i = 0; i < children_.size();   i) {
            RETURN_IF_ERROR(children_[i]->Prepare(state));
        }
            //Status HdfsScanNodeMt::Prepare(RuntimeState* state) 
        
    //4 prepare sink_
    const DataSinkConfig* sink_config = fragment_state_->sink_config();
    DCHECK(sink_config != nullptr);
    sink_ = sink_config->CreateSink(runtime_state_);
    RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
​
    //5 row batch 数据
    row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),runtime_state_->instance_mem_tracker()));
​
​
Status FragmentInstanceState::Open() 
    RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
    return sink_->Open(runtime_state_);
​
​
void FragmentInstanceState::Close()
    for (int i = 0; i < children_.size();   i) {
        children_[i]->Close(state);
    }

认识一个下FragmentInstanceState

0 人点赞