ClickHouse opt 2 QueryPlan::buildQueryPipeline

2023-11-05 14:52:35 浏览数 (3)

一 QueryPlan::buildQueryPipeline

pipeline 的构建（执行）逻辑如下图：

[图：buildQueryPipeline]

二 基础知识

QueryPlanStep

代码语言:javascript复制
  ^IQueryPlanStep$
  └── IQueryPlanStep    [vim src/Processors/QueryPlan/IQueryPlanStep.h  70]
      ├── CreatingSetsStep      [vim src/Processors/QueryPlan/CreatingSetsStep.h  35]
      ├── IntersectOrExceptStep [vim src/Processors/QueryPlan/IntersectOrExceptStep.h  9]
      ├── JoinStep      [vim src/Processors/QueryPlan/JoinStep.h  12]
      ├── UnionStep     [vim src/Processors/QueryPlan/UnionStep.h  8]
      ├── ITransformingStep     [vim src/Processors/QueryPlan/ITransformingStep.h  9]
      │   ├── AggregatingStep   [vim src/Processors/QueryPlan/AggregatingStep.h  14]
      │   ├── ArrayJoinStep     [vim src/Processors/QueryPlan/ArrayJoinStep.h  10]
      │   ├── CreatingSetStep   [vim src/Processors/QueryPlan/CreatingSetsStep.h  12]
      │   ├── CubeStep  [vim src/Processors/QueryPlan/CubeStep.h  13]
      │   ├── DistinctStep      [vim src/Processors/QueryPlan/DistinctStep.h  9]
      │   ├── ExpressionStep    [vim src/Processors/QueryPlan/ExpressionStep.h  14]
      │   ├── ExtremesStep      [vim src/Processors/QueryPlan/ExtremesStep.h  7]
      │   ├── FilledJoinStep    [vim src/Processors/QueryPlan/JoinStep.h  37]
      │   ├── FillingStep       [vim src/Processors/QueryPlan/FillingStep.h  9]
      │   ├── FilterStep        [vim src/Processors/QueryPlan/FilterStep.h  11]
      │   ├── LimitByStep       [vim src/Processors/QueryPlan/LimitByStep.h  8]
      │   ├── LimitStep [vim src/Processors/QueryPlan/LimitStep.h  9]
      │   ├── MergingAggregatedStep     [vim src/Processors/QueryPlan/MergingAggregatedStep.h  12]
      │   ├── OffsetStep        [vim src/Processors/QueryPlan/OffsetStep.h  9]
      │   ├── RollupStep        [vim src/Processors/QueryPlan/RollupStep.h  12]
      │   ├── SettingQuotaAndLimitsStep [vim src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h  21]
      │   ├── SortingStep       [vim src/Processors/QueryPlan/SortingStep.h  11]
      │   ├── TotalsHavingStep  [vim src/Processors/QueryPlan/TotalsHavingStep.h  13]
      │   └── WindowStep        [vim src/Processors/QueryPlan/WindowStep.h  14]
      └── ISourceStep   [vim src/Processors/QueryPlan/ISourceStep.h  8]
          ├── ReadFromMergeTree [vim src/Processors/QueryPlan/ReadFromMergeTree.h  26]
          ├── ReadFromRemote    [vim src/Processors/QueryPlan/ReadFromRemote.h  21]
          ├── ReadNothingStep   [vim src/Processors/QueryPlan/ReadNothingStep.h  8]
          └── ReadFromPreparedSource    [vim src/Processors/QueryPlan/ReadFromPreparedSource.h  9]
              └── ReadFromStorageStep   [vim src/Processors/QueryPlan/ReadFromPreparedSource.h  23]

Processor

代码语言:javascript复制
  ^IProcessor$
  └── IProcessor        [vim src/Processors/IProcessor.h  110]
      ├── AggregatingInOrderTransform   [vim src/Processors/Transforms/AggregatingInOrderTransform.h  19]
      ├── AggregatingTransform  [vim src/Processors/Transforms/AggregatingTransform.h  102]
      ├── ConcatProcessor       [vim src/Processors/ConcatProcessor.h  16]
      ├── ConvertingAggregatedToChunksTransform [vim src/Processors/Transforms/AggregatingTransform.cpp  154]
      ├── CopyTransform [vim src/Processors/Transforms/CopyTransform.h  9]
      ├── CopyingDataToViewsTransform   [vim src/Processors/Transforms/buildPushingToViewsChain.cpp  71]
      ├── DelayedPortsProcessor [vim src/Processors/DelayedPortsProcessor.h  11]
      ├── DelayedSource [vim src/Processors/Sources/DelayedSource.h  17]
      ├── FillingRightJoinSideTransform [vim src/Processors/Transforms/JoiningTransform.h  87]
      ├── FinalizingViewsTransform      [vim src/Processors/Transforms/buildPushingToViewsChain.cpp  148]
      ├── ForkProcessor [vim src/Processors/ForkProcessor.h  18]
      ├── GroupingAggregatedTransform   [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h  59]
      ├── IInflatingTransform   [vim src/Processors/IInflatingTransform.h  21]
      ├── IntersectOrExceptTransform    [vim src/Processors/Transforms/IntersectOrExceptTransform.h  12]
      ├── JoiningTransform      [vim src/Processors/Transforms/JoiningTransform.h  18]
      ├── LimitTransform        [vim src/Processors/LimitTransform.h  18]
      ├── OffsetTransform       [vim src/Processors/OffsetTransform.h  13]
      ├── ResizeProcessor       [vim src/Processors/ResizeProcessor.h  21]
      ├── SortingAggregatedTransform    [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h  121]
      ├── StrictResizeProcessor [vim src/Processors/ResizeProcessor.h  77]
      ├── WindowTransform       [vim src/Processors/Transforms/WindowTransform.h  87]
      ├── IAccumulatingTransform        [vim src/Processors/IAccumulatingTransform.h  13]
      │   ├── BufferingToFileTransform  [vim src/Processors/Transforms/MergeSortingTransform.cpp  30]
      │   ├── CreatingSetsTransform     [vim src/Processors/Transforms/CreatingSetsTransform.h  26]
      │   ├── CubeTransform     [vim src/Processors/Transforms/CubeTransform.h  11]
      │   ├── MergingAggregatedTransform        [vim src/Processors/Transforms/MergingAggregatedTransform.h  12]
      │   ├── QueueBuffer       [vim src/Processors/QueueBuffer.h  13]
      │   ├── RollupTransform   [vim src/Processors/Transforms/RollupTransform.h  10]
      │   ├── TTLCalcTransform  [vim src/Processors/Transforms/TTLCalcTransform.h  14]
      │   └── TTLTransform      [vim src/Processors/Transforms/TTLTransform.h  15]
      ├── ISimpleTransform      [vim src/Processors/ISimpleTransform.h  17]
      │   ├── AddingDefaultsTransform   [vim src/Processors/Transforms/AddingDefaultsTransform.h  13]
      │   ├── AddingSelectorTransform   [vim src/Processors/Transforms/AddingSelectorTransform.h  12]
      │   ├── ArrayJoinTransform        [vim src/Processors/Transforms/ArrayJoinTransform.h  11]
      │   ├── CheckSortedTransform      [vim src/Processors/Transforms/CheckSortedTransform.h  12]
      │   ├── DistinctSortedTransform   [vim src/Processors/Transforms/DistinctSortedTransform.h  21]
      │   ├── DistinctTransform [vim src/Processors/Transforms/DistinctTransform.h  10]
      │   ├── ExpressionTransform       [vim src/Processors/Transforms/ExpressionTransform.h  18]
      │   ├── ExtremesTransform [vim src/Processors/Transforms/ExtremesTransform.h  7]
      │   ├── FillingTransform  [vim src/Processors/Transforms/FillingTransform.h  13]
      │   ├── FilterTransform   [vim src/Processors/Transforms/FilterTransform.h  18]
      │   ├── FinalizeAggregatedTransform       [vim src/Processors/Transforms/AggregatingInOrderTransform.h  78]
      │   ├── LimitByTransform  [vim src/Processors/Transforms/LimitByTransform.h  10]
      │   ├── LimitsCheckingTransform   [vim src/Processors/Transforms/LimitsCheckingTransform.h  26]
      │   ├── MaterializingTransform    [vim src/Processors/Transforms/MaterializingTransform.h  8]
      │   ├── MergingAggregatedBucketTransform  [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h  105]
      │   ├── PartialSortingTransform   [vim src/Processors/Transforms/PartialSortingTransform.h  13]
      │   ├── ReplacingWindowColumnTransform    [vim src/Processors/Transforms/ReplacingWindowColumnTransform.h  13]
      │   ├── ReverseTransform  [vim src/Processors/Transforms/ReverseTransform.h  8]
      │   ├── SendingChunkHeaderTransform       [vim src/Processors/Sources/ShellCommandSource.cpp  429]
      │   ├── TotalsHavingTransform     [vim src/Processors/Transforms/TotalsHavingTransform.h  23]
      │   ├── TransformWithAdditionalColumns    [vim src/Dictionaries/DictionarySourceHelpers.h  40]
      │   └── WatermarkTransform        [vim src/Processors/Transforms/WatermarkTransform.h  10]
      ├── ISink [vim src/Processors/ISink.h  9]
      │   ├── EmptySink [vim src/Processors/Sinks/EmptySink.h  8]
      │   ├── ExternalTableDataSink     [vim src/Client/Connection.cpp  682]
      │   ├── NullSink  [vim src/Processors/Sinks/NullSink.h  8]
      │   └── ODBCSink  [vim programs/odbc-bridge/ODBCBlockOutputStream.h  14]
      ├── SortingTransform      [vim src/Processors/Transforms/SortingTransform.h  63]
      │   ├── FinishSortingTransform    [vim src/Processors/Transforms/FinishSortingTransform.h  10]
      │   └── MergeSortingTransform     [vim src/Processors/Transforms/MergeSortingTransform.h  17]
      ├── IMergingTransformBase [vim src/Processors/Merges/IMergingTransform.h  12]
      │   └── IMergingTransform [vim src/Processors/Merges/IMergingTransform.h  77]
      │       ├── AggregatingSortedTransform    [vim src/Processors/Merges/AggregatingSortedTransform.h  12]
      │       ├── CollapsingSortedTransform     [vim src/Processors/Merges/CollapsingSortedTransform.h  10]
      │       ├── ColumnGathererTransform       [vim src/Processors/Transforms/ColumnGathererTransform.h  105]
      │       ├── FinishAggregatingInOrderTransform     [vim src/Processors/Merges/FinishAggregatingInOrderTransform.h  12]
      │       ├── GraphiteRollupSortedTransform [vim src/Processors/Merges/GraphiteRollupSortedTransform.h  10]
      │       ├── MergingSortedTransform        [vim src/Processors/Merges/MergingSortedTransform.h  11]
      │       ├── ReplacingSortedTransform      [vim src/Processors/Merges/ReplacingSortedTransform.h  11]
      │       ├── SummingSortedTransform        [vim src/Processors/Merges/SummingSortedTransform.h  10]
      │       └── VersionedCollapsingTransform  [vim src/Processors/Merges/VersionedCollapsingTransform.h  11]
      ├── ExceptionKeepingTransform     [vim src/Processors/Transforms/ExceptionKeepingTransform.h  24]
      │   ├── CheckConstraintsTransform [vim src/Processors/Transforms/CheckConstraintsTransform.h  15]
      │   ├── ConvertingTransform       [vim src/Processors/Transforms/ExpressionTransform.h  36]
      │   ├── CountingTransform [vim src/Processors/Transforms/CountingTransform.h  14]
      │   ├── ExecutingInnerQueryFromViewTransform      [vim src/Processors/Transforms/buildPushingToViewsChain.cpp  86]
      │   ├── SquashingChunksTransform  [vim src/Processors/Transforms/SquashingChunksTransform.h  9]
      │   └── SinkToStorage     [vim src/Processors/Sinks/SinkToStorage.h  9]
      │       ├── BufferSink    [vim src/Storages/StorageBuffer.cpp  535]
      │       ├── DistributedSink       [vim src/Storages/Distributed/DistributedSink.h  39]
      │       ├── EmbeddedRocksDBSink   [vim src/Storages/RocksDB/EmbeddedRocksDBSink.h  13]
      │       ├── HDFSSink      [vim src/Storages/HDFS/StorageHDFS.cpp  438]
      │       ├── KafkaSink     [vim src/Storages/Kafka/KafkaBlockOutputStream.h  12]
      │       ├── LiveViewSink  [vim src/Storages/LiveView/LiveViewSink.h  12]
      │       ├── LogSink       [vim src/Storages/StorageLog.cpp  262]
      │       ├── MemorySink    [vim src/Storages/StorageMemory.cpp  108]
      │       ├── MergeTreeSink [vim src/Storages/MergeTree/MergeTreeSink.h  14]
      │       ├── NullSinkToStorage     [vim src/Processors/Sinks/SinkToStorage.h  36]
      │       ├── PostgreSQLSink        [vim src/Storages/StoragePostgreSQL.cpp  109]
      │       ├── PushingToLiveViewSink [vim src/Processors/Transforms/buildPushingToViewsChain.cpp  117]
      │       ├── PushingToWindowViewSink       [vim src/Processors/Transforms/buildPushingToViewsChain.cpp  131]
      │       ├── RabbitMQSink  [vim src/Storages/RabbitMQ/RabbitMQSink.h  13]
      │       ├── RemoteSink    [vim src/Processors/Sinks/RemoteSink.h  8]
      │       ├── ReplicatedMergeTreeSink       [vim src/Storages/MergeTree/ReplicatedMergeTreeSink.h  22]
      │       ├── SQLiteSink    [vim src/Storages/StorageSQLite.cpp  87]
      │       ├── SetOrJoinSink [vim src/Storages/StorageSet.cpp  37]
      │       ├── StorageFileSink       [vim src/Storages/StorageFile.cpp  676]
      │       ├── StorageMySQLSink      [vim src/Storages/StorageMySQL.cpp  114]
      │       ├── StorageS3Sink [vim src/Storages/StorageS3.cpp  377]
      │       ├── StorageURLSink        [vim src/Storages/StorageURL.h  103]
      │       ├── StripeLogSink [vim src/Storages/StorageStripeLog.cpp  166]
      │       └── PartitionedSink       [vim src/Storages/PartitionedSink.h  14]
      │           ├── PartitionedHDFSSink       [vim src/Storages/HDFS/StorageHDFS.cpp  480]
      │           ├── PartitionedStorageFileSink        [vim src/Storages/StorageFile.cpp  799]
      │           ├── PartitionedStorageS3Sink  [vim src/Storages/StorageS3.cpp  436]
      │           └── PartitionedStorageURLSink [vim src/Storages/StorageURL.cpp  345]
      ├── IOutputFormat [vim src/Processors/Formats/IOutputFormat.h  23]
      │   ├── ArrowBlockOutputFormat    [vim src/Processors/Formats/Impl/ArrowBlockOutputFormat.h  18]
      │   ├── LazyOutputFormat  [vim src/Processors/Formats/LazyOutputFormat.h  13]
      │   ├── MySQLOutputFormat [vim src/Processors/Formats/Impl/MySQLOutputFormat.h  20]
      │   ├── NativeOutputFormat        [vim src/Processors/Formats/Impl/NativeFormat.cpp  55]
      │   ├── NullOutputFormat  [vim src/Processors/Formats/Impl/NullFormat.h  7]
      │   ├── ODBCDriver2BlockOutputFormat      [vim src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h  20]
      │   ├── ORCBlockOutputFormat      [vim src/Processors/Formats/Impl/ORCBlockOutputFormat.h  34]
      │   ├── ParallelFormattingOutputFormat    [vim src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h  54]
      │   ├── ParquetBlockOutputFormat  [vim src/Processors/Formats/Impl/ParquetBlockOutputFormat.h  27]
      │   ├── PostgreSQLOutputFormat    [vim src/Processors/Formats/Impl/PostgreSQLOutputFormat.h  13]
      │   ├── PullingOutputFormat       [vim src/Processors/Formats/PullingOutputFormat.h  9]
      │   ├── TemplateBlockOutputFormat [vim src/Processors/Formats/Impl/TemplateBlockOutputFormat.h  13]
      │   ├── PrettyBlockOutputFormat   [vim src/Processors/Formats/Impl/PrettyBlockOutputFormat.h  17]
      │   │   ├── PrettyCompactBlockOutputFormat        [vim src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h  13]
      │   │   └── PrettySpaceBlockOutputFormat  [vim src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h  11]
      │   └── IRowOutputFormat  [vim src/Processors/Formats/IRowOutputFormat.h  24]
      │       ├── AvroRowOutputFormat   [vim src/Processors/Formats/Impl/AvroRowOutputFormat.h  46]
      │       ├── BinaryRowOutputFormat [vim src/Processors/Formats/Impl/BinaryRowOutputFormat.h  17]
      │       ├── CSVRowOutputFormat    [vim src/Processors/Formats/Impl/CSVRowOutputFormat.h  17]
      │       ├── CapnProtoRowOutputFormat      [vim src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h  26]
      │       ├── CustomSeparatedRowOutputFormat        [vim src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.h  11]
      │       ├── JSONCompactEachRowRowOutputFormat     [vim src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h  15]
      │       ├── MarkdownRowOutputFormat       [vim src/Processors/Formats/Impl/MarkdownRowOutputFormat.h  12]
      │       ├── MsgPackRowOutputFormat        [vim src/Processors/Formats/Impl/MsgPackRowOutputFormat.h  18]
      │       ├── ProtobufRowOutputFormat       [vim src/Processors/Formats/Impl/ProtobufRowOutputFormat.h  29]
      │       ├── RawBLOBRowOutputFormat        [vim src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h  27]
      │       ├── ValuesRowOutputFormat [vim src/Processors/Formats/Impl/ValuesRowOutputFormat.h  15]
      │       ├── VerticalRowOutputFormat       [vim src/Processors/Formats/Impl/VerticalRowOutputFormat.h  18]
      │       ├── XMLRowOutputFormat    [vim src/Processors/Formats/Impl/XMLRowOutputFormat.h  16]
      │       ├── JSONEachRowRowOutputFormat    [vim src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h  15]
      │       │   └── JSONEachRowWithProgressRowOutputFormat    [vim src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h  8]
      │       ├── JSONRowOutputFormat   [vim src/Processors/Formats/Impl/JSONRowOutputFormat.h  16]
      │       │   └── JSONCompactRowOutputFormat        [vim src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h  16]
      │       └── TabSeparatedRowOutputFormat   [vim src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h  15]
      │           └── TSKVRowOutputFormat       [vim src/Processors/Formats/Impl/TSKVRowOutputFormat.h  14]
      └── ISource       [vim src/Processors/ISource.h  9]
          ├── ConvertingAggregatedToChunksSource        [vim src/Processors/Transforms/AggregatingTransform.cpp  88]
          ├── MergeSorterSource [vim src/Processors/Transforms/SortingTransform.h  45]
          ├── NullSource        [vim src/Processors/Sources/NullSource.h  8]
          ├── ODBCSource        [vim programs/odbc-bridge/ODBCBlockInputStream.h  13]
          ├── PushingAsyncSource        [vim src/Processors/Executors/PushingAsyncPipelineExecutor.cpp  17]
          ├── PushingSource     [vim src/Processors/Executors/PushingPipelineExecutor.cpp  15]
          ├── RemoteExtremesSource      [vim src/Processors/Sources/RemoteSource.h  70]
          ├── RemoteTotalsSource        [vim src/Processors/Sources/RemoteSource.h  54]
          ├── SourceFromNativeStream    [vim src/Processors/Transforms/AggregatingTransform.cpp  53]
          ├── TemporaryFileLazySource   [vim src/Processors/Sources/TemporaryFileLazySource.h  10]
          ├── WaitForAsyncInsertSource  [vim src/Processors/Sources/WaitForAsyncInsertSource.h  11]
          ├── ISourceWithProgress       [vim src/Processors/Sources/SourceWithProgress.h  16]
          │   └── SourceWithProgress    [vim src/Processors/Sources/SourceWithProgress.h  48]
          │       ├── BlocksListSource  [vim src/Processors/Sources/BlocksListSource.h  12]
          │       ├── BlocksSource      [vim src/Processors/Sources/BlocksSource.h  23]
          │       ├── BufferSource      [vim src/Storages/StorageBuffer.cpp  146]
          │       ├── CassandraSource   [vim src/Dictionaries/CassandraSource.h  14]
          │       ├── ColumnsSource     [vim src/Storages/System/StorageSystemColumns.cpp  67]
          │       ├── DDLQueryStatusSource      [vim src/Interpreters/executeDDLQueryOnCluster.cpp  174]
          │       ├── DataSkippingIndicesSource [vim src/Storages/System/StorageSystemDataSkippingIndices.cpp  35]
          │       ├── DictionarySource  [vim src/Dictionaries/DictionarySource.cpp  14]
          │       ├── DirectoryMonitorSource    [vim src/Storages/Distributed/DirectoryMonitor.cpp  908]
          │       ├── EmbeddedRocksDBSource     [vim src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp  180]
          │       ├── FileLogSource     [vim src/Storages/FileLog/FileLogSource.h  13]
          │       ├── GenerateSource    [vim src/Storages/StorageGenerateRandom.cpp  379]
          │       ├── HDFSSource        [vim src/Storages/HDFS/StorageHDFS.h  85]
          │       ├── JoinSource        [vim src/Storages/StorageJoin.cpp  380]
          │       ├── KafkaSource       [vim src/Storages/Kafka/KafkaSource.h  16]
          │       ├── LiveViewEventsSource      [vim src/Storages/LiveView/LiveViewEventsSource.h  30]
          │       ├── LiveViewSource    [vim src/Storages/LiveView/LiveViewSource.h  14]
          │       ├── LogSource [vim src/Storages/StorageLog.cpp  57]
          │       ├── MemorySource      [vim src/Storages/StorageMemory.cpp  30]
          │       ├── MergeTreeSequentialSource [vim src/Storages/MergeTree/MergeTreeSequentialSource.h  12]
          │       ├── MongoDBSource     [vim src/Processors/Transforms/MongoDBSource.h  25]
          │       ├── NumbersMultiThreadedSource        [vim src/Storages/System/StorageSystemNumbers.cpp  64]
          │       ├── NumbersSource     [vim src/Storages/System/StorageSystemNumbers.cpp  17]
          │       ├── RabbitMQSource    [vim src/Storages/RabbitMQ/RabbitMQSource.h  11]
          │       ├── RedisSource       [vim src/Dictionaries/RedisSource.h  22]
          │       ├── RemoteSource      [vim src/Processors/Sources/RemoteSource.h  17]
          │       ├── SQLiteSource      [vim src/Processors/Sources/SQLiteSource.h  15]
          │       ├── ShellCommandSource        [vim src/Processors/Sources/ShellCommandSource.cpp  247]
          │       ├── SourceFromSingleChunk     [vim src/Processors/Sources/SourceFromSingleChunk.h  8]
          │       ├── StorageFileSource [vim src/Storages/StorageFile.cpp  413]
          │       ├── StorageHiveSource [vim src/Storages/Hive/StorageHive.cpp  56]
          │       ├── StorageInputSource        [vim src/Storages/StorageInput.cpp  28]
          │       ├── StorageS3Source   [vim src/Storages/StorageS3.h  33]
          │       ├── StorageURLSource  [vim src/Storages/StorageURL.cpp  115]
          │       ├── StripeLogSource   [vim src/Storages/StorageStripeLog.cpp  62]
          │       ├── SyncKillQuerySource       [vim src/Interpreters/InterpreterKillQueryQuery.cpp  127]
          │       ├── TablesBlockSource [vim src/Storages/System/StorageSystemTables.cpp  127]
          │       ├── WindowViewSource  [vim src/Storages/WindowView/WindowViewSource.h  10]
          │       ├── ZerosSource       [vim src/Storages/System/StorageSystemZeros.cpp  26]
          │       ├── MySQLSource       [vim src/Processors/Sources/MySQLSource.h  28]
          │       │   └── MySQLWithFailoverSource       [vim src/Processors/Sources/MySQLSource.h  64]
          │       ├── PostgreSQLSource  [vim src/Processors/Transforms/PostgreSQLSource.h  19]
          │       │   └── PostgreSQLTransactionSource   [vim src/Processors/Transforms/PostgreSQLSource.h  66]
          │       └── MergeTreeBaseSelectProcessor      [vim src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h  32]
          │           ├── MergeTreeThreadSelectProcessor        [vim src/Storages/MergeTree/MergeTreeThreadSelectProcessor.h  14]
          │           └── MergeTreeSelectProcessor      [vim src/Storages/MergeTree/MergeTreeSelectProcessor.h  16]
          │               ├── MergeTreeInOrderSelectProcessor   [vim src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h  11]
          │               └── MergeTreeReverseSelectProcessor   [vim src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h  12]
          └── IInputFormat      [vim src/Processors/Formats/IInputFormat.h  30]
              ├── ArrowBlockInputFormat [vim src/Processors/Formats/Impl/ArrowBlockInputFormat.h  19]
              ├── NativeInputFormat     [vim src/Processors/Formats/Impl/NativeFormat.cpp  15]
              ├── ORCBlockInputFormat   [vim src/Processors/Formats/Impl/ORCBlockInputFormat.h  21]
              ├── ParallelParsingInputFormat    [vim src/Processors/Formats/Impl/ParallelParsingInputFormat.h  70]
              ├── ParquetBlockInputFormat       [vim src/Processors/Formats/Impl/ParquetBlockInputFormat.h  18]
              ├── ValuesBlockInputFormat        [vim src/Processors/Formats/Impl/ValuesBlockInputFormat.h  20]
              └── IRowInputFormat       [vim src/Processors/Formats/IRowInputFormat.h  38]
                  ├── AvroConfluentRowInputFormat       [vim src/Processors/Formats/Impl/AvroRowInputFormat.h  140]
                  ├── AvroRowInputFormat        [vim src/Processors/Formats/Impl/AvroRowInputFormat.h  118]
                  ├── CapnProtoRowInputFormat   [vim src/Processors/Formats/Impl/CapnProtoRowInputFormat.h  23]
                  ├── JSONEachRowRowInputFormat [vim src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h  21]
                  ├── LineAsStringRowInputFormat        [vim src/Processors/Formats/Impl/LineAsStringRowInputFormat.h  17]
                  ├── MsgPackRowInputFormat     [vim src/Processors/Formats/Impl/MsgPackRowInputFormat.h  59]
                  ├── ProtobufRowInputFormat    [vim src/Processors/Formats/Impl/ProtobufRowInputFormat.h  29]
                  ├── RawBLOBRowInputFormat     [vim src/Processors/Formats/Impl/RawBLOBRowInputFormat.h  16]
                  ├── RegexpRowInputFormat      [vim src/Processors/Formats/Impl/RegexpRowInputFormat.h  51]
                  ├── TSKVRowInputFormat        [vim src/Processors/Formats/Impl/TSKVRowInputFormat.h  24]
                  ├── JSONAsRowInputFormat      [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h  15]
                  │   ├── JSONAsObjectRowInputFormat    [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h  56]
                  │   └── JSONAsStringRowInputFormat    [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h  43]
                  └── RowInputFormatWithDiagnosticInfo  [vim src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h  12]
                      ├── TemplateRowInputFormat        [vim src/Processors/Formats/Impl/TemplateRowInputFormat.h  18]
                      └── RowInputFormatWithNamesAndTypes       [vim src/Processors/Formats/RowInputFormatWithNamesAndTypes.h  24]
                          ├── BinaryRowInputFormat      [vim src/Processors/Formats/Impl/BinaryRowInputFormat.h  20]
                          ├── CustomSeparatedRowInputFormat     [vim src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.h  11]
                          ├── JSONCompactEachRowRowInputFormat  [vim src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h  22]
                          ├── TabSeparatedRowInputFormat        [vim src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h  14]
                          └── CSVRowInputFormat [vim src/Processors/Formats/Impl/CSVRowInputFormat.h  18]
                              └── HiveTextRowInputFormat        [vim src/Processors/Formats/Impl/HiveTextRowInputFormat.h  15]

三 精讲

[图：queryPlan -> Pipeline]

四 关系图

参考https://bbs.huaweicloud.com/blogs/314808

五 详解

https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/QueryPlan.cpp#L156

代码语言:javascript复制
/// Converts the QueryPlan tree into a single QueryPipelineBuilder.
///
/// The plan is optimized first and then walked bottom-up (post-order,
/// children left to right) without recursion: an explicit stack of Frames
/// holds each visited node together with the pipelines already built for
/// its children. A node's step is turned into a pipeline (via
/// step->updatePipeline) only after every one of its children has produced
/// its own pipeline.
///
/// @param optimization_settings   passed to optimize() before building.
/// @param build_pipeline_settings forwarded to every step's updatePipeline();
///                                its progress_callback and
///                                process_list_element are attached to the
///                                final (root) pipeline.
/// @return the pipeline built for the root node.
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
    const QueryPlanOptimizationSettings & optimization_settings,
    const BuildQueryPipelineSettings & build_pipeline_settings)
{
    // checkInitialized() is defined outside this excerpt; presumably it
    // rejects a plan that has no root yet (TODO confirm).
    checkInitialized();
    optimize(optimization_settings);

    // One entry of the explicit traversal stack.
    struct Frame
    {
        Node * node = {};
        // Pipelines already produced by node's children, in child order.
        // Its size doubles as the index of the next child to visit.
        QueryPipelineBuilders pipelines = {};
    };

    // Pipeline of the most recently finished (popped) node. It is handed to
    // the parent frame on the next loop iteration; after the loop it holds
    // the root's pipeline.
    QueryPipelineBuilderPtr last_pipeline;

    std::stack<Frame> stack;
    stack.push(Frame{.node = root});

    while (!stack.empty())
    {
        auto & frame = stack.top();

        // A child was just finished: append its pipeline to this node's inputs.
        if (last_pipeline)
        {
            frame.pipelines.emplace_back(std::move(last_pipeline));
            last_pipeline = nullptr;
        }

        size_t next_child = frame.pipelines.size();
        if (next_child == frame.node->children.size())
        {
            // All children are done (or the node has none): build this node's
            // pipeline from its step and the children's pipelines.
            // Only nodes that received no input pipelines (leaves) get the
            // max_threads limit; the limit is skipped when max_threads is 0.
            // max_threads is declared outside this excerpt (TODO confirm its owner).
            bool limit_max_threads = frame.pipelines.empty();
            last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);

            if (limit_max_threads && max_threads)
                last_pipeline->limitMaxThreads(max_threads);

            // `frame` refers to the popped element and must not be used after this.
            stack.pop();
        }
        else
            // Descend into the next unprocessed child; its pipeline comes back
            // through last_pipeline once that child's subtree is finished.
            stack.push(Frame{.node = frame.node->children[next_child]});
    }

    // Attach query-level settings to the root pipeline and move the plan's
    // resources (member declared outside this excerpt) into it.
    last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
    last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
    last_pipeline->addResources(std::move(resources));

    return last_pipeline;
}

代码语言:c 复制
首先，这段逻辑是将 QueryPlan（由 Node 组成的树）转化为 QueryPipeline 的过程。
这个过程没有使用递归，而是用 std::stack<Frame> 显式模拟调用栈：从 Frame{root Node} 入栈开始，到它出栈结束。它本质上是一次深度优先的后序遍历（先依次处理完所有子节点，再处理父节点），而不是按层次读取。

下面通过
select sum(l_orderkey) from lineitem join orders on l_orderkey = o_orderkey; 这条查询对应的简化 QueryPlan（Agg -> Join -> Scan1/Scan2）来阐述这个过程
stack.top() 即拿取最后一个入栈的 frame（即栈顶元素）

准备
    QueryPipelineBuilderPtr last_pipeline;//注意last_pipeline 这里申明,最终返回
    std::stack<Frame> stack;
    stack.push(Frame{.node = root}); //root Node(agg)

循环1 
Agg

auto & frame = stack.top(); // frame = Frame{.node = root} root= Node(agg)
last_pipeline == nullptr
size_t next_child = frame.pipelines.size()  // next_child = 0
frame.node->children 只有一个Node(join), frame.node->children.size = 1
if (next_child == frame.node->children.size()) // 0 != 1 
stack.push(Frame{.node = frame.node->children[next_child]}); // Node(join) 进栈


循环2
Agg
Join

auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0 
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 0 !=2 
stack.push(Frame{.node = frame.node->children[next_child]}); // Frame{Node(scan1)} 入栈


循环3 (转折点)
Agg
Join
Scan1

auto & frame = stack.top(); // frame = Frame{.node = Node(scan1)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0 
frame.node->children.size() // frame.node 为 Node(scan1) 没有children 0 
if (next_child == frame.node->children.size()) // 0 == 0 进入if
{
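  bool limit_max_threads = frame.pipelines.empty();// true：叶子节点没有输入 pipeline
  last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
  //注意这里是 frame.node->step 和 frame.pipelines 整合为一个 pipeline。当前 Node(scan1) 没有输入 pipeline（frame.pipelines 为空），因此 pipeline 完全由 scan 这个 step 自己生成
  //即 step Node(scan1) 转换为 last_pipeline
  //由于 limit_max_threads 为 true，若设置了 max_threads（非 0），还会调用 last_pipeline->limitMaxThreads(max_threads)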
  stack.pop(); //Node(scan1) 出栈
}


循环4
Agg
Join

auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline // 为 Node(scan1) 转化而来 不做详解
frame.pipelines.emplace_back(std::move(last_pipeline)); //将 Node(scan1) 的 pipeline 追加到 frame.pipelines（一个 vector，并非栈）中 转折点
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 1  
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 1 !=2 
stack.push(Frame{.node = frame.node->children[next_child]}); // Frame{Node(scan2)} 入栈



循环5
Agg
Join
Scan2

auto & frame = stack.top(); // frame = Frame{.node = Node(scan2)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0 
frame.node->children.size() // frame.node 为 Node(scan2) 没有children 0 
if (next_child == frame.node->children.size()) // 0 == 0 进入if
{
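  bool limit_max_threads = frame.pipelines.empty();// true：叶子节点没有输入 pipeline
  last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
  //注意这里是 frame.node->step 和 frame.pipelines 整合为一个 pipeline。当前 Node(scan2) 没有输入 pipeline（frame.pipelines 为空），因此 pipeline 完全由 scan 这个 step 自己生成
  //即 step Node(scan2) 转换为 last_pipeline
  //由于 limit_max_threads 为 true，若设置了 max_threads（非 0），还会调用 last_pipeline->limitMaxThreads(max_threads)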
  stack.pop(); //Node(scan2) 出栈
}


循环6
Agg
Join

auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline // 为 Node(scan2)
frame.pipelines.emplace_back(std::move(last_pipeline)); //注意 frame.pipelines 中已经有 Node(scan1) 的 pipeline
//现在追加 Node(scan2) 的 pipeline, frame.pipelines.size() == 2
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 2
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 2 == 2 转折点, 进入if
{
  bool limit_max_threads = frame.pipelines.empty();  //limit_max_threads = false
  last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
  //这里即将 frame.node->step(Node(join)) 与 frame.pipelines(Node(scan1)、Node(scan2) 的 pipeline) 通过 updatePipeline 合并为 last_pipeline, 不做详解
  stack.pop(); //Node(join) 出栈
}



循环7
Agg

auto & frame = stack.top(); // frame = Frame{.node = Node(Agg)}
last_pipeline // Node(join) 后面的last_pipeline 
frame.pipelines.emplace_back(std::move(last_pipeline)); //注意 frame.pipelines 当前为空, 追加 last_pipeline（Node(join) 的 pipeline）
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 1
frame.node->children.size() // frame.node 为 Node(Agg) children 为1（Node(join)）
if (next_child == frame.node->children.size()) // 1 == 1进入if
{
  bool limit_max_threads = frame.pipelines.empty();  //limit_max_threads = false
  last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
  //这里即将 frame.node->step(Node(Agg)) 与 frame.pipelines(Node(join) 的 pipeline) 通过 updatePipeline 合并为 last_pipeline, 不做详解
  stack.pop(); //Node(Agg) 出栈
}


退出循环（stack 已空）
为 last_pipeline 设置 progress callback 和 process list element，并把 resources 转移给它，然后返回 last_pipeline

总结

整体流程如第一章中的图所示

0 人点赞