AdmissionController
AdmissionController 用于根据在一个或多个资源池中配置的可用集群资源限制请求(例如查询、DML)。请求将被允许立即执行、排队等待稍后执行或拒绝(立即或排队后)。资源池可以配置为具有最大并发查询数、最大集群范围内存、最大队列大小、每个查询的最大和最小每主机内存限制,并设置mem_limit查询选项是否会被前面提到的最大/最小每主机限制限制限制。如果执行的查询太多或可用内存不足,查询将排队。一旦队列达到最大队列大小,传入的查询将被拒绝。队列中的请求将在可配置的超时后超时。
代码语言:javascript复制void ClientRequestState::FinishExecQueryOrDmlRequest()
Status LocalAdmissionControlClient::SubmitForAdmission(const AdmissionController::AdmissionRequest& request,RuntimeProfile::EventSequence* query_events,std::unique_ptr<QuerySchedulePB>* schedule_result)
Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,unique_ptr<QuerySchedulePB>* schedule_result, bool* queued,std::string* request_pool)
bool AdmissionController::FindGroupToAdmitOrReject(ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config,bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node,bool& coordinator_resource_limited)
Status AdmissionController::ComputeGroupScheduleStates(ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node)
for (const ExecutorGroup* executor_group : executor_groups) {
RETURN_IF_ERROR(scheduler_->Schedule(group_config, group_state.get()));
Status Scheduler::Schedule(const ExecutorConfig& executor_config, ScheduleState* state) {
RETURN_IF_ERROR(DebugAction(state->query_options(), "SCHEDULER_SCHEDULE"));
RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
ComputeFragmentExecParams(executor_config, state);
ComputeBackendExecParams(executor_config, state);
#ifndef NDEBUG
state->Validate();
#endif
state->set_executor_group(executor_config.group.name());
return Status::OK();
}
RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config, ScheduleState* state)
for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
// set instance_id, host, per_node_scan_ranges
ComputeFragmentExecParams(executor_config, plan_exec_info,state->GetFragmentScheduleState(plan_exec_info.fragments[0].idx), state);
//void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,const TPlanExecInfo& plan_exec_info, FragmentScheduleState* fragment_state,ScheduleState* state)
void Scheduler::CreateCollocatedJoinBuildInstances(const ExecutorConfig& executor_config,FragmentScheduleState* fragment_state, ScheduleState* state)
if (fragment.plan.nodes.size() == 1 && fragment.plan.nodes.at(0).__isset.hdfs_scan_node
&& fragment.plan.nodes.at(0).hdfs_scan_node.is_colocate_scan) {
AddJoinBuildScanInstances(executor_config, fragment_state, state, join_fragment_state);
PrintAssgimentScanRange(fragment_state, join_fragment_state);
}
void Scheduler::AddJoinBuildScanInstances(const ExecutorConfig& executor_config,FragmentScheduleState* fragment_state, ScheduleState* state,FragmentScheduleState* join_fragment_state)
vector<TPlanNodeId> scan_node_ids = FindScanNodes(fragment.plan);DCHECK(scan_node_ids.size() == 1);
for (const auto& parent_state : join_fragment_state->instance_states) {
FInstanceScheduleState& instance_state = fragment_state->instance_states[build_instance_idx ];
instance_state.AddScanRanges(scan_node_id, instance_ranges);
}
// Set destinations, per_exch_num_senders, sender_id.
for (const TPlanFragment& src_fragment : plan_exec_info.fragments)
{
}
}
void ScheduleState::Validate()
coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
Status exec_status = coord_->Exec();
Status Coordinator::Exec() {
const TQueryExecRequest& request = exec_params_.query_exec_request();
DCHECK(request.plan_exec_info.size() > 0);
VLOG_QUERY << "Exec() query_id=" << PrintId(query_id())
<< " stmt=" << request.query_ctx.client_request.stmt;
stmt_type_ = request.stmt_type;
query_profile_ =
RuntimeProfile::Create(obj_pool(), "Execution Profile " PrintId(query_id()));
finalization_timer_ = PROFILE_FinalizationTimer.Instantiate(query_profile_);
filter_updates_received_ = PROFILE_FiltersReceived.Instantiate(query_profile_);
host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
query_profile_->AddChild(host_profiles_);
SCOPED_TIMER(query_profile_->total_time_counter());
// initialize progress updater
const string& str = Substitute("Query $0", PrintId(query_id()));
progress_.Init(str, exec_params_.query_schedule().num_scan_ranges());
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit());
filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
-1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
InitFragmentStats();
// create BackendStates and per-instance state, including profiles, and install
// the latter in the FragmentStats' root profile
InitBackendStates();
RETURN_IF_ERROR(StartBackendExec());
RETURN_IF_ERROR(FinishBackendStartup());
return Status::OK();
}