Impala Plan Schedule

2024-09-23 21:18:26 浏览数 (2)

一 基础知识

数据库中一个逻辑查询计划生成后, 需要进行ToPhysical Plan 转化为物理的查询计划, 本文主要讲解 Scan算子 是怎么初始化和分发的, 其他算子可类推

Impala 中, 逻辑的查询计划是在Impala Fe中生成, 并携带在Plan Request 中, 交由BE Admission Control 来处理生成物理的查询计划, 分为以下几个步骤

1 FE 生成计划过程中, 首先会因为 Hash Join 等算子, 拆分出 Join Probe 和 Join Builder 两侧的算子, 即一个 Plan 可能生成多个 Plan Fragment

2 BE 过程中, 因为是 MPP 数据库, 需要将一个 Plan Fragment 按照 Executor(本文中为 Backend)来做合理的拆分, 该步骤主要在 Impala Scheduler::Schedule 中的 ComputeScanRangeAssignment 完成, 进行 ScanRange 和 Backend 的绑定工作

3 BE 过程中, 因为是多线程并发的模型, 需要将分发给同一个 Backend 的单个算子进行并行化处理, 即对 FragmentInstance 进行初始化, 该步骤主要在 Scheduler::Schedule 中的 ComputeFragmentExecParams 完成

4 将初始化完成的 Instance 按 Backend 二次汇总, 生成 BackendScheduleState (Scheduler::ComputeBackendExecParams)

5 在每个 Executor 内部都有一个 ControlService (KRPC), 用来接收 Coordinator 分发的任务 (ExecQueryFInstancesRequestPB)

....

本文先讲这些, 可参考之前的文档, 并关注后续文档

二 Common

代码语言:javascript复制
syntax="proto2";
 
package impala;
​
message UniqueIdPB {  //Query Id
  required fixed64 hi = 1;
  required fixed64 lo = 2;
}
// UniqueIdPB 同时用于表示以下各类 ID:
// QueryId, FragmentInstanceId, BackendId, SessionId, RegistrationId
​
message ColumnValuePB {
  optional bool bool_val = 1;
  optional int32 byte_val = 6;
  optional int32 short_val = 7;
  optional int32 int_val = 2;
  optional int64 long_val = 3;
  optional double double_val = 4;
  optional string string_val = 5;
  optional string binary_val = 8;
  optional string timestamp_val = 9;
  optional bytes decimal_val = 10;
  optional int32 date_val = 11;
}
​
// Host and port of a network endpoint.
message NetworkAddressPB {
  // Host name or IP address. The field cannot be spelled "ip/host": '/' is not a
  // legal character in a protobuf identifier, so protoc would reject the message.
  required string hostname = 1;
  required int32 port = 2;
}
​
// 以下 ID 均为 int32 类型:
// PlanNodeId, InstanceId, FragmentIdx

三 Level0 ScanRange

BE

代码语言:javascript复制
// Specification of an individual data range which is held in its entirety by a storage
// server. Corresponds to TScanRange and should be kept in sync with it.
message ScanRangePB {
  // One of these must be set for every ScanRangePB.
  optional HdfsFileSplitPB hdfs_file_split = 1;
  optional HBaseKeyRangePB hbase_key_range = 2;
  optional KuduTabletDescPB kudu_tablet_desc = 3;
  optional ExternalTableSplitPB external_table_split_desc = 4;
  optional bytes file_metadata = 5;
}
​
​
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
  optional ScanRangePB scan_range = 1;
  optional int32 volume_id = 2 [default = -1];
  optional bool try_hdfs_cache = 3 [default = false];
  optional bool is_remote = 4;
}
         
// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support repeated map
// values.
message ScanRangesPB {                                                                                                                     
  repeated ScanRangeParamsPB scan_ranges = 1;
}

FE

