一 QueryPlan::buildQueryPipeline
pipe 执行逻辑如下
二 基础知识
QueryPlanStep
代码语言:javascript复制 ^IQueryPlanStep$
└── IQueryPlanStep [vim src/Processors/QueryPlan/IQueryPlanStep.h 70]
├── CreatingSetsStep [vim src/Processors/QueryPlan/CreatingSetsStep.h 35]
├── IntersectOrExceptStep [vim src/Processors/QueryPlan/IntersectOrExceptStep.h 9]
├── JoinStep [vim src/Processors/QueryPlan/JoinStep.h 12]
├── UnionStep [vim src/Processors/QueryPlan/UnionStep.h 8]
├── ITransformingStep [vim src/Processors/QueryPlan/ITransformingStep.h 9]
│ ├── AggregatingStep [vim src/Processors/QueryPlan/AggregatingStep.h 14]
│ ├── ArrayJoinStep [vim src/Processors/QueryPlan/ArrayJoinStep.h 10]
│ ├── CreatingSetStep [vim src/Processors/QueryPlan/CreatingSetsStep.h 12]
│ ├── CubeStep [vim src/Processors/QueryPlan/CubeStep.h 13]
│ ├── DistinctStep [vim src/Processors/QueryPlan/DistinctStep.h 9]
│ ├── ExpressionStep [vim src/Processors/QueryPlan/ExpressionStep.h 14]
│ ├── ExtremesStep [vim src/Processors/QueryPlan/ExtremesStep.h 7]
│ ├── FilledJoinStep [vim src/Processors/QueryPlan/JoinStep.h 37]
│ ├── FillingStep [vim src/Processors/QueryPlan/FillingStep.h 9]
│ ├── FilterStep [vim src/Processors/QueryPlan/FilterStep.h 11]
│ ├── LimitByStep [vim src/Processors/QueryPlan/LimitByStep.h 8]
│ ├── LimitStep [vim src/Processors/QueryPlan/LimitStep.h 9]
│ ├── MergingAggregatedStep [vim src/Processors/QueryPlan/MergingAggregatedStep.h 12]
│ ├── OffsetStep [vim src/Processors/QueryPlan/OffsetStep.h 9]
│ ├── RollupStep [vim src/Processors/QueryPlan/RollupStep.h 12]
│ ├── SettingQuotaAndLimitsStep [vim src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h 21]
│ ├── SortingStep [vim src/Processors/QueryPlan/SortingStep.h 11]
│ ├── TotalsHavingStep [vim src/Processors/QueryPlan/TotalsHavingStep.h 13]
│ └── WindowStep [vim src/Processors/QueryPlan/WindowStep.h 14]
└── ISourceStep [vim src/Processors/QueryPlan/ISourceStep.h 8]
    ├── ReadFromMergeTree [vim src/Processors/QueryPlan/ReadFromMergeTree.h 26]
    ├── ReadFromRemote [vim src/Processors/QueryPlan/ReadFromRemote.h 21]
    ├── ReadNothingStep [vim src/Processors/QueryPlan/ReadNothingStep.h 8]
    └── ReadFromPreparedSource [vim src/Processors/QueryPlan/ReadFromPreparedSource.h 9]
        └── ReadFromStorageStep [vim src/Processors/QueryPlan/ReadFromPreparedSource.h 23]
