基于 ClickHouse version 22.10.1 学习并写下 ClickHouse 源码解析系列。由于 CK 版本迭代很快,可能导致代码可能有些出入,但是整体流程大差不差吧。由于源码阅读过于枯燥,并且不太利于后续复习,所以在逻辑梳理时,我会使用思维导图或者流程图的方式来描述类与类之间的调用逻辑,后半部分会挑出核心的源码去分析。
概述
第一篇我们就先从一个 SQL 的生命周期开始,从宏观上去看 CK 是如何处理 SQL 的。SQL 有 DML、DDL、DQL 多种语法,每种 SQL 的处理逻辑都不相同,但大体流程是类似的,我们主要以 InsertQuery、SelectQuery 两种 SQL 为例,窥探 SQL 在 CK 中的流转过程。本篇只是梳理整个 SQL 的调用流程,并不会细化分析一些细节实现(SQL 解析、查询计划生成、优化查询、表引擎的存储等),后续会具体分析。
逻辑梳理
从上面的流程图中可以看出,整个 SQL 处理大致干了两件事情:
SQL 执行(思维导图)
- 解析 SQL、构建 Pipeline;
- 执行 Pipeline;
解析 SQL、构建 Pipeline
这部分的核心处理逻辑在DB::executeQueryImpl()
方法中,首先会解析 SQL 拿到 AST,这是一个通用的处理逻辑。接下来就会根据不同的 AST,使用相应的 Interpreter 去构建 Pipeline。
执行 Pipeline
在执行 Pipeline 时,会根据 pipeline 是 Push/Pull 模式分别处理不同的 SQL 语句。
以 OrdinaryQuery(查询语句...) 为例,PullingAsyncPipelineExecutor::pull(Block & block)
方法会通过线程池,异步执行 Pipeline。
在PipelineExecutor::executeImpl(num_threads)
方法中首先会调用PipelineExecutor::initializeExecution()
初始化 ExecuteGraph 节点状态,并将可执行节点放入队列中等待线程处理。
在PipelineExecutor::executeStepImpl()
方法中,会通过ExecutionThreadContext::executeTask()
执行 IProcessor::work()
方法完成数据处理,并且通过graph->updateNode()
方法更新 ExecuteGraph 节点状态,尝试执行相邻的 Processor。
源码解析
对 Pipeline 机制不了解的可以参考这篇博客:ClickHouse和他的朋友们(4)Pipeline处理器和调度器
SelectQuery 构建 QueryPipeline
QueryPlan::buildQueryPipeline()
方法会根据 QueryPlan 构建 QueryPipeline,我们以这样的一个 SelectQuery 为例:Select * From customer Limit 1
。
下图是 SQL 生成的 QueryPlan 中的算子,这些算子在初始化完成后会被添加到 Pipeline 中,在阅读源码的时候可以带着这样一个想法:QueryPipeline 是由多个算子对应的 Transformer 连接起来的。
Pipe 是 Pipeline 的一部分,Pipe 包含多个 Processor,Transformer 是 Processor 的子类实现。
QueryPlan::buildQueryPipeline()
源码:
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
const QueryPlanOptimizationSettings & optimization_settings,
const BuildQueryPipelineSettings & build_pipeline_settings)
{
...
struct Frame
{
Node * node = {};
QueryPipelineBuilders pipelines = {};
};
QueryPipelineBuilderPtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (last_pipeline)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr; //-V1048
}
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
// 将当前算子对应的 Transformer 添加到 Pipeline.
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);
stack.pop();
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
}
return last_pipeline;
}
该方法通过栈实现了 DFS 算法,算子不断入栈,终止条件是:当某个算子的所有 children 算子都完成 updatePipeline。算子的 updatePipeline 逻辑在IQueryPlanStep::updatePipeline()
方法,这是一个虚函数。
对于举例的 SQL 来说,updatePipeline
的顺序是:ReadFromMergeTree(ISourceStep) -> Limit(ITransformingStep) -> Expression(ITransformingStep)
。
第一个算子ReadFromMergeTree
会进入ISourceStep::updatePipeline()
方法。
ISourceStep::updatePipeline()
源码:
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
// 1.实例化一个 QueryPipelineBuilder,用于构键后续的 Pipeline.
auto pipeline = std::make_unique<QueryPipelineBuilder>();
// 2.将之前算子初始化的 pipeline 中的 processors 添加给当前算子的 pipeline.
QueryPipelineProcessorsCollector collector(*pipeline, this);
// 3.虚方法,调用对应算子的初始化 pipeline,后续会分析这个方法。
initializePipeline(*pipeline, settings);
// 4.收集 processor.
auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
return pipeline;
}
代码语言:javascript复制在 QueryPipelineBuilder 中有一个 processors 变量,在每个 IXxxStep 中也保存了一份 processors,这个 processors 收集了每个算子 updatepipeline 时得到的 Processor。
ISourceStep.h
/// We collect processors got after pipeline transformation.
Processors processors;
后续的算子Limit、Expression
会进入ITransformingStep::updatePipeline()
方法。
ITransformingStep::updatePipeline()
源码:
QueryPipelineBuilderPtr ITransformingStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings)
{
if (collect_processors)
{
QueryPipelineProcessorsCollector collector(*pipelines.front(), this);
// 1.虚方法,将当前算子对应的 transformer 添加到 pipeline.
transformPipeline(*pipelines.front(), settings);
// 2.收集 processor.
processors = collector.detachProcessors();
}
else
transformPipeline(*pipelines.front(), settings);
return std::move(pipelines.front());
}
通过上面流程就可以根据 QueryPlan 构建出一个完整的 QueryPipeline(也就是最后返回的 last_pipeline)。
第一个算子的updatePipeline()
方法中会调用initializePipeline()
,后续的算子的updatePipeline()
方法会调用transformPipeline()
方法。想想也合理,第一个算子初始化 Pipeline,后续算子只需要把 Transformer 往 Pipeline 里面拼接就行了。
以ReadFromMergeTree
这个 Storage 为例,它会在它的initializePipeline()
实现方法中,调用Pipe ReadFromMergeTree::read()
方法,在进一步调用ReadFromMergeTree::readInOrder()
方法。
ReadFromMergeTree::readInOrder()
源码(省略部分代码):
Pipe ReadFromMergeTree::readInOrder(
RangesInDataParts parts_with_range,
Names required_columns,
ReadType read_type,
bool use_uncompressed_cache,
UInt64 limit)
{
Pipes pipes;
for (const auto & part : parts_with_range)
{
auto source = read_type == ReadType::InReverseOrder
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block)
: createSource<MergeTreeInOrderSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block);
pipes.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
return pipe;
}
在该方法中,会将MergeTreeInOrderSelectProcessor
这个实现了 IProcessor 接口的 ISource 放入 Pipe 中,然后返回。紧接着会调用到第二个算子LimitStep
,我们直接来到它的transformPipeline()
方法的具体实现。
LimitStep::transformPipeline()
源码:
void LimitStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto transform = std::make_shared<LimitTransform>(
pipeline.getHeader(), limit, offset, pipeline.getNumStreams(), always_read_till_end, with_ties, description);
pipeline.addTransform(std::move(transform));
}
介个方法灰常简单,就是创建了算子对应的 Transformer[LimitTransform
],然后把它加入到 Pipeline 就完事了。
至此,一个 SelectQuery 的 QueryPipeline 就构建出来了,可能看到这里对于 CK Pipeline 的 Pull/Push 模式有点懵,下一篇进一步学习 QueryPipeline 是如何在 ClickHouse 内部被调度运转的,它的数据是如何流动的。