代码语言:javascript复制
// Specification of a subsection of a single HDFS file. Corresponds to HdfsFileSplitPB and
// should be kept in sync with it.
struct THdfsFileSplit {
  // File name (not the full path).  The path is assumed to be relative to the
  // 'location' of the THdfsPartition referenced by partition_id.
  1: required string relative_path
        
  // starting offset
  2: required i64 offset
        
  // length of split
  3: required i64 length
        
  // ID of partition within the THdfsTable associated with this scan node.
  4: required i64 partition_id
        
  // total size of the hdfs file
  5: required i64 file_length
        
  // compression type of the hdfs file
  6: required CatalogObjects.THdfsCompression file_compression
        
  // last modified time of the file
  7: required i64 mtime
        
  // Hash of the partition's path. This must be hashed with a hash algorithm that is
  // consistent across different processes and machines. This is currently using
  // Java's String.hashCode(), which is consistent. For testing purposes, this can use
  // any consistent hash.
  9: required i32 partition_path_hash
        
  // The absolute path of the file, it's used only when data files are outside of
  // the Iceberg table location (IMPALA-11507).
  10: optional string absolute_path
        
  // Whether the HDFS file is stored with erasure coding. 
  11: optional bool is_erasure_coded
}
​
​
struct TScanRange {
  // one of these must be set for every TScanRange
  1: optional THdfsFileSplit hdfs_file_split                  
  //4: optional TExternalTableDesc external_table_desc
  5: optional binary file_metadata
} 
​
​
​
// location information for a single scan range
struct TScanRangeLocation {
  // Index into TQueryExecRequest.host_list.
  1: required i32 host_idx;
            
  // disk volume identifier of a particular scan range at 'server';
  // -1 indicates an unknown volume id; 
  // only set for TScanRange.hdfs_file_split
  2: optional i32 volume_id = -1
            
  // If true, this block is cached on this server.
  3: optional bool is_cached = false
}                                                                                                                                  
 
// A single scan range plus the hosts that serve it
struct TScanRangeLocationList {
  1: required PlanNodes.TScanRange scan_range
  // non-empty list     
  2: list<TScanRangeLocation> locations
}
​
// A specification for scan ranges. Scan ranges can be
// concrete or specs, which are used to generate concrete ranges.
// Each type is stored in a separate list.
struct TScanRangeSpec { 
   1: optional list<TScanRangeLocationList> concrete_ranges
   2: optional list<PlanNodes.TFileSplitGeneratorSpec> split_specs
} 

FE->BE