Processor
代码语言:javascript复制 ^IProcessor$
└── IProcessor [vim src/Processors/IProcessor.h 110]
├── AggregatingInOrderTransform [vim src/Processors/Transforms/AggregatingInOrderTransform.h 19]
├── AggregatingTransform [vim src/Processors/Transforms/AggregatingTransform.h 102]
├── ConcatProcessor [vim src/Processors/ConcatProcessor.h 16]
├── ConvertingAggregatedToChunksTransform [vim src/Processors/Transforms/AggregatingTransform.cpp 154]
├── CopyTransform [vim src/Processors/Transforms/CopyTransform.h 9]
├── CopyingDataToViewsTransform [vim src/Processors/Transforms/buildPushingToViewsChain.cpp 71]
├── DelayedPortsProcessor [vim src/Processors/DelayedPortsProcessor.h 11]
├── DelayedSource [vim src/Processors/Sources/DelayedSource.h 17]
├── FillingRightJoinSideTransform [vim src/Processors/Transforms/JoiningTransform.h 87]
├── FinalizingViewsTransform [vim src/Processors/Transforms/buildPushingToViewsChain.cpp 148]
├── ForkProcessor [vim src/Processors/ForkProcessor.h 18]
├── GroupingAggregatedTransform [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h 59]
├── IInflatingTransform [vim src/Processors/IInflatingTransform.h 21]
├── IntersectOrExceptTransform [vim src/Processors/Transforms/IntersectOrExceptTransform.h 12]
├── JoiningTransform [vim src/Processors/Transforms/JoiningTransform.h 18]
├── LimitTransform [vim src/Processors/LimitTransform.h 18]
├── OffsetTransform [vim src/Processors/OffsetTransform.h 13]
├── ResizeProcessor [vim src/Processors/ResizeProcessor.h 21]
├── SortingAggregatedTransform [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h 121]
├── StrictResizeProcessor [vim src/Processors/ResizeProcessor.h 77]
├── WindowTransform [vim src/Processors/Transforms/WindowTransform.h 87]
├── IAccumulatingTransform [vim src/Processors/IAccumulatingTransform.h 13]
│ ├── BufferingToFileTransform [vim src/Processors/Transforms/MergeSortingTransform.cpp 30]
│ ├── CreatingSetsTransform [vim src/Processors/Transforms/CreatingSetsTransform.h 26]
│ ├── CubeTransform [vim src/Processors/Transforms/CubeTransform.h 11]
│ ├── MergingAggregatedTransform [vim src/Processors/Transforms/MergingAggregatedTransform.h 12]
│ ├── QueueBuffer [vim src/Processors/QueueBuffer.h 13]
│ ├── RollupTransform [vim src/Processors/Transforms/RollupTransform.h 10]
│ ├── TTLCalcTransform [vim src/Processors/Transforms/TTLCalcTransform.h 14]
│ └── TTLTransform [vim src/Processors/Transforms/TTLTransform.h 15]
├── ISimpleTransform [vim src/Processors/ISimpleTransform.h 17]
│ ├── AddingDefaultsTransform [vim src/Processors/Transforms/AddingDefaultsTransform.h 13]
│ ├── AddingSelectorTransform [vim src/Processors/Transforms/AddingSelectorTransform.h 12]
│ ├── ArrayJoinTransform [vim src/Processors/Transforms/ArrayJoinTransform.h 11]
│ ├── CheckSortedTransform [vim src/Processors/Transforms/CheckSortedTransform.h 12]
│ ├── DistinctSortedTransform [vim src/Processors/Transforms/DistinctSortedTransform.h 21]
│ ├── DistinctTransform [vim src/Processors/Transforms/DistinctTransform.h 10]
│ ├── ExpressionTransform [vim src/Processors/Transforms/ExpressionTransform.h 18]
│ ├── ExtremesTransform [vim src/Processors/Transforms/ExtremesTransform.h 7]
│ ├── FillingTransform [vim src/Processors/Transforms/FillingTransform.h 13]
│ ├── FilterTransform [vim src/Processors/Transforms/FilterTransform.h 18]
│ ├── FinalizeAggregatedTransform [vim src/Processors/Transforms/AggregatingInOrderTransform.h 78]
│ ├── LimitByTransform [vim src/Processors/Transforms/LimitByTransform.h 10]
│ ├── LimitsCheckingTransform [vim src/Processors/Transforms/LimitsCheckingTransform.h 26]
│ ├── MaterializingTransform [vim src/Processors/Transforms/MaterializingTransform.h 8]
│ ├── MergingAggregatedBucketTransform [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h 105]
│ ├── PartialSortingTransform [vim src/Processors/Transforms/PartialSortingTransform.h 13]
│ ├── ReplacingWindowColumnTransform [vim src/Processors/Transforms/ReplacingWindowColumnTransform.h 13]
│ ├── ReverseTransform [vim src/Processors/Transforms/ReverseTransform.h 8]
│ ├── SendingChunkHeaderTransform [vim src/Processors/Sources/ShellCommandSource.cpp 429]
│ ├── TotalsHavingTransform [vim src/Processors/Transforms/TotalsHavingTransform.h 23]
│ ├── TransformWithAdditionalColumns [vim src/Dictionaries/DictionarySourceHelpers.h 40]
│ └── WatermarkTransform [vim src/Processors/Transforms/WatermarkTransform.h 10]
├── ISink [vim src/Processors/ISink.h 9]
│ ├── EmptySink [vim src/Processors/Sinks/EmptySink.h 8]
│ ├── ExternalTableDataSink [vim src/Client/Connection.cpp 682]
│ ├── NullSink [vim src/Processors/Sinks/NullSink.h 8]
│ └── ODBCSink [vim programs/odbc-bridge/ODBCBlockOutputStream.h 14]
├── SortingTransform [vim src/Processors/Transforms/SortingTransform.h 63]
│ ├── FinishSortingTransform [vim src/Processors/Transforms/FinishSortingTransform.h 10]
│ └── MergeSortingTransform [vim src/Processors/Transforms/MergeSortingTransform.h 17]
├── IMergingTransformBase [vim src/Processors/Merges/IMergingTransform.h 12]
│ └── IMergingTransform [vim src/Processors/Merges/IMergingTransform.h 77]
│ ├── AggregatingSortedTransform [vim src/Processors/Merges/AggregatingSortedTransform.h 12]
│ ├── CollapsingSortedTransform [vim src/Processors/Merges/CollapsingSortedTransform.h 10]
│ ├── ColumnGathererTransform [vim src/Processors/Transforms/ColumnGathererTransform.h 105]
│ ├── FinishAggregatingInOrderTransform [vim src/Processors/Merges/FinishAggregatingInOrderTransform.h 12]
│ ├── GraphiteRollupSortedTransform [vim src/Processors/Merges/GraphiteRollupSortedTransform.h 10]
│ ├── MergingSortedTransform [vim src/Processors/Merges/MergingSortedTransform.h 11]
│ ├── ReplacingSortedTransform [vim src/Processors/Merges/ReplacingSortedTransform.h 11]
│ ├── SummingSortedTransform [vim src/Processors/Merges/SummingSortedTransform.h 10]
│ └── VersionedCollapsingTransform [vim src/Processors/Merges/VersionedCollapsingTransform.h 11]
├── ExceptionKeepingTransform [vim src/Processors/Transforms/ExceptionKeepingTransform.h 24]
│ ├── CheckConstraintsTransform [vim src/Processors/Transforms/CheckConstraintsTransform.h 15]
│ ├── ConvertingTransform [vim src/Processors/Transforms/ExpressionTransform.h 36]
│ ├── CountingTransform [vim src/Processors/Transforms/CountingTransform.h 14]
│ ├── ExecutingInnerQueryFromViewTransform [vim src/Processors/Transforms/buildPushingToViewsChain.cpp 86]
│ ├── SquashingChunksTransform [vim src/Processors/Transforms/SquashingChunksTransform.h 9]
│ └── SinkToStorage [vim src/Processors/Sinks/SinkToStorage.h 9]
│ ├── BufferSink [vim src/Storages/StorageBuffer.cpp 535]
│ ├── DistributedSink [vim src/Storages/Distributed/DistributedSink.h 39]
│ ├── EmbeddedRocksDBSink [vim src/Storages/RocksDB/EmbeddedRocksDBSink.h 13]
│ ├── HDFSSink [vim src/Storages/HDFS/StorageHDFS.cpp 438]
│ ├── KafkaSink [vim src/Storages/Kafka/KafkaBlockOutputStream.h 12]
│ ├── LiveViewSink [vim src/Storages/LiveView/LiveViewSink.h 12]
│ ├── LogSink [vim src/Storages/StorageLog.cpp 262]
│ ├── MemorySink [vim src/Storages/StorageMemory.cpp 108]
│ ├── MergeTreeSink [vim src/Storages/MergeTree/MergeTreeSink.h 14]
│ ├── NullSinkToStorage [vim src/Processors/Sinks/SinkToStorage.h 36]
│ ├── PostgreSQLSink [vim src/Storages/StoragePostgreSQL.cpp 109]
│ ├── PushingToLiveViewSink [vim src/Processors/Transforms/buildPushingToViewsChain.cpp 117]
│ ├── PushingToWindowViewSink [vim src/Processors/Transforms/buildPushingToViewsChain.cpp 131]
│ ├── RabbitMQSink [vim src/Storages/RabbitMQ/RabbitMQSink.h 13]
│ ├── RemoteSink [vim src/Processors/Sinks/RemoteSink.h 8]
│ ├── ReplicatedMergeTreeSink [vim src/Storages/MergeTree/ReplicatedMergeTreeSink.h 22]
│ ├── SQLiteSink [vim src/Storages/StorageSQLite.cpp 87]
│ ├── SetOrJoinSink [vim src/Storages/StorageSet.cpp 37]
│ ├── StorageFileSink [vim src/Storages/StorageFile.cpp 676]
│ ├── StorageMySQLSink [vim src/Storages/StorageMySQL.cpp 114]
│ ├── StorageS3Sink [vim src/Storages/StorageS3.cpp 377]
│ ├── StorageURLSink [vim src/Storages/StorageURL.h 103]
│ ├── StripeLogSink [vim src/Storages/StorageStripeLog.cpp 166]
│ └── PartitionedSink [vim src/Storages/PartitionedSink.h 14]
│ ├── PartitionedHDFSSink [vim src/Storages/HDFS/StorageHDFS.cpp 480]
│ ├── PartitionedStorageFileSink [vim src/Storages/StorageFile.cpp 799]
│ ├── PartitionedStorageS3Sink [vim src/Storages/StorageS3.cpp 436]
│ └── PartitionedStorageURLSink [vim src/Storages/StorageURL.cpp 345]
├── IOutputFormat [vim src/Processors/Formats/IOutputFormat.h 23]
│ ├── ArrowBlockOutputFormat [vim src/Processors/Formats/Impl/ArrowBlockOutputFormat.h 18]
│ ├── LazyOutputFormat [vim src/Processors/Formats/LazyOutputFormat.h 13]
│ ├── MySQLOutputFormat [vim src/Processors/Formats/Impl/MySQLOutputFormat.h 20]
│ ├── NativeOutputFormat [vim src/Processors/Formats/Impl/NativeFormat.cpp 55]
│ ├── NullOutputFormat [vim src/Processors/Formats/Impl/NullFormat.h 7]
│ ├── ODBCDriver2BlockOutputFormat [vim src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h 20]
│ ├── ORCBlockOutputFormat [vim src/Processors/Formats/Impl/ORCBlockOutputFormat.h 34]
│ ├── ParallelFormattingOutputFormat [vim src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h 54]
│ ├── ParquetBlockOutputFormat [vim src/Processors/Formats/Impl/ParquetBlockOutputFormat.h 27]
│ ├── PostgreSQLOutputFormat [vim src/Processors/Formats/Impl/PostgreSQLOutputFormat.h 13]
│ ├── PullingOutputFormat [vim src/Processors/Formats/PullingOutputFormat.h 9]
│ ├── TemplateBlockOutputFormat [vim src/Processors/Formats/Impl/TemplateBlockOutputFormat.h 13]
│ ├── PrettyBlockOutputFormat [vim src/Processors/Formats/Impl/PrettyBlockOutputFormat.h 17]
│ │ ├── PrettyCompactBlockOutputFormat [vim src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h 13]
│ │ └── PrettySpaceBlockOutputFormat [vim src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h 11]
│ └── IRowOutputFormat [vim src/Processors/Formats/IRowOutputFormat.h 24]
│ ├── AvroRowOutputFormat [vim src/Processors/Formats/Impl/AvroRowOutputFormat.h 46]
│ ├── BinaryRowOutputFormat [vim src/Processors/Formats/Impl/BinaryRowOutputFormat.h 17]
│ ├── CSVRowOutputFormat [vim src/Processors/Formats/Impl/CSVRowOutputFormat.h 17]
│ ├── CapnProtoRowOutputFormat [vim src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h 26]
│ ├── CustomSeparatedRowOutputFormat [vim src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.h 11]
│ ├── JSONCompactEachRowRowOutputFormat [vim src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h 15]
│ ├── MarkdownRowOutputFormat [vim src/Processors/Formats/Impl/MarkdownRowOutputFormat.h 12]
│ ├── MsgPackRowOutputFormat [vim src/Processors/Formats/Impl/MsgPackRowOutputFormat.h 18]
│ ├── ProtobufRowOutputFormat [vim src/Processors/Formats/Impl/ProtobufRowOutputFormat.h 29]
│ ├── RawBLOBRowOutputFormat [vim src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h 27]
│ ├── ValuesRowOutputFormat [vim src/Processors/Formats/Impl/ValuesRowOutputFormat.h 15]
│ ├── VerticalRowOutputFormat [vim src/Processors/Formats/Impl/VerticalRowOutputFormat.h 18]
│ ├── XMLRowOutputFormat [vim src/Processors/Formats/Impl/XMLRowOutputFormat.h 16]
│ ├── JSONEachRowRowOutputFormat [vim src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h 15]
│ │ └── JSONEachRowWithProgressRowOutputFormat [vim src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h 8]
│ ├── JSONRowOutputFormat [vim src/Processors/Formats/Impl/JSONRowOutputFormat.h 16]
│ │ └── JSONCompactRowOutputFormat [vim src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h 16]
│ └── TabSeparatedRowOutputFormat [vim src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h 15]
│ └── TSKVRowOutputFormat [vim src/Processors/Formats/Impl/TSKVRowOutputFormat.h 14]
└── ISource [vim src/Processors/ISource.h 9]
├── ConvertingAggregatedToChunksSource [vim src/Processors/Transforms/AggregatingTransform.cpp 88]
├── MergeSorterSource [vim src/Processors/Transforms/SortingTransform.h 45]
├── NullSource [vim src/Processors/Sources/NullSource.h 8]
├── ODBCSource [vim programs/odbc-bridge/ODBCBlockInputStream.h 13]
├── PushingAsyncSource [vim src/Processors/Executors/PushingAsyncPipelineExecutor.cpp 17]
├── PushingSource [vim src/Processors/Executors/PushingPipelineExecutor.cpp 15]
├── RemoteExtremesSource [vim src/Processors/Sources/RemoteSource.h 70]
├── RemoteTotalsSource [vim src/Processors/Sources/RemoteSource.h 54]
├── SourceFromNativeStream [vim src/Processors/Transforms/AggregatingTransform.cpp 53]
├── TemporaryFileLazySource [vim src/Processors/Sources/TemporaryFileLazySource.h 10]
├── WaitForAsyncInsertSource [vim src/Processors/Sources/WaitForAsyncInsertSource.h 11]
├── ISourceWithProgress [vim src/Processors/Sources/SourceWithProgress.h 16]
│ └── SourceWithProgress [vim src/Processors/Sources/SourceWithProgress.h 48]
│ ├── BlocksListSource [vim src/Processors/Sources/BlocksListSource.h 12]
│ ├── BlocksSource [vim src/Processors/Sources/BlocksSource.h 23]
│ ├── BufferSource [vim src/Storages/StorageBuffer.cpp 146]
│ ├── CassandraSource [vim src/Dictionaries/CassandraSource.h 14]
│ ├── ColumnsSource [vim src/Storages/System/StorageSystemColumns.cpp 67]
│ ├── DDLQueryStatusSource [vim src/Interpreters/executeDDLQueryOnCluster.cpp 174]
│ ├── DataSkippingIndicesSource [vim src/Storages/System/StorageSystemDataSkippingIndices.cpp 35]
│ ├── DictionarySource [vim src/Dictionaries/DictionarySource.cpp 14]
│ ├── DirectoryMonitorSource [vim src/Storages/Distributed/DirectoryMonitor.cpp 908]
│ ├── EmbeddedRocksDBSource [vim src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp 180]
│ ├── FileLogSource [vim src/Storages/FileLog/FileLogSource.h 13]
│ ├── GenerateSource [vim src/Storages/StorageGenerateRandom.cpp 379]
│ ├── HDFSSource [vim src/Storages/HDFS/StorageHDFS.h 85]
│ ├── JoinSource [vim src/Storages/StorageJoin.cpp 380]
│ ├── KafkaSource [vim src/Storages/Kafka/KafkaSource.h 16]
│ ├── LiveViewEventsSource [vim src/Storages/LiveView/LiveViewEventsSource.h 30]
│ ├── LiveViewSource [vim src/Storages/LiveView/LiveViewSource.h 14]
│ ├── LogSource [vim src/Storages/StorageLog.cpp 57]
│ ├── MemorySource [vim src/Storages/StorageMemory.cpp 30]
│ ├── MergeTreeSequentialSource [vim src/Storages/MergeTree/MergeTreeSequentialSource.h 12]
│ ├── MongoDBSource [vim src/Processors/Transforms/MongoDBSource.h 25]
│ ├── NumbersMultiThreadedSource [vim src/Storages/System/StorageSystemNumbers.cpp 64]
│ ├── NumbersSource [vim src/Storages/System/StorageSystemNumbers.cpp 17]
│ ├── RabbitMQSource [vim src/Storages/RabbitMQ/RabbitMQSource.h 11]
│ ├── RedisSource [vim src/Dictionaries/RedisSource.h 22]
│ ├── RemoteSource [vim src/Processors/Sources/RemoteSource.h 17]
│ ├── SQLiteSource [vim src/Processors/Sources/SQLiteSource.h 15]
│ ├── ShellCommandSource [vim src/Processors/Sources/ShellCommandSource.cpp 247]
│ ├── SourceFromSingleChunk [vim src/Processors/Sources/SourceFromSingleChunk.h 8]
│ ├── StorageFileSource [vim src/Storages/StorageFile.cpp 413]
│ ├── StorageHiveSource [vim src/Storages/Hive/StorageHive.cpp 56]
│ ├── StorageInputSource [vim src/Storages/StorageInput.cpp 28]
│ ├── StorageS3Source [vim src/Storages/StorageS3.h 33]
│ ├── StorageURLSource [vim src/Storages/StorageURL.cpp 115]
│ ├── StripeLogSource [vim src/Storages/StorageStripeLog.cpp 62]
│ ├── SyncKillQuerySource [vim src/Interpreters/InterpreterKillQueryQuery.cpp 127]
│ ├── TablesBlockSource [vim src/Storages/System/StorageSystemTables.cpp 127]
│ ├── WindowViewSource [vim src/Storages/WindowView/WindowViewSource.h 10]
│ ├── ZerosSource [vim src/Storages/System/StorageSystemZeros.cpp 26]
│ ├── MySQLSource [vim src/Processors/Sources/MySQLSource.h 28]
│ │ └── MySQLWithFailoverSource [vim src/Processors/Sources/MySQLSource.h 64]
│ ├── PostgreSQLSource [vim src/Processors/Transforms/PostgreSQLSource.h 19]
│ │ └── PostgreSQLTransactionSource [vim src/Processors/Transforms/PostgreSQLSource.h 66]
│ └── MergeTreeBaseSelectProcessor [vim src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h 32]
│ ├── MergeTreeThreadSelectProcessor [vim src/Storages/MergeTree/MergeTreeThreadSelectProcessor.h 14]
│ └── MergeTreeSelectProcessor [vim src/Storages/MergeTree/MergeTreeSelectProcessor.h 16]
│ ├── MergeTreeInOrderSelectProcessor [vim src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h 11]
│ └── MergeTreeReverseSelectProcessor [vim src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h 12]
└── IInputFormat [vim src/Processors/Formats/IInputFormat.h 30]
├── ArrowBlockInputFormat [vim src/Processors/Formats/Impl/ArrowBlockInputFormat.h 19]
├── NativeInputFormat [vim src/Processors/Formats/Impl/NativeFormat.cpp 15]
├── ORCBlockInputFormat [vim src/Processors/Formats/Impl/ORCBlockInputFormat.h 21]
├── ParallelParsingInputFormat [vim src/Processors/Formats/Impl/ParallelParsingInputFormat.h 70]
├── ParquetBlockInputFormat [vim src/Processors/Formats/Impl/ParquetBlockInputFormat.h 18]
├── ValuesBlockInputFormat [vim src/Processors/Formats/Impl/ValuesBlockInputFormat.h 20]
└── IRowInputFormat [vim src/Processors/Formats/IRowInputFormat.h 38]
├── AvroConfluentRowInputFormat [vim src/Processors/Formats/Impl/AvroRowInputFormat.h 140]
├── AvroRowInputFormat [vim src/Processors/Formats/Impl/AvroRowInputFormat.h 118]
├── CapnProtoRowInputFormat [vim src/Processors/Formats/Impl/CapnProtoRowInputFormat.h 23]
├── JSONEachRowRowInputFormat [vim src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h 21]
├── LineAsStringRowInputFormat [vim src/Processors/Formats/Impl/LineAsStringRowInputFormat.h 17]
├── MsgPackRowInputFormat [vim src/Processors/Formats/Impl/MsgPackRowInputFormat.h 59]
├── ProtobufRowInputFormat [vim src/Processors/Formats/Impl/ProtobufRowInputFormat.h 29]
├── RawBLOBRowInputFormat [vim src/Processors/Formats/Impl/RawBLOBRowInputFormat.h 16]
├── RegexpRowInputFormat [vim src/Processors/Formats/Impl/RegexpRowInputFormat.h 51]
├── TSKVRowInputFormat [vim src/Processors/Formats/Impl/TSKVRowInputFormat.h 24]
├── JSONAsRowInputFormat [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h 15]
│ ├── JSONAsObjectRowInputFormat [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h 56]
│ └── JSONAsStringRowInputFormat [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h 43]
└── RowInputFormatWithDiagnosticInfo [vim src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h 12]
├── TemplateRowInputFormat [vim src/Processors/Formats/Impl/TemplateRowInputFormat.h 18]
└── RowInputFormatWithNamesAndTypes [vim src/Processors/Formats/RowInputFormatWithNamesAndTypes.h 24]
├── BinaryRowInputFormat [vim src/Processors/Formats/Impl/BinaryRowInputFormat.h 20]
├── CustomSeparatedRowInputFormat [vim src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.h 11]
├── JSONCompactEachRowRowInputFormat [vim src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h 22]
├── TabSeparatedRowInputFormat [vim src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h 14]
└── CSVRowInputFormat [vim src/Processors/Formats/Impl/CSVRowInputFormat.h 18]
└── HiveTextRowInputFormat [vim src/Processors/Formats/Impl/HiveTextRowInputFormat.h 15]
三 精讲
四 关系图
参考https://bbs.huaweicloud.com/blogs/314808
五 详解
https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/QueryPlan.cpp#L156
代码语言:javascript复制QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
const QueryPlanOptimizationSettings & optimization_settings,
const BuildQueryPipelineSettings & build_pipeline_settings)
{
// Turns the plan tree rooted at `root` into a single pipeline builder.
// The tree is walked iteratively with an explicit stack instead of recursion:
// every child of a node is converted first, and only then is the node's own
// step applied to the pipelines produced by its children (post-order).
checkInitialized();
optimize(optimization_settings);
// One traversal frame per plan node currently being processed.
// `pipelines` collects the finished pipelines of the node's children, in child order.
struct Frame
{
Node * node = {};
QueryPipelineBuilders pipelines = {};
};
// Pipeline produced by the most recently finished node. It is handed to the
// parent frame at the start of the next iteration, and it is the final result
// once the stack is empty.
QueryPipelineBuilderPtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
// A child of this node has just finished: record its pipeline on this frame.
if (last_pipeline)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr;
}
// The number of collected pipelines doubles as the index of the next child to visit.
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
// All children are done (or the node has none): apply this node's step.
// `limit_max_threads` is true only when no child pipelines were collected,
// i.e. the node has no children.
bool limit_max_threads = frame.pipelines.empty();
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
// NOTE(review): `max_threads` is not declared in this block — presumably a
// QueryPlan member; a zero value skips the limit.
if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);
stack.pop();
}
else
// Descend into the next unprocessed child.
stack.push(Frame{.node = frame.node->children[next_child]});
}
// The root's pipeline is complete: attach the progress callback and process-list
// element from the build settings, and move `resources` into the pipeline.
// NOTE(review): `resources` is not declared in this block — presumably a QueryPlan member.
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
last_pipeline->addResources(std::move(resources));
return last_pipeline;
}
代码语言:c 复制首先这个逻辑是将 QueryPlan(Tree Node) 转化为 QueryPipelines 的过程
这个过程使用 std::stack<Frame> stack 实现, 从 Frame{root Node} 入栈开始, 到它出栈结束, 很像一次深度优先的后序遍历: 子节点先转换为 pipeline, 父节点再把它们合并
通过一个
select sum(l_orderkey) from lineitem join orders on l_orderkey = o_orderkey; 的简化的QueryPlanNode 来阐述这个过程
stack.top() 即取得最后一个入栈的 frame
准备
QueryPipelineBuilderPtr last_pipeline;//注意last_pipeline 在这里声明,最终作为返回值
std::stack<Frame> stack;
stack.push(Frame{.node = root}); //root Node(agg)
循环1
Agg
auto & frame = stack.top(); // frame = Frame{.node = root} root= Node(agg)
last_pipeline == nullptr
size_t next_child = frame.pipelines.size() // next_child = 0
frame.node->children 只有一个Node(join), frame.node->children.size = 1
if (next_child == frame.node->children.size()) // 0 != 1
stack.push(Frame{.node = frame.node->children[next_child]}); // Node(join) 进栈
循环2
Agg
Join
auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 0 !=2
stack.push(Frame{.node = frame.node->children[next_child]}); // Frame{Node(scan1)} 入栈
循环3 (转折点)
Agg
Join
Scan1
auto & frame = stack.top(); // frame = Frame{.node = Node(scan1)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0
frame.node->children.size() // frame.node 为 Node(scan1) 没有children 0
if (next_child == frame.node->children.size()) // 0 == 0 进入if
{
bool limit_max_threads = frame.pipelines.empty(); // limit_max_threads = true, 因为叶子节点 Node(scan1) 的 frame.pipelines 为空
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//注意这里是 frame.node->step 和 frame.pipelines 整合为一个pipeline 当前Node(scan1) 可以理解为没有太多处理, 因为frame.pipelines 为空
//即step Node(scan1)转换为last_pipeline
stack.pop(); //Node(scan1) 出栈
}
循环4
Agg
Join
auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline // 为 Node(scan1) 转化而来 不做详解
frame.pipelines.emplace_back(std::move(last_pipeline)); //frame.pipelines 入栈 Node(scan1) pipeline 转折点
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 1
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 1 !=2
stack.push(Frame{.node = frame.node->children[next_child]}); // Frame{Node(scan2)} 入栈
循环5
Agg
Join
Scan2
auto & frame = stack.top(); // frame = Frame{.node = Node(scan2)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0
frame.node->children.size() // frame.node 为 Node(scan2) 没有children 0
if (next_child == frame.node->children.size()) // 0 == 0 进入if
{
bool limit_max_threads = frame.pipelines.empty(); // limit_max_threads = true, 因为叶子节点 Node(scan2) 的 frame.pipelines 为空
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//注意这里是 frame.node->step 和 frame.pipelines 整合为一个pipeline 当前Node(scan2) 可以理解为没有太多处理, 因为frame.pipelines 为空
//即step Node(scan2)转换为last_pipeline
stack.pop(); //Node(scan2) 出栈
}
循环6
Agg
Join
auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline // 为 Node(scan2)
frame.pipelines.emplace_back(std::move(last_pipeline)); //注意 frame.pipelines 已经入栈过 Node(Scan1) pipeline
//现在入栈 Node(scan2) pipeline , frame.pipelines.size == 2
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 2
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 2 == 2 转折点, 进入if
{
bool limit_max_threads = frame.pipelines.empty(); //limit_max_threads = false
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//这里即 将 frame.node->step(Node(join)) 与 frame.pipelines(Node(scan1), Node(scan2)) updatePipeline 为 last_pipeline , 不做详解
stack.pop(); //Node(join) 出栈
}
循环7
Agg
auto & frame = stack.top(); // frame = Frame{.node = Node(Agg)}
last_pipeline // Node(join) 后面的last_pipeline
frame.pipelines.emplace_back(std::move(last_pipeline)); //注意 frame.pipelines 当前为空, 入栈last_pipeline
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 1
frame.node->children.size() // frame.node 为 Node(Agg) children 为1 (Node(join))
if (next_child == frame.node->children.size()) // 1 == 1进入if
{
bool limit_max_threads = frame.pipelines.empty(); //limit_max_threads = false
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//这里即 将 frame.node->step(Node(Agg)) 与 frame.pipelines(Node(join) 的 pipeline) updatePipeline 为 last_pipeline , 不做详解
stack.pop(); //Node(Agg) 出栈
}
退出循环
返回last_pipeline
总结
如最初第一张图