Impala BE query plan (2) - AdmissionController

2022-09-28

AdmissionController

The AdmissionController throttles requests (e.g. queries, DML) based on the available cluster resources configured in one or more resource pools. A request is either admitted for immediate execution, queued for later execution, or rejected (either immediately or after being queued). A resource pool can be configured with a maximum number of concurrent queries, a maximum cluster-wide memory, a maximum queue size, maximum and minimum per-host memory limits per query, and whether the mem_limit query option is clamped by those per-host limits. Queries are queued when too many queries are already executing or there is not enough available memory; once the queue reaches its maximum size, incoming queries are rejected, and queued requests time out after a configurable timeout.
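Conceptually, the admit/queue/reject decision looks like the sketch below. This is a minimal illustration, not Impala's code: PoolConfig, PoolState and Decide are hypothetical stand-ins for TPoolConfig and AdmissionController's pool statistics, and the real controller also checks per-host memory and admission slots.

#include <cstdint>

// Hypothetical, simplified stand-ins for TPoolConfig / the pool's statistics.
struct PoolConfig {
  int64_t max_requests;  // max concurrent queries; -1 = unlimited
  int64_t max_queued;    // max queue size
  int64_t max_mem;       // max cluster-wide memory; -1 = unlimited
};
struct PoolState {
  int64_t running;       // queries currently admitted
  int64_t queued;        // queries currently waiting
  int64_t mem_admitted;  // cluster-wide memory already admitted
};

enum class Decision { ADMIT, QUEUE, REJECT };

// Sketch of the decision order described above: admit if the pool has a free
// slot and enough memory (and nothing is already waiting, to keep FIFO order);
// otherwise queue; reject once the queue is full.
Decision Decide(const PoolConfig& cfg, const PoolState& st, int64_t query_mem) {
  bool has_slot = cfg.max_requests < 0 || st.running < cfg.max_requests;
  bool has_mem = cfg.max_mem < 0 || st.mem_admitted + query_mem <= cfg.max_mem;
  if (has_slot && has_mem && st.queued == 0) return Decision::ADMIT;
  if (st.queued < cfg.max_queued) return Decision::QUEUE;
  return Decision::REJECT;
}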

// Call chain from query submission down to scheduling, one indent per call:
void ClientRequestState::FinishExecQueryOrDmlRequest()
    Status LocalAdmissionControlClient::SubmitForAdmission(const AdmissionController::AdmissionRequest& request, RuntimeProfile::EventSequence* query_events, std::unique_ptr<QuerySchedulePB>* schedule_result)
        Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome, unique_ptr<QuerySchedulePB>* schedule_result, bool* queued, std::string* request_pool)
            bool AdmissionController::FindGroupToAdmitOrReject(ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node, bool& coordinator_resource_limited)
                Status AdmissionController::ComputeGroupScheduleStates(ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node)
                    // one tentative schedule per executor group
                    for (const ExecutorGroup* executor_group : executor_groups) {
                        RETURN_IF_ERROR(scheduler_->Schedule(group_config, group_state.get()));
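ComputeGroupScheduleStates builds one tentative schedule per healthy executor group (the loop above), and FindGroupToAdmitOrReject then admits on the first group whose schedule fits the pool's limits. A rough sketch of that try-each-group pattern; all types and helpers below are hypothetical stand-ins, not Impala's real API.

#include <algorithm>
#include <memory>
#include <vector>

// Hypothetical, simplified stand-ins for ExecutorGroup / per-group schedule state.
struct ExecutorGroup { int num_executors; };
struct GroupScheduleState { int per_host_slots_needed; };

// Stand-in for one Scheduler::Schedule() call against a single group:
// pretend the query needs 16 instances spread across the group's hosts.
std::unique_ptr<GroupScheduleState> ScheduleFor(const ExecutorGroup& g) {
  return std::make_unique<GroupScheduleState>(
      GroupScheduleState{16 / std::max(1, g.num_executors)});
}

// Stand-in for the pool-limit check done in FindGroupToAdmitOrReject.
bool CanFit(const GroupScheduleState& s) { return s.per_host_slots_needed <= 4; }

// Try groups in order; the first group whose tentative schedule fits is
// chosen, mirroring the loop over executor_groups above.
GroupScheduleState* PickGroup(const std::vector<ExecutorGroup>& groups,
    std::vector<std::unique_ptr<GroupScheduleState>>* states) {
  for (const ExecutorGroup& g : groups) {
    states->push_back(ScheduleFor(g));
    if (CanFit(*states->back())) return states->back().get();
  }
  return nullptr;  // no group fits: the query is queued or rejected
}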

Status Scheduler::Schedule(const ExecutorConfig& executor_config, ScheduleState* state) {
  RETURN_IF_ERROR(DebugAction(state->query_options(), "SCHEDULER_SCHEDULE"));
  RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
  ComputeFragmentExecParams(executor_config, state);
  ComputeBackendExecParams(executor_config, state);
#ifndef NDEBUG
  state->Validate();
#endif
  state->set_executor_group(executor_config.group.name());
  return Status::OK();
}

Drilling into the first phase: ComputeScanRangeAssignment walks every plan in the request and, for each scan node, its list of scan ranges, assigning each range to an executor:

RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
  for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
    for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
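At its core the assignment prefers executors that hold a local replica of a range, balancing by bytes already assigned. A minimal greedy sketch of that idea; ScanRange, Assignment and AssignRanges are hypothetical, simplified stand-ins, and the real ComputeScanRangeAssignment additionally weighs cached replicas, remote executors, and randomization among equally good candidates.

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified stand-ins for the scheduler's scan range types.
struct ScanRange { std::vector<std::string> replica_hosts; int64_t len; };
struct Assignment {
  std::unordered_map<std::string, std::vector<const ScanRange*>> per_host;
};

// Greedy sketch: put each range on the replica host with the fewest bytes
// assigned so far, so local reads stay balanced across hosts.
Assignment AssignRanges(const std::vector<ScanRange>& ranges) {
  Assignment a;
  std::unordered_map<std::string, int64_t> assigned_bytes;
  for (const ScanRange& r : ranges) {
    const std::string* best = nullptr;
    for (const std::string& h : r.replica_hosts) {
      if (best == nullptr || assigned_bytes[h] < assigned_bytes[*best]) best = &h;
    }
    if (best == nullptr) continue;  // no local replica; remote case omitted here
    a.per_host[*best].push_back(&r);
    assigned_bytes[*best] += r.len;
  }
  return a;
}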
void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config, ScheduleState* state)
    for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
        // set instance_id, host, per_node_scan_ranges
        ComputeFragmentExecParams(executor_config, plan_exec_info, state->GetFragmentScheduleState(plan_exec_info.fragments[0].idx), state);
        // void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config, const TPlanExecInfo& plan_exec_info, FragmentScheduleState* fragment_state, ScheduleState* state)
            void Scheduler::CreateCollocatedJoinBuildInstances(const ExecutorConfig& executor_config, FragmentScheduleState* fragment_state, ScheduleState* state)
                // colocate scan: the build fragment is a single HDFS scan node flagged is_colocate_scan
                if (fragment.plan.nodes.size() == 1 && fragment.plan.nodes.at(0).__isset.hdfs_scan_node
                    && fragment.plan.nodes.at(0).hdfs_scan_node.is_colocate_scan) {
                    AddJoinBuildScanInstances(executor_config, fragment_state, state, join_fragment_state);
                    PrintAssgimentScanRange(fragment_state, join_fragment_state);
                }

                void Scheduler::AddJoinBuildScanInstances(const ExecutorConfig& executor_config, FragmentScheduleState* fragment_state, ScheduleState* state, FragmentScheduleState* join_fragment_state)
                    vector<TPlanNodeId> scan_node_ids = FindScanNodes(fragment.plan);
                    DCHECK(scan_node_ids.size() == 1);
                    // one build instance per parent join instance
                    for (const auto& parent_state : join_fragment_state->instance_states) {
                        FInstanceScheduleState& instance_state = fragment_state->instance_states[build_instance_idx++];
                        instance_state.AddScanRanges(scan_node_id, instance_ranges);
                    }
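The effect of AddJoinBuildScanInstances is to create one collocated build instance per parent join instance and give each instance its own share of the build side's scan ranges. A rough sketch under those assumptions; Instance and ScanRange here are hypothetical stand-ins for FInstanceScheduleState and the Thrift scan range types.

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for FInstanceScheduleState / scan ranges.
struct ScanRange { int64_t offset; int64_t len; };
struct Instance {
  std::string host;
  std::vector<ScanRange> scan_ranges;  // ranges for the single scan node
};

// One build instance per parent join instance, placed on the same host and
// given that host's share of the build side's scan ranges.
std::vector<Instance> MakeBuildInstances(
    const std::vector<Instance>& parent_instances,
    const std::vector<std::vector<ScanRange>>& per_parent_ranges) {
  std::vector<Instance> build_instances;
  for (size_t i = 0; i < parent_instances.size(); ++i) {
    Instance build;
    build.host = parent_instances[i].host;      // collocated with the parent
    build.scan_ranges = per_parent_ranges[i];   // its own ranges, as above
    build_instances.push_back(std::move(build));
  }
  return build_instances;
}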
        // Set destinations, per_exch_num_senders, sender_id.
        for (const TPlanFragment& src_fragment : plan_exec_info.fragments) {
            // wire each sender fragment's output to the instances of its destination exchange
        }
    }
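For the destination-wiring step, each sender instance needs the list of receiving instances, and each receiver needs to know its fan-in. A minimal sketch of what gets filled in; FragmentParams, Destination and WireExchange are hypothetical stand-ins for the per-fragment schedule state.

#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins for per-fragment schedule state.
struct Destination { int64_t instance_id; };  // one per receiving instance
struct FragmentParams {
  std::vector<int64_t> instance_ids;
  std::vector<Destination> destinations;  // where this fragment sends rows
  int per_exch_num_senders = 0;           // fan-in seen by the exchange node
};

// Every sender instance gets the receiver's instance list as its
// destinations; the receiver records how many senders feed its exchange.
void WireExchange(FragmentParams* sender, FragmentParams* receiver) {
  for (int64_t dst : receiver->instance_ids) {
    sender->destinations.push_back(Destination{dst});
  }
  receiver->per_exch_num_senders +=
      static_cast<int>(sender->instance_ids.size());
}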
The finished schedule is finally sanity-checked by ScheduleState::Validate() (debug builds only, per the #ifndef NDEBUG guard above):

void ScheduleState::Validate()
Back in ClientRequestState, the admitted schedule is handed to a Coordinator:

coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
Status exec_status = coord_->Exec();

Status Coordinator::Exec() {
  const TQueryExecRequest& request = exec_params_.query_exec_request();
  DCHECK(request.plan_exec_info.size() > 0);

  VLOG_QUERY << "Exec() query_id=" << PrintId(query_id())
             << " stmt=" << request.query_ctx.client_request.stmt;
  stmt_type_ = request.stmt_type;

  query_profile_ =
      RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
  finalization_timer_ = PROFILE_FinalizationTimer.Instantiate(query_profile_);
  filter_updates_received_ = PROFILE_FiltersReceived.Instantiate(query_profile_);

  host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
  query_profile_->AddChild(host_profiles_);

  SCOPED_TIMER(query_profile_->total_time_counter());

  // initialize progress updater
  const string& str = Substitute("Query $0", PrintId(query_id()));
  progress_.Init(str, exec_params_.query_schedule().num_scan_ranges());

  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
      query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit());
  filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
      -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));

  InitFragmentStats();
  // create BackendStates and per-instance state, including profiles, and install
  // the latter in the FragmentStats' root profile
  InitBackendStates();
  RETURN_IF_ERROR(StartBackendExec());
  RETURN_IF_ERROR(FinishBackendStartup());
  return Status::OK();
}
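StartBackendExec fans the plan out to every backend, and FinishBackendStartup blocks until all of them report startup status. A minimal sketch of that fan-out-and-collect pattern using std::async; ExecOnBackend stands in for the real ExecQueryFInstances RPC, which the actual code drives through asynchronous KRPC callbacks rather than futures.

#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for one ExecQueryFInstances RPC to a backend.
bool ExecOnBackend(const std::string& host) { return !host.empty(); }

// Fan the startup RPCs out in parallel, then wait for every backend; any
// failure fails the whole query, as in FinishBackendStartup.
bool StartAndFinishBackends(const std::vector<std::string>& backends) {
  std::vector<std::future<bool>> pending;
  for (const std::string& b : backends) {
    pending.push_back(std::async(std::launch::async, ExecOnBackend, b));
  }
  bool ok = true;
  for (auto& f : pending) ok = f.get() && ok;  // wait for all, collect status
  return ok;
}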