代码语言:javascript复制
void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_range_pb) {
  if (tscan_range.__isset.hdfs_file_split) {
    //scan_range_pb->mutable_hdfs_file_split()
    HdfsFileSplitPB* hdfs_file_split = scan_range_pb->mutable_hdfs_file_split();
    hdfs_file_split->set_relative_path(tscan_range.hdfs_file_split.relative_path);
    hdfs_file_split->set_offset(tscan_range.hdfs_file_split.offset);
    hdfs_file_split->set_length(tscan_range.hdfs_file_split.length);
    hdfs_file_split->set_partition_id(tscan_range.hdfs_file_split.partition_id);
    hdfs_file_split->set_file_length(tscan_range.hdfs_file_split.file_length);
    hdfs_file_split->set_file_compression(
        THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
    hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
    hdfs_file_split->set_partition_path_hash(
        tscan_range.hdfs_file_split.partition_path_hash);
    hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
    if (tscan_range.hdfs_file_split.__isset.absolute_path) {
      hdfs_file_split->set_absolute_path(
          tscan_range.hdfs_file_split.absolute_path);
    }
    
    if (tscan_range.hdfs_file_split.__isset.delete_delta_files) {
      for (auto & dd_file : tscan_range.hdfs_file_split.delete_delta_files) {
        auto dd_file_pb  = hdfs_file_split->mutable_delete_delta_files()->Add();
        dd_file_pb->set_relative_path(dd_file.relative_path);
        dd_file_pb->set_file_length(dd_file.file_length);
        dd_file_pb->set_mtime(dd_file.mtime);
      }
    }
  }
​
  if (tscan_range.__isset.external_table_desc) {
    ExternalTableSplitPB* external_table_split_pb =
        scan_range_pb->mutable_external_table_split_desc();
    external_table_split_pb->set_sql(tscan_range.external_table_desc.sql);
    external_table_split_pb->set_host(tscan_range.external_table_desc.host);
    external_table_split_pb->set_port(tscan_range.external_table_desc.port);
  }
​
  if (tscan_range.__isset.file_metadata) {
    scan_range_pb->set_file_metadata(tscan_range.file_metadata);
  }
}
​
//By Scheduler::AssignmentCtx::RecordScanRangeAssignment(

四 Level1 FragmentInstanceScheduleState

也是 AdmissionControl 传输的单位 FragmentInstanceRequest

代码语言:javascript复制
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
  // The join node id that will consume this join build.
  optional int32 join_node_id = 1;
 
  // Fragment instance id of the input fragment instance.
  optional UniqueIdPB input_finstance_id = 2;
}
​
message FragmentInstanceExecParamsPB {              
  // The fragment instance id.               
  optional UniqueIdPB instance_id = 1; 
  //Fragment Instance ID
                                             
  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 2;           
                                                
  // Map from plan node id to a list of scan ranges.
  map<int32, ScanRangesPB> per_node_scan_ranges = 5;                                                                                       
                                             
  // 0-based ordinal number of this particular instance. This is within its fragment, not
  // query-wide, so eg. there will be one instance '0' for each fragment.
  optional int32 per_fragment_instance_idx = 6;
                                    
  // In its role as a data sender, a fragment instance is assigned a "sender id" to
  // uniquely identify it to a receiver. -1 = invalid.
  optional int32 sender_id = 7 [default = -1];
                                             
  // List of input join build finstances for joins in this finstance.//FragmentInstanceExecParamsPB 最多一个JoinBuild
  repeated JoinBuildInputPB join_build_inputs = 8;
                                             
  // If this is a join build fragment, the number of fragment instances that consume the
  // join build. -1 = invalid.               
  optional int32 num_join_build_outputs = 9 [default = -1];
}
​
​
/// Execution parameters for a single fragment instance. Contains both intermediate
/// info needed by the scheduler and info that will be sent back to the coordinator.
///
/// FInstanceScheduleStates are created as children of FragmentScheduleStates (in
/// 'instance_states') and then the calculated execution parameters, 'exec_params', are
/// transferred to the corresponding BackendExecParamsPB in
/// Scheduler::ComputeBackendExecParams().
struct FragmentInstanceScheduleState {
  NetworkAddressPB address;
  /// Contains any info that needs to be sent back to the coordinator. Computed during
  /// Scheduler::ComputeFragmentExecParams() then transferred to the corresponding
  /// BackendExecParamsPB in Scheduler::ComputeBackendExecParams(), after which it
  /// is no longer valid to access.
  FragmentInstanceExecParamsPB exec_params;//1:1 
  
