1 Basics
After the database generates a logical query plan, it has to be converted into a physical query plan. This article explains how the Scan operator is initialized and distributed; other operators follow the same pattern.
In Impala, the logical query plan is produced by the Impala FE, carried in the plan request, and handed to BE admission control, which turns it into the physical query plan. This happens in the following steps:
1 During FE planning, operators such as Hash Join are split into a join-probe and a join-build side, so a single plan may produce multiple plan fragments.
2 On the BE side, because Impala is an MPP engine, each plan fragment has to be split sensibly across executors (called backends in this article). This step is mostly done in Scheduler::Schedule via ComputeScanRangeAssignment, which binds ScanRanges to backends.
3 Still on the BE side, because execution is multi-threaded, the work assigned to a single backend is parallelized, i.e. FragmentInstances are created. This step is mostly done in Scheduler::Schedule via ComputeFragmentExecParams.
4 The initialized instances are then regrouped a second time, per backend, into BackendScheduleState.
5 Each executor runs a ControlService KRPC endpoint that receives the tasks distributed by the coordinator (ExecQueryFInstancesRequestPB).
....
This article only walks through these structures; see the earlier articles for background, and follow-up articles for the remaining steps.
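To make the data flow of these steps concrete, here is a minimal, self-contained sketch. All names in it (ScanRange, FragmentInstance, the map types) are simplified stand-ins for the Impala structures discussed in the rest of this article, not the real classes; the comments map each loop to the scheduler function it imitates.
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct ScanRange { std::string file; long offset; long length; };           // stand-in for ScanRangePB
using PerNodeScanRanges = std::map<int, std::vector<ScanRange>>;             // scan node id -> ranges
using ScanRangeAssignment = std::map<std::string, PerNodeScanRanges>;        // backend -> per-node ranges
struct FragmentInstance { std::string backend; PerNodeScanRanges ranges; };  // stand-in for FInstanceExecParamsPB

int main() {
  // Step 2: bind scan ranges to backends (Scheduler::ComputeScanRangeAssignment).
  ScanRangeAssignment assignment;
  assignment["host-a:27000"][0] = {{"part-0.parq", 0, 1 << 20}};
  assignment["host-b:27000"][0] = {{"part-1.parq", 0, 1 << 20}};

  // Step 3: expand each backend's share into fragment instances
  // (Scheduler::ComputeFragmentExecParams); with mt_dop=1 this is one instance per backend.
  std::vector<FragmentInstance> instances;
  for (const auto& entry : assignment) instances.push_back({entry.first, entry.second});

  // Step 4: regroup instances per backend (Scheduler::ComputeBackendExecParams).
  std::map<std::string, std::vector<FragmentInstance>> backend_exec_params;
  for (const auto& inst : instances) backend_exec_params[inst.backend].push_back(inst);

  // Step 5: each entry would become one ExecQueryFInstances KRPC to that executor.
  for (const auto& entry : backend_exec_params)
    std::cout << entry.first << " -> " << entry.second.size() << " fragment instance(s)\n";
  return 0;
}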
2 Common
syntax = "proto2";
package impala;
message UniqueIdPB { // generic 128-bit id, used for the ids listed below
required fixed64 hi = 1;
required fixed64 lo = 2;
}
UniqueIdPB is used for QueryId, FragmentInstanceId, BackendId, SessionId, and RegistrationId.
message ColumnValuePB {
optional bool bool_val = 1;
optional int32 byte_val = 6;
optional int32 short_val = 7;
optional int32 int_val = 2;
optional int64 long_val = 3;
optional double double_val = 4;
optional string string_val = 5;
optional string binary_val = 8;
optional string timestamp_val = 9;
optional bytes decimal_val = 10;
optional int32 date_val = 11;
}
message NetworkAddressPB {
required string hostname = 1; // ip or hostname
required int32 port = 2;
}
PlanNodeId, InstanceId, and FragmentIdx are all plain int32 values.
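Since every id above is just a UniqueIdPB, it is worth noting how fragment instance ids relate to the query id: the ScheduleState::Init() excerpt quoted later (section 5) derives them via CreateInstanceId(). The stand-alone sketch below mirrors that call with a plain struct; the rule that instances share 'hi' with the query id and add the instance offset to 'lo' is stated here as an assumption, not as the authoritative implementation.
#include <cstdint>
#include <iostream>

struct UniqueId { uint64_t hi = 0; uint64_t lo = 0; };  // stand-in for UniqueIdPB (two fixed64 halves)

// Assumed derivation rule: keep 'hi', bump 'lo' by the instance offset.
UniqueId CreateInstanceId(const UniqueId& base, int offset) {
  return {base.hi, base.lo + static_cast<uint64_t>(offset)};
}

int main() {
  UniqueId query_id{0x1234abcd5678ef00ULL, 0x0};
  // "the coordinator instance gets index 0, generated instance ids start at 1"
  UniqueId first_instance = CreateInstanceId(query_id, 1);
  std::cout << std::hex << query_id.hi << ":" << query_id.lo << " -> "
            << first_instance.hi << ":" << first_instance.lo << "\n";
  return 0;
}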
3 Level0 ScanRange
BE (protobuf):
// Specification of an individual data range which is held in its entirety by a storage
// server. Corresponds to TScanRange and should be kept in sync with it.
message ScanRangePB {
// One of these must be set for every ScanRangePB.
optional HdfsFileSplitPB hdfs_file_split = 1;
optional HBaseKeyRangePB hbase_key_range = 2;
optional KuduTabletDescPB kudu_tablet_desc = 3;
optional ExternalTableSplitPB external_table_split_desc = 4;
optional bytes file_metadata = 5;
}
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
optional ScanRangePB scan_range = 1;
optional int32 volume_id = 2 [default = -1];
optional bool try_hdfs_cache = 3 [default = false];
optional bool is_remote = 4;
}
// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support repeated map
// values.
message ScanRangesPB {
repeated ScanRangeParamsPB scan_ranges = 1;
}
FE (Thrift):
// Specification of a subsection of a single HDFS file. Corresponds to HdfsFileSplitPB and
// should be kept in sync with it.
struct THdfsFileSplit {
// File name (not the full path). The path is assumed to be relative to the
// 'location' of the THdfsPartition referenced by partition_id.
1: required string relative_path
// starting offset
2: required i64 offset
// length of split
3: required i64 length
// ID of partition within the THdfsTable associated with this scan node.
4: required i64 partition_id
// total size of the hdfs file
5: required i64 file_length
// compression type of the hdfs file
6: required CatalogObjects.THdfsCompression file_compression
// last modified time of the file
7: required i64 mtime
// Hash of the partition's path. This must be hashed with a hash algorithm that is
// consistent across different processes and machines. This is currently using
// Java's String.hashCode(), which is consistent. For testing purposes, this can use
// any consistent hash.
9: required i32 partition_path_hash
// The absolute path of the file, it's used only when data files are outside of
// the Iceberg table location (IMPALA-11507).
10: optional string absolute_path
// Whether the HDFS file is stored with erasure coding.
11: optional bool is_erasure_coded
}
struct TScanRange {
// one of these must be set for every TScanRange
1: optional THdfsFileSplit hdfs_file_split
//4: optional TExternalTableDesc external_table_desc
5: optional binary file_metadata
}
// location information for a single scan range
struct TScanRangeLocation {
// Index into TQueryExecRequest.host_list.
1: required i32 host_idx;
// disk volume identifier of a particular scan range at 'server';
// -1 indicates an unknown volume id;
// only set for TScanRange.hdfs_file_split
2: optional i32 volume_id = -1
// If true, this block is cached on this server.
3: optional bool is_cached = false
}
// A single scan range plus the hosts that serve it
struct TScanRangeLocationList {
1: required PlanNodes.TScanRange scan_range
// non-empty list
2: list<TScanRangeLocation> locations
}
// A specification for scan ranges. Scan ranges can be
// concrete or specs, which are used to generate concrete ranges.
// Each type is stored in a separate list.
struct TScanRangeSpec {
1: optional list<TScanRangeLocationList> concrete_ranges
2: optional list<PlanNodes.TFileSplitGeneratorSpec> split_specs
}
FE -> BE conversion:
void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_range_pb) {
if (tscan_range.__isset.hdfs_file_split) {
//scan_range_pb->mutable_hdfs_file_split()
HdfsFileSplitPB* hdfs_file_split = scan_range_pb->mutable_hdfs_file_split();
hdfs_file_split->set_relative_path(tscan_range.hdfs_file_split.relative_path);
hdfs_file_split->set_offset(tscan_range.hdfs_file_split.offset);
hdfs_file_split->set_length(tscan_range.hdfs_file_split.length);
hdfs_file_split->set_partition_id(tscan_range.hdfs_file_split.partition_id);
hdfs_file_split->set_file_length(tscan_range.hdfs_file_split.file_length);
hdfs_file_split->set_file_compression(
THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
hdfs_file_split->set_partition_path_hash(
tscan_range.hdfs_file_split.partition_path_hash);
hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
if (tscan_range.hdfs_file_split.__isset.absolute_path) {
hdfs_file_split->set_absolute_path(
tscan_range.hdfs_file_split.absolute_path);
}
if (tscan_range.hdfs_file_split.__isset.delete_delta_files) {
for (auto & dd_file : tscan_range.hdfs_file_split.delete_delta_files) {
auto dd_file_pb = hdfs_file_split->mutable_delete_delta_files()->Add();
dd_file_pb->set_relative_path(dd_file.relative_path);
dd_file_pb->set_file_length(dd_file.file_length);
dd_file_pb->set_mtime(dd_file.mtime);
}
}
}
if (tscan_range.__isset.external_table_desc) {
ExternalTableSplitPB* external_table_split_pb =
scan_range_pb->mutable_external_table_split_desc();
external_table_split_pb->set_sql(tscan_range.external_table_desc.sql);
external_table_split_pb->set_host(tscan_range.external_table_desc.host);
external_table_split_pb->set_port(tscan_range.external_table_desc.port);
}
if (tscan_range.__isset.file_metadata) {
scan_range_pb->set_file_metadata(tscan_range.file_metadata);
}
}
// Invoked from Scheduler::AssignmentCtx::RecordScanRangeAssignment()
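A hedged usage sketch of the conversion above: build a minimal TScanRange on the FE/BE boundary, convert it with TScanRangeToScanRangePB(), and wrap the result in the ScanRangeParamsPB/ScanRangesPB from section 3. The __set_* setter and the mutable_/add_/set_ accessors are what the Thrift and protobuf compilers generate for the definitions quoted earlier; the helper name, the placeholder include paths, and the omission of the remaining required THdfsFileSplit fields are illustrative shortcuts, not Impala code.
// Paths are placeholders for the generated headers in the Impala build tree.
// #include "gen-cpp/PlanNodes_types.h"  // TScanRange, THdfsFileSplit (thrift-generated)
// #include "gen-cpp/planner.pb.h"       // ScanRangePB, ScanRangeParamsPB, ScanRangesPB (protoc-generated)

ScanRangesPB MakeSingleHdfsRange(const std::string& relative_path, int64_t offset,
                                 int64_t length, int volume_id, bool is_remote) {
  // FE-side (Thrift) description of one split; the other required fields
  // (partition_id, file_length, mtime, ...) are omitted here for brevity.
  THdfsFileSplit split;
  split.relative_path = relative_path;
  split.offset = offset;
  split.length = length;
  TScanRange trange;
  trange.__set_hdfs_file_split(split);  // also sets __isset.hdfs_file_split

  // BE-side (protobuf) representation plus the per-range execution parameters.
  ScanRangesPB ranges;
  ScanRangeParamsPB* params = ranges.add_scan_ranges();
  TScanRangeToScanRangePB(trange, params->mutable_scan_range());
  params->set_volume_id(volume_id);   // -1 = unknown volume
  params->set_is_remote(is_remote);
  return ranges;
}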
4 Level1 FInstanceScheduleState
The fragment instance is also the unit that admission control transmits (the fragment instance request).
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
// The join node id that will consume this join build.
optional int32 join_node_id = 1;
// Fragment instance id of the input fragment instance.
optional UniqueIdPB input_finstance_id = 2;
}
message FInstanceExecParamsPB {
// The fragment instance id.
optional UniqueIdPB instance_id = 1;
//Fragment Instance ID
// Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
optional int32 fragment_idx = 2;
// Map from plan node id to a list of scan ranges.
map<int32, ScanRangesPB> per_node_scan_ranges = 5;
// 0-based ordinal number of this particular instance. This is within its fragment, not
// query-wide, so eg. there will be one instance '0' for each fragment.
optional int32 per_fragment_instance_idx = 6;
// In its role as a data sender, a fragment instance is assigned a "sender id" to
// uniquely identify it to a receiver. -1 = invalid.
optional int32 sender_id = 7 [default = -1];
// List of input join build finstances for joins in this finstance.
// (Note: in practice an FInstanceExecParamsPB carries at most one join build input.)
repeated JoinBuildInputPB join_build_inputs = 8;
// If this is a join build fragment, the number of fragment instances that consume the
// join build. -1 = invalid.
optional int32 num_join_build_outputs = 9 [default = -1];
}
/// Execution parameters for a single fragment instance. Contains both intermediate
/// info needed by the scheduler and info that will be sent back to the coordinator.
///
/// FInstanceScheduleStates are created as children of FragmentScheduleStates (in
/// 'instance_states') and then the calculated execution parameters, 'exec_params', are
/// transferred to the corresponding BackendExecParamsPB in
/// Scheduler::ComputeBackendExecParams().
struct FInstanceScheduleState {
NetworkAddressPB address;
/// Contains any info that needs to be sent back to the coordinator. Computed during
/// Scheduler::ComputeFragmentExecParams() then transferred to the corresponding
/// BackendExecParamsPB in Scheduler::ComputeBackendExecParams(), after which it
/// is no longer valid to access.
FInstanceExecParamsPB exec_params; // 1:1 with this schedule state
// Methods for managing the FInstanceExecParamsPB:
FInstanceScheduleState(const UniqueIdPB& instance_id, const NetworkAddressPB& host,
const NetworkAddressPB& krpc_host, int per_fragment_instance_idx,
const FragmentScheduleState& fragment_state);
/// Adds the ranges in 'scan_ranges' to the scan at 'scan_idx' in 'exec_params'.
void AddScanRanges(int scan_idx, const std::vector<ScanRangeParamsPB>& scan_ranges);
// Called during Scheduler::ComputeFragmentExecParams, e.g.:
/* using the owning FragmentScheduleState's scan_range_assignment:
if (!fragment_state->scan_range_assignment.empty()) {
DCHECK_EQ(fragment_state->scan_range_assignment.size(), 1);
auto first_entry = fragment_state->scan_range_assignment.begin();
for (const PerNodeScanRanges::value_type& entry : first_entry->second) {
instance_state.AddScanRanges(entry.first, entry.second);
}
}
*/
};
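The struct's comments already describe what AddScanRanges() must achieve; below is a plausible body, written only against the FInstanceExecParamsPB definition quoted above (map<int32, ScanRangesPB> per_node_scan_ranges). Treat it as a sketch of the intended behavior, not the code in scheduler.cc.
// Sketch: append 'scan_ranges' to the ScanRangesPB stored for scan node 'scan_idx'
// in this instance's exec_params. operator[] on a protobuf map inserts an empty
// ScanRangesPB on first use, so repeated calls for the same node accumulate ranges.
void FInstanceScheduleState::AddScanRanges(
    int scan_idx, const std::vector<ScanRangeParamsPB>& scan_ranges) {
  ScanRangesPB& node_ranges = (*exec_params.mutable_per_node_scan_ranges())[scan_idx];
  for (const ScanRangeParamsPB& range : scan_ranges) {
    *node_ranges.add_scan_ranges() = range;  // copies one ScanRangeParamsPB
  }
}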
// Specification of one output destination of a plan fragment
message PlanFragmentDestinationPB {
// The globally unique fragment instance id.
optional UniqueIdPB fragment_instance_id = 1;
// ip port of the KRPC backend service on the destination.
optional NetworkAddressPB address = 2;
}
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {
// Ordinal number of corresponding fragment in the query.
optional int32 fragment_idx = 1;
// Output destinations, one per output partition. The partitioning of the output is
// specified by TPlanFragment.output_sink.output_partition in the corresponding
// TPlanFragment. The number of output partitions is destinations.size().
repeated PlanFragmentDestinationPB destinations = 2;
}
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the same
// fragment_idx.
message PlanFragmentInstanceCtxPB {
// Ordinal number of corresponding fragment in the query.
optional int32 fragment_idx = 1;
// Map from plan node id to initial scan ranges for each scan node in
// TPlanFragment.plan_tree
map<int32, ScanRangesPB> per_node_scan_ranges = 2;
// List of input join build finstances for joins in this finstance.
repeated JoinBuildInputPB join_build_inputs = 3;
}
// ExecQueryFInstances
message ExecQueryFInstancesRequestPB {
// This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
// the coordinator.
optional int32 coord_state_idx = 1;
// Sidecar index of the TQueryCtx.
optional int32 query_ctx_sidecar_idx = 2;
// Sidecar index of the TExecPlanFragmentInfo.
optional int32 plan_fragment_info_sidecar_idx = 3;
// The minimum query-wide memory reservation (in bytes) required for the backend
// executing the instances in fragment_instance_ctxs. This is the peak minimum
// reservation that may be required by the concurrently-executing operators at any
// point in query execution. It may be less than the initial reservation total claims
// (below) if execution of some operators never overlaps, which allows reuse of
// reservations.
optional int64 min_mem_reservation_bytes = 4;
// Total of the initial buffer reservations that we expect to be claimed on this
// backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
// operators in all fragment instances that execute on this backend. This is used for
// an optimization in InitialReservation. Measured in bytes.
optional int64 initial_mem_reservation_total_claims = 5;
// The backend memory limit (in bytes) as set by the admission controller. Used by the
// query mem tracker to enforce the memory limit.
optional int64 per_backend_mem_limit = 6;
// General execution parameters for different fragments. Corresponds to 'fragments' in
// the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentCtxPB fragment_ctxs = 7;
// Execution parameters for specific fragment instances. Corresponds to
// 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
}
message ExecQueryFInstancesResponsePB {
// Success or failure of the operation.
optional StatusPB status = 1;
}
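The scheduling-side FInstanceExecParamsPB and the RPC-side PlanFragmentInstanceCtxPB deliberately share fields (fragment_idx, per_node_scan_ranges, join_build_inputs), so filling the per-instance part of ExecQueryFInstancesRequestPB is largely a projection. The helper below is an illustrative assumption of that projection (the real assembly happens on the coordinator when it sends the RPC and also involves the Thrift sidecars); it only uses accessors generated from the messages quoted in this article.
// Sketch: copy the per-instance scheduling output into the per-instance RPC context.
// FillInstanceCtx is a hypothetical helper name; the field-by-field copy is the point.
void FillInstanceCtx(const FInstanceExecParamsPB& params, PlanFragmentInstanceCtxPB* ctx) {
  ctx->set_fragment_idx(params.fragment_idx());
  // protobuf Map and RepeatedPtrField both support whole-container assignment
  *ctx->mutable_per_node_scan_ranges() = params.per_node_scan_ranges();
  *ctx->mutable_join_build_inputs() = params.join_build_inputs();
}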
5 Level2-1 FragmentScheduleState
A FragmentScheduleState is 1:1 with a TPlanFragment and 1:n with its fragment instances (FInstanceScheduleState).
/// map from scan node id to a list of scan ranges
typedef std::map<TPlanNodeId, std::vector<ScanRangeParamsPB>> PerNodeScanRanges;
/// map from an impalad host address to the per-node assigned scan ranges;
/// records scan range assignment for a single fragment
typedef std::unordered_map<NetworkAddressPB, PerNodeScanRanges>
FragmentScanRangeAssignment;
/// Execution parameters shared between fragment instances. This struct is a container for
/// any intermediate data needed for scheduling that will not be sent back to the
/// coordinator as part of the QuerySchedulePB along with a pointer to the corresponding
/// FragmentExecParamsPB in the QuerySchedulePB.
struct FragmentScheduleState {
/// Only needed as intermediate state during exec parameter computation.
/// For scheduling, refer to FInstanceExecParamsPB.per_node_scan_ranges
FragmentScanRangeAssignment scan_range_assignment; // only used to initialize scan nodes; FInstanceScheduleState::AddScanRanges() is fed from this assignment
bool is_coord_fragment;
const TPlanFragment& fragment;
/// Fragments that are inputs to an ExchangeNode of this fragment.
std::vector<FragmentIdx> exchange_input_fragments;
/// Instances of this fragment. Instances on a backend are clustered together - i.e. all
/// instances for a given backend will be consecutive entries in the vector. These have
/// their protobuf params Swap()-ed to the BackendExecParamsPB during
/// Scheduler::ComputeBackendExecParams() and are no longer valid after that.
std::vector<FInstanceScheduleState> instance_states;
/// Pointer to the corresponding FragmentExecParamsPB in the parent ScheduleState's
/// 'query_schedule_pb_'
FragmentExecParamsPB* exec_params; // set in the constructor, points into the QuerySchedulePB
FragmentScheduleState(const TPlanFragment& fragment, FragmentExecParamsPB* exec_params);
// Constructed in ScheduleState::Init():
/*
const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
if (RequiresCoordinatorFragment()) {
fragment_schedule_states_[root_fragment.idx].is_coord_fragment = true;
// the coordinator instance gets index 0, generated instance ids start at 1
next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
}
*/
};
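As a small illustration of the two typedefs at the top of this section, this is roughly what recording one assignment looks like: a (backend, scan node) pair gets one more ScanRangeParamsPB appended. The real bookkeeping (plus metrics) lives in Scheduler::AssignmentCtx::RecordScanRangeAssignment(); the function below is a simplified stand-in, and it relies on the std::hash specialization for NetworkAddressPB that Impala provides so the address can be an unordered_map key.
// Sketch: append one scan range for 'node_id' to the per-node map of 'backend'.
// Both map levels are created on first use by operator[].
void RecordOneAssignment(const NetworkAddressPB& backend, TPlanNodeId node_id,
                         const ScanRangeParamsPB& params,
                         FragmentScanRangeAssignment* assignment) {
  PerNodeScanRanges& per_node = (*assignment)[backend];
  per_node[node_id].push_back(params);
  // Later, ComputeFragmentExecParams() walks this map and calls
  // FInstanceScheduleState::AddScanRanges() for every (node_id, ranges) entry.
}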
6 Level2-2 BackendScheduleState
// Execution parameters for a single backend. Used to construct the
// Coordinator::BackendStates.
message BackendExecParamsPB {
// The id of this backend.
optional UniqueIdPB backend_id = 1;
// The hostname port of the KRPC backend service on this backend.
optional NetworkAddressPB address = 8;
// The IP address port of the KRPC backend service on this backend.
optional NetworkAddressPB krpc_address = 9;
// The fragment instance params assigned to this backend. All instances of a
// particular fragment are contiguous in this list. This can be empty only for the
// coordinator backend, that is, if 'is_coord_backend' is true.
repeated FInstanceExecParamsPB instance_params = 2;
// The minimum query-wide buffer reservation size (in bytes) required for this backend.
// This is the peak minimum reservation that may be required by the
// concurrently-executing operators at any point in query execution. It may be less
// than the initial reservation total claims (below) if execution of some operators
// never overlaps, which allows reuse of reservations.
optional int64 min_mem_reservation_bytes = 3;
// Total of the initial buffer reservations that we expect to be claimed on this
// backend for all fragment instances in instance_params. I.e. the sum over all
// operators in all fragment instances that execute on this backend. This is used for
// an optimization in InitialReservation. Measured in bytes.
optional int64 initial_mem_reservation_total_claims = 4;
// Total thread reservation for fragment instances scheduled on this backend. This is
// the peak number of required threads that may be required by the
// concurrently-executing fragment instances at any point in query execution.
optional int64 thread_reservation = 5;
// Number of slots that this query should count for in admission control.
// This is calculated as the maximum # of instances of any fragment on this backend.
// I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified
// (but less if the query is not actually running with mt_dop instances on this node).
optional int32 slots_to_use = 6;
// Indicates whether this backend is the coordinator.
optional bool is_coord_backend = 7;
}
/// Execution parameters for a single backend. This gets created for every backend that
/// participates in query execution, which includes every backend that has fragments
/// scheduled on it and the coordinator backend.
///
/// Created by ScheduleState::GetOrCreateBackendScheduleState() and initialized in
/// Scheduler::ComputeBackendExecParams(). Used as an input to the
/// AdmissionController and Coordinator::BackendState.
struct BackendScheduleState {
BackendDescriptorPB be_desc;
/// Pointer to the corresponding protobuf struct containing any parameters for this
/// backend that will need to be sent back to the coordinator. Owned by
/// ScheduleState::query_schedule_pb_.
BackendExecParamsPB* exec_params;
explicit BackendScheduleState(BackendExecParamsPB* exec_params)
: exec_params(exec_params) {}
};
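One field of BackendExecParamsPB that benefits from a concrete example is slots_to_use: according to the comment quoted above it is the maximum number of instances of any single fragment on this backend. Below is a hedged sketch of that rule, using only the message definitions shown here; the helper name and the minimum of 1 are illustrative assumptions, and the real computation is part of Scheduler::ComputeBackendExecParams.
#include <algorithm>
#include <cstdint>
#include <map>

// Sketch: derive slots_to_use from the instances already attached to this backend.
int32_t ComputeSlotsToUse(const BackendExecParamsPB& be_params) {
  std::map<int32_t, int32_t> per_fragment_count;  // fragment_idx -> #instances on this backend
  for (const FInstanceExecParamsPB& inst : be_params.instance_params()) {
    ++per_fragment_count[inst.fragment_idx()];
  }
  int32_t slots = 1;  // assumption: a coordinator-only backend still occupies one slot
  for (const auto& entry : per_fragment_count) slots = std::max(slots, entry.second);
  return slots;
}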
7 Level3 ScheduleState
ScheduleState is 1:1 with the query (and with its Scheduler / Coordinator), and 1:n with fragments and backends.
The per-backend parameters are filled in during the scheduler's third phase, Scheduler::ComputeBackendExecParams.
// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
message QuerySchedulePB {
optional UniqueIdPB query_id = 1;
// The per-fragment execution parameters for this schedule.
repeated FragmentExecParamsPB fragment_exec_params = 2;
// The per-backend execution parameters for this schedule.
repeated BackendExecParamsPB backend_exec_params = 3; //Backend/Executor
// Total number of scan ranges of this query.
optional int64 num_scan_ranges = 4;
// The memory limit per executor that will be imposed on the query.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully. -1 means no limit.
optional int64 per_backend_mem_limit = 5;
// The per executor memory used for admission accounting.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully. Can be zero if the query is only scheduled to run on the coordinator.
optional int64 per_backend_mem_to_admit = 6;
// The memory limit for the coordinator that will be imposed on the query. Used only if
// the query has a coordinator fragment.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully. -1 means no limit.
optional int64 coord_backend_mem_limit = 7;
// The coordinator memory used for admission accounting.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully.
optional int64 coord_backend_mem_to_admit = 8;
}
/*
void ScheduleState::Init() {
*query_schedule_pb_->mutable_query_id() = query_id_;
// extract TPlanFragments and order by fragment idx
for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
fragments_.emplace(fragment.idx, fragment);
}
}
// this must only be called once
DCHECK_EQ(fragment_schedule_states_.size(), 0);
for (int i = 0; i < fragments_.size(); ++i) {
auto it = fragments_.find(i);
DCHECK(it != fragments_.end());
fragment_schedule_states_.emplace_back(
it->second, query_schedule_pb_->add_fragment_exec_params());
//1 add_fragment_exec_params() appends a FragmentExecParamsPB to query_schedule_pb_
//2 a FragmentScheduleState wrapping it is appended to fragment_schedule_states_
}*/
/// Map from an impalad backend address to the state for that backend.
typedef std::unordered_map<NetworkAddressPB, BackendScheduleState>
PerBackendScheduleStates;
/// ScheduleState is a container class for scheduling data used by Scheduler and
/// AdmissionController, which perform the scheduling logic itself, and it is only
/// intended to be accessed by them. The information needed for the coordinator to begin
/// execution is stored in 'query_schedule_pb_', which is returned from the
/// AdmissionController on successful admission. Everything else is intermediate data
/// needed to calculate the schedule but is discarded after a scheduling decision is made.
///
/// The general usage pattern is:
/// - FragmentScheduleStates are created for each fragment in the plan. They are given
/// pointers to corresponding FragmentExecParamsPBs created in the QuerySchedulePB.
/// - FInstanceScheduleStates are created as children of the FragmentScheduleStates for
/// each finstance and assigned to hosts. The FInstanceScheduleStates each have a
/// corresponding FInstanceExecParamsPB that they initially own.
/// - The scheduler computes the BackendScheduleState for each backend that was assigned a
/// fragment instance (and the coordinator backend). They are given pointers to
/// corresponding BackendExecParamsPBs created in the QuerySchedulePB and the
/// FInstanceExecParamsPB are Swap()-ed into them.
/// - The ScheduleState is passed to the admission controller, which keeps updating the
/// memory requirements by calling UpdateMemoryRequirements() every time it tries to
/// admit the query and sets the final values once the query gets admitted successfully.
/// - On successful admission, the QuerySchedulePB is returned to the coordinator and
/// everything else is discarded.
class ScheduleState {
public:
/// For testing only: specify 'is_test=true' to build a ScheduleState object without
/// running Init() and to seed the random number generator for deterministic results.
ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest& request,
const TQueryOptions& query_options, RuntimeProfile* summary_profile, bool is_test);
private:
/// These references are valid for the lifetime of this query schedule because they
/// are all owned by the enclosing QueryExecState.
const UniqueIdPB& query_id_;
const TQueryExecRequest& request_;
/// The query options from the TClientRequest
const TQueryOptions& query_options_;
/// Contains the results of scheduling that will be sent back to the coordinator.
/// Ownership is transferred to the coordinator after scheduling has completed.
std::unique_ptr<QuerySchedulePB> query_schedule_pb_;
/// TODO: move these into QueryState
RuntimeProfile* summary_profile_;
/// Maps from plan node id to its fragment idx. Filled in c'tor.
std::vector<int32_t> plan_node_to_fragment_idx_;
/// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
std::vector<int32_t> plan_node_to_plan_node_idx_;
/// Populated in Init(), then calculated in Scheduler::ComputeFragmentExecParams().
/// Indexed by fragment idx (TPlanFragment.idx).
std::vector<FragmentScheduleState> fragment_schedule_states_;
/// Map from backend address to corresponding BackendScheduleState. Created in
/// GetOrCreateBackendScheduleState().
PerBackendScheduleStates per_backend_schedule_states_; // filled by GetOrCreateBackendScheduleState() during Scheduler::ComputeBackendExecParams()
/// Used to generate consecutive fragment instance ids.
UniqueIdPB next_instance_id_;
/// The largest min memory reservation across all executors. Set in
/// Scheduler::Schedule().
int64_t largest_min_reservation_ = 0;
/// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
int64_t coord_min_reservation_ = 0;
/// The name of the executor group that this schedule was computed for. Set by the
/// Scheduler and only valid after scheduling completes successfully.
std::string executor_group_;
/// Random number generator used for any randomized decisions during scheduling.
std::mt19937 rng_;
/// Map from fragment idx to references into the 'request_'.
std::unordered_map<int32_t, const TPlanFragment&> fragments_;
/// Populate fragments_ and fragment_schedule_states_ from request_.plan_exec_info.
/// Sets is_coord_fragment and exchange_input_fragments.
/// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
void Init();
/// Returns true if a coordinator fragment is required based on the query stmt type.
bool RequiresCoordinatorFragment() const {
return request_.stmt_type == TStmtType::QUERY;
}
};
//1 appends a BackendExecParamsPB to query_schedule_pb_
//2 creates and caches the BackendScheduleState for this address
BackendScheduleState& ScheduleState::GetOrCreateBackendScheduleState(
const NetworkAddressPB& address) {
auto it = per_backend_schedule_states_.find(address);
if (it == per_backend_schedule_states_.end()) {
// append a BackendExecParamsPB to query_schedule_pb_ (the coordinator later builds
// its BackendStates from these) and cache the wrapping BackendScheduleState in
// per_backend_schedule_states_
BackendExecParamsPB* be_params = query_schedule_pb_->add_backend_exec_params();
it = per_backend_schedule_states_.emplace(address, BackendScheduleState(be_params))
.first;
}
return it->second;
}
void Scheduler::ComputeBackendExecParams(
const ExecutorConfig& executor_config, ScheduleState* state) {
for (FragmentScheduleState& f : state->fragment_schedule_states()) {
const NetworkAddressPB* prev_host = nullptr;
int num_hosts = 0;
for (FInstanceScheduleState& i : f.instance_states) {
// get or create the BackendScheduleState for this instance's host
BackendScheduleState& be_state = state->GetOrCreateBackendScheduleState(i.host);