  //SomeMethod For manger FragmentInstanceExecParamasPB
  FInstanceScheduleState(const UniqueIdPB& instance_id, const NetworkAddressPB& host,
      const NetworkAddressPB& krpc_host, int per_fragment_instance_idx,
      const FragmentScheduleState& fragment_state);
​
  /// Adds the ranges in 'scan_ranges' to the scan at 'scan_idx' in 'exec_params'.
  void AddScanRanges(int scan_idx, const std::vector<ScanRangeParamsPB>& scan_ranges); //addScanRangeS For InStance 
  //By ComputeFragmentExecParams
  /* at FragmentScheduleState
   if (!fragment_state->scan_range_assignment.empty()) {
      DCHECK_EQ(fragment_state->scan_range_assignment.size(), 1);
      auto first_entry = fragment_state->scan_range_assignment.begin();
      for (const PerNodeScanRanges::value_type& entry : first_entry->second) {
        instance_state.AddScanRanges(entry.first, entry.second);
      }
    }
  */
}
代码语言:javascript复制
// Specification of one output destination of a plan fragment
message PlanFragmentDestinationPB {                                                                                                                                 
  // The globally unique fragment instance id.
  optional UniqueIdPB fragment_instance_id = 1;
         
  // ip + port of the KRPC backend service on the destination.
  optional NetworkAddressPB address = 2;
}  
​
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {                                                                                                                                         
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;
         
  // Output destinations, one per output partition. The partitioning of the output is
  // specified by TPlanFragment.output_sink.output_partition in the corresponding
  // TPlanFragment. The number of output partitions is destinations.size().
  repeated PlanFragmentDestinationPB destinations = 2;
}        
​
​
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the same
// fragment_idx.
message PlanFragmentInstanceCtxPB {
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;
    
  // Map from plan node id to initial scan ranges for each scan node in
  // TPlanFragment.plan_tree
  map<int32, ScanRangesPB> per_node_scan_ranges = 2;
    
  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 3;
}
​
// ExecQueryFInstances
message ExecQueryFInstancesRequestPB {
  // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
  // the coordinator.
  optional int32 coord_state_idx = 1;
 
  // Sidecar index of the TQueryCtx.
  optional int32 query_ctx_sidecar_idx = 2;                                                                                                                                                          
  // Sidecar index of the TExecPlanFragmentInfo.
  optional int32 plan_fragment_info_sidecar_idx = 3;
 
  // The minimum query-wide memory reservation (in bytes) required for the backend
  // executing the instances in fragment_instance_ctxs. This is the peak minimum
  // reservation that may be required by the concurrently-executing operators at any
  // point in query execution. It may be less than the initial reservation total claims
  // (below) if execution of some operators never overlaps, which allows reuse of
  // reservations.
  optional int64 min_mem_reservation_bytes = 4;
 
  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 5;
 
  // The backend memory limit (in bytes) as set by the admission controller. Used by the
  // query mem tracker to enforce the memory limit.
  optional int64 per_backend_mem_limit = 6;
 
  // General execution parameters for different fragments. Corresponds to 'fragments' in
  // the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentCtxPB fragment_ctxs = 7;
 
  // Execution parameters for specific fragment instances. Corresponds to
  // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
}
 
message ExecQueryFInstancesResponsePB {
  // Success or failure of the operation.
  optional StatusPB status = 1;
}

五 Level2-1 FragmentScheduleState

1:1 FragmentScheduleState/TPlanFragment

1:n (FragmentInstance)

代码语言:javascript复制
/// map from scan node id to a list of scan ranges
typedef std::map<TPlanNodeId, std::vector<ScanRangeParamsPB>> PerNodeScanRanges;
​
/// map from an impalad host address to the per-node assigned scan ranges;
/// records scan range assignment for a single fragment
typedef std::unordered_map<NetworkAddressPB, PerNodeScanRanges>
    FragmentScanRangeAssignment;
​
​
/// Execution parameters shared between fragment instances. This struct is a container for
/// any intermediate data needed for scheduling that will not be sent back to the
/// coordinator as part of the QuerySchedulePB along with a pointer to the corresponding
/// FragmentExecParamsPB in the QuerySchedulePB.
struct FragmentScheduleState {
  /// Only needed as intermediate state during exec parameter computation.
  /// For scheduling, refer to FInstanceExecParamsPB.per_node_scan_ranges
  /// Author's note: only relevant for scan nodes; the ranges recorded here are handed
  /// to the instance states through AddScanRanges().
  FragmentScanRangeAssignment scan_range_assignment;
​
  /// Set to true by ScheduleState::Init() for the root fragment when a coordinator
  /// fragment is required (see the excerpt at the end of this struct).
  bool is_coord_fragment;
  /// The plan fragment passed to the constructor. Held by reference, not copied.
  const TPlanFragment& fragment;
​
  /// Fragments that are inputs to an ExchangeNode of this fragment.
  std::vector<FragmentIdx> exchange_input_fragments;
​
  /// Instances of this fragment. Instances on a backend are clustered together - i.e. all
  /// instances for a given backend will be consecutive entries in the vector. These have
  /// their protobuf params Swap()-ed to the BackendExecParamsPB during
  /// Scheduler::ComputeBackendExecParams() and are no longer valid after that.
  std::vector<FInstanceScheduleState> instance_states;
​
  /// Pointer to the corresponding FragmentExecParamsPB in the parent ScheduleState's
  /// 'query_schedule_pb_'. Supplied through the constructor.
  FragmentExecParamsPB* exec_params;
​
  FragmentScheduleState(const TPlanFragment& fragment, FragmentExecParamsPB* exec_params);
  
  // Excerpt from ScheduleState::Init(), which marks the coordinator fragment:
  /*
  const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
  if (RequiresCoordinatorFragment()) {
    fragment_schedule_states_[root_fragment.idx].is_coord_fragment = true;
    // the coordinator instance gets index 0, generated instance ids start at 1
    next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
  }
  */
};

六 Level2-2 BackendScheduleState

代码语言:javascript复制
// Execution parameters for a single backend. Used to construct the
// Coordinator::BackendStates.
message BackendExecParamsPB {
  // The id of this backend.
  optional UniqueIdPB backend_id = 1;                                                                                                     
  
  // The hostname + port of the KRPC backend service on this backend.
  optional NetworkAddressPB address = 8;
            
  // The IP address + port of the KRPC backend service on this backend.
  optional NetworkAddressPB krpc_address = 9;
            
  // The fragment instance params assigned to this backend. All instances of a
  // particular fragment are contiguous in this list. This can be empty only for the
  // coordinator backend, that is, if 'is_coord_backend' is true.
  repeated FInstanceExecParamsPB instance_params = 2;
            
  // The minimum query-wide buffer reservation size (in bytes) required for this backend.
  // This is the peak minimum reservation that may be required by the
  // concurrently-executing operators at any point in query execution. It may be less
  // than the initial reservation total claims (below) if execution of some operators
  // never overlaps, which allows reuse of reservations.
  optional int64 min_mem_reservation_bytes = 3;
            
  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in instance_params. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 4;
            
  // Total thread reservation for fragment instances scheduled on this backend. This is
  // the peak number of required threads that may be required by the
  // concurrently-executing fragment instances at any point in query execution.
  optional int64 thread_reservation = 5;
            
  // Number of slots that this query should count for in admission control.
  // This is calculated as the maximum # of instances of any fragment on this backend.
  // I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified
  // (but less if the query is not actually running with mt_dop instances on this node).
  optional int32 slots_to_use = 6;
            
  // Indicates whether this backend is the coordinator.
  optional bool is_coord_backend = 7;
}
​
/// Execution parameters for a single backend. This gets created for every backend that
/// participates in query execution, which includes every backend that has fragments
/// scheduled on it and the coordinator backend.
///
/// Created by ScheduleState::GetOrCreateBackendScheduleState() and initialized in
/// Scheduler::ComputeBackendExecParams(). Used as an input to the
/// AdmissionController and Coordinator::BackendState.
struct BackendScheduleState {
  BackendDescriptorPB be_desc;
​
  /// Pointer to the corresponding protobuf struct containing any parameters for this
  /// backend that will need to be sent back to the coordinator. Owned by
  /// ScheduleState::query_schedule_pb_.
  BackendExecParamsPB* exec_params; //BackendExecParamsPB
​
  explicit BackendScheduleState(BackendExecParamsPB* exec_params)
    : exec_params(exec_params) {}
};

七 Level3 ScheduleState

1:1 Query/Scheduler/Coordinator

1:n Fragment /Backend

Scheduler Setup3 Scheduler::ComputeBackendExecParams

代码语言:javascript复制
​
// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
message QuerySchedulePB {
  optional UniqueIdPB query_id = 1;
 
  // The per-fragment execution parameters for this schedule.
  repeated FragmentExecParamsPB fragment_exec_params = 2;
 
  // The per-backend execution parameters for this schedule.
  repeated BackendExecParamsPB backend_exec_params = 3; //Backend/Executor
 
  // Total number of scan ranges of this query.
  optional int64 num_scan_ranges = 4;
 
  // The memory limit per executor that will be imposed on the query.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 per_backend_mem_limit = 5;
 
  // The per executor memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. Can be zero if the query is only scheduled to run on the coordinator.
  optional int64 per_backend_mem_to_admit = 6;
 
  // The memory limit for the coordinator that will be imposed on the query. Used only if
  // the query has a coordinator fragment.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 coord_backend_mem_limit = 7;
 
  // The coordinator memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted                                                                             
  // successfully.
  optional int64 coord_backend_mem_to_admit = 8;
}
​
/*
void ScheduleState::Init() {
  *query_schedule_pb_->mutable_query_id() = query_id_;
  // extract TPlanFragments and order by fragment idx
  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
      fragments_.emplace(fragment.idx, fragment);
    }
  }
​
  // this must only be called once
  DCHECK_EQ(fragment_schedule_states_.size(), 0);
  for (int i = 0; i < fragments_.size(); ++i) {
    auto it = fragments_.find(i);
    DCHECK(it != fragments_.end());
    fragment_schedule_states_.emplace_back(
        it->second, query_schedule_pb_->add_fragment_exec_params());
        //1 add_fragment_exec_params
        //2 add fragment_schedule_states_
  }*/
​
​
/// Map from an impalad backend address to the state for that backend.
typedef std::unordered_map<NetworkAddressPB, BackendScheduleState>
    PerBackendScheduleStates;
​
​
/// ScheduleState is a container class for scheduling data used by Scheduler and
/// AdmissionController, which perform the scheduling logic itself, and it is only
/// intended to be accessed by them. The information needed for the coordinator to begin
/// execution is stored in 'query_schedule_pb_', which is returned from the
/// AdmissionController on successful admission. Everything else is intermediate data
/// needed to calculate the schedule but is discarded after a scheduling decision is made.
///
/// The general usage pattern is:
/// - FragmentScheduleStates are created for each fragment in the plan. They are given
///   pointers to corresponding FragmentExecParamsPBs created in the QuerySchedulePB.
/// - FInstanceScheduleStates are created as children of the FragmentScheduleStates for
///   each finstance and assigned to hosts. The FInstanceScheduleStates each have a
///   corresponding FInstanceExecParamsPB that they initially own.
/// - The scheduler computes the BackendScheduleState for each backend that was assigned a
///   fragment instance (and the coordinator backend). They are given pointers to
///   corresponding BackendExecParamsPBs created in the QuerySchedulePB and the
///   FInstanceExecParamsPB are Swap()-ed into them.
/// - The ScheduleState is passed to the admission controller, which keeps updating the
///   memory requirements by calling UpdateMemoryRequirements() every time it tries to
///   admit the query and sets the final values once the query gets admitted successfully.
/// - On successful admission, the QuerySchedulePB is returned to the coordinator and
///   everything else is discarded.
class ScheduleState {
 public:
  /// For testing only: specify 'is_test=true' to build a ScheduleState object without
  /// running Init() and to seed the random number generator for deterministic results.
  ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest& request,
      const TQueryOptions& query_options, RuntimeProfile* summary_profile, bool is_test);
​
 private:
  /// These references are valid for the lifetime of this query schedule because they
  /// are all owned by the enclosing QueryExecState.
  const UniqueIdPB& query_id_;
  const TQueryExecRequest& request_;
​
  /// The query options from the TClientRequest
  const TQueryOptions& query_options_;
​
  /// Contains the results of scheduling that will be sent back to the coordinator.
  /// Ownership is transferred to the coordinator after scheduling has completed.
  std::unique_ptr<QuerySchedulePB> query_schedule_pb_;
​
  /// TODO: move these into QueryState
  RuntimeProfile* summary_profile_;
​
  /// Maps from plan node id to its fragment idx. Filled in c'tor.
  std::vector<int32_t> plan_node_to_fragment_idx_;
​
  /// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
  std::vector<int32_t> plan_node_to_plan_node_idx_;
​
  /// Populated in Init(), then calculated in Scheduler::ComputeFragmentExecParams().
  /// Indexed by fragment idx (TPlanFragment.idx).
  std::vector<FragmentScheduleState> fragment_schedule_states_;
​
  /// Map from backend address to corresponding BackendScheduleState. Created in
  /// GetOrCreateBackendScheduleState().
  /// NOTE(review): the author's note ties this member to ComputeFragmentExecParams,
  /// while the visible caller of GetOrCreateBackendScheduleState() in this file is
  /// Scheduler::ComputeBackendExecParams() - confirm which is meant.
  PerBackendScheduleStates per_backend_schedule_states_;
  
  
​
  /// Used to generate consecutive fragment instance ids.
  UniqueIdPB next_instance_id_;
​
  /// The largest min memory reservation across all executors. Set in
  /// Scheduler::Schedule().
  int64_t largest_min_reservation_ = 0;
​
  /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
  int64_t coord_min_reservation_ = 0;
​
  /// The name of the executor group that this schedule was computed for. Set by the
  /// Scheduler and only valid after scheduling completes successfully.
  std::string executor_group_;
​
  /// Random number generator used for any randomized decisions during scheduling.
  std::mt19937 rng_;
​
  /// Map from fragment idx to references into the 'request_'.
  std::unordered_map<int32_t, const TPlanFragment&> fragments_;
​
  /// Populate fragments_ and fragment_schedule_states_ from request_.plan_exec_info.
  /// Sets is_coord_fragment and exchange_input_fragments.
  /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
  void Init();
​
  /// Returns true if a coordinator fragment is required based on the query stmt type.
  /// As written, that is the case only when the statement type is TStmtType::QUERY.
  bool RequiresCoordinatorFragment() const {
    return request_.stmt_type == TStmtType::QUERY;
  }
};
​
​
​
/// Returns the BackendScheduleState registered under 'address', creating it on first
/// use.
///
/// Creation does two things: it appends a new BackendExecParamsPB to
/// 'query_schedule_pb_', and it registers in 'per_backend_schedule_states_' a
/// BackendScheduleState that points at that protobuf.
BackendScheduleState& ScheduleState::GetOrCreateBackendScheduleState(
    const NetworkAddressPB& address) {
  const auto existing = per_backend_schedule_states_.find(address);
  if (existing != per_backend_schedule_states_.end()) {
    return existing->second;
  }

  // First time this backend is seen: allocate its exec params inside the schedule
  // protobuf, then remember the state that wraps them.
  BackendExecParamsPB* new_params = query_schedule_pb_->add_backend_exec_params();
  const auto inserted =
      per_backend_schedule_states_.emplace(address, BackendScheduleState(new_params));
  return inserted.first->second;
}
​
​
void Scheduler::ComputeBackendExecParams(
    const ExecutorConfig& executor_config, ScheduleState* state) {
  for (FragmentScheduleState& f : state->fragment_schedule_states()) {
    const NetworkAddressPB* prev_host = nullptr;
    int num_hosts = 0;
    for (FInstanceScheduleState& i : f.instance_states) {
      //Create Scheduler State For Backend
      BackendScheduleState& be_state = state->GetOrCreateBackendScheduleState(i.host);

0 人点赞