ClickHouse 源码解析(一):SQL 的一生(上)

2022-10-28 11:37:07 浏览数 (1)

基于 ClickHouse version 22.10.1 学习并写下 ClickHouse 源码解析系列。由于 CK 版本迭代很快,可能导致代码可能有些出入,但是整体流程大差不差吧。由于源码阅读过于枯燥,并且不太利于后续复习,所以在逻辑梳理时,我会使用思维导图或者流程图的方式来描述类与类之间的调用逻辑,后半部分会挑出核心的源码去分析。

概述

第一篇我们就先从一个 SQL 的生命周期开始,从宏观上去看 CK 是如何处理 SQL 的。SQL 有 DML、DDL、DQL 多种语法,每种 SQL 的处理逻辑都不相同,但大体流程是类似的,我们主要以 InsertQuery、SelectQuery 两种 SQL 为例,窥探 SQL 在 CK 中的流转过程。本篇只是梳理整个 SQL 的调用流程,并不会细化分析一些细节实现(SQL 解析、查询计划生成、优化查询、表引擎的存储等),后续会具体分析。

逻辑梳理

从上面的流程图中可以看出,整个 SQL 处理大致干了两件事情:

SQL 执行(思维导图)

  1. 解析 SQL、构建 Pipeline;
  2. 执行 Pipeline;

解析 SQL、构建 Pipeline

这部分的核心处理逻辑在DB::executeQueryImpl()方法中,首先会解析 SQL 拿到 AST,这是一个通用的处理逻辑。接下来就会根据不同的 AST,使用相应的 Interpreter 去构建 Pipeline。

执行 Pipeline

在执行 Pipeline 时,会根据 pipeline 是 Push/Pull 模式分别处理不同的 SQL 语句。

以 OrdinaryQuery(查询语句...) 为例,PullingAsyncPipelineExecutor::pull(Block & block)方法会通过线程池,异步执行 Pipeline。

PipelineExecutor::executeImpl(num_threads)方法中首先会调用PipelineExecutor::initializeExecution()初始化 ExecuteGraph 节点状态,并将可执行节点放入队列中等待线程处理。

PipelineExecutor::executeStepImpl()方法中,会通过ExecutionThreadContext::executeTask()执行 IProcessor::work()方法完成数据处理,并且通过graph->updateNode()方法更新 ExecuteGraph 节点状态,尝试执行相邻的 Processor。

源码解析

对 Pipeline 机制不了解的可以参考这篇博客:ClickHouse和他的朋友们(4)Pipeline处理器和调度器

SelectQuery 构建 QueryPipeline

QueryPlan::buildQueryPipeline()方法会根据 QueryPlan 构建 QueryPipeline,我们以这样的一个 SelectQuery 为例:Select * From customer Limit 1

下图是 SQL 生成的 QueryPlan 中的算子,这些算子在初始化完成后会被添加到 Pipeline 中,在阅读源码的时候可以带着这样一个想法:QueryPipeline 是由多个算子对应的 Transformer 连接起来的

Pipe 是 Pipeline 的一部分,Pipe 包含多个 Processor,Transformer 是 Processor 的子类实现。

QueryPlan::buildQueryPipeline()源码:

代码语言:javascript复制
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
    const QueryPlanOptimizationSettings & optimization_settings,
    const BuildQueryPipelineSettings & build_pipeline_settings)
{
    ...
    
    struct Frame
    {
        Node * node = {};
        QueryPipelineBuilders pipelines = {};
    };

    QueryPipelineBuilderPtr last_pipeline;

    std::stack<Frame> stack;
    stack.push(Frame{.node = root});

    while (!stack.empty())
    {
        auto & frame = stack.top();

        if (last_pipeline)
        {
            frame.pipelines.emplace_back(std::move(last_pipeline));
            last_pipeline = nullptr; //-V1048
        }

        size_t next_child = frame.pipelines.size();
        if (next_child == frame.node->children.size())
        {
            bool limit_max_threads = frame.pipelines.empty();
            // 将当前算子对应的 Transformer 添加到 Pipeline.
            last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);

            if (limit_max_threads && max_threads)
                last_pipeline->limitMaxThreads(max_threads);

            stack.pop();
        }
        else
            stack.push(Frame{.node = frame.node->children[next_child]});
    }

    return last_pipeline;
}

该方法通过栈实现了 DFS 算法,算子不断入栈,终止条件是:当某个算子的所有 children 算子都完成 updatePipeline算子的 updatePipeline 逻辑在IQueryPlanStep::updatePipeline()方法,这是一个虚函数。

对于举例的 SQL 来说,updatePipeline的顺序是:ReadFromMergeTree(ISourceStep) -> Limit(ITransformingStep) -> Expression(ITransformingStep)

第一个算子ReadFromMergeTree会进入ISourceStep::updatePipeline()方法。

ISourceStep::updatePipeline()源码:

代码语言:javascript复制
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
    // 1.实例化一个 QueryPipelineBuilder,用于构键后续的 Pipeline.
    auto pipeline = std::make_unique<QueryPipelineBuilder>();
    // 2.将之前算子初始化的 pipeline 中的 processors 添加给当前算子的 pipeline.
    QueryPipelineProcessorsCollector collector(*pipeline, this);
    // 3.虚方法,调用对应算子的初始化 pipeline,后续会分析这个方法。
    initializePipeline(*pipeline, settings);
    // 4.收集 processor.
    auto added_processors = collector.detachProcessors();
    processors.insert(processors.end(), added_processors.begin(), added_processors.end());
    return pipeline;
}

在 QueryPipelineBuilder 中有一个 processors 变量,在每个 IXxxStep 中也保存了一份 processors,这个 processors 收集了每个算子 updatepipeline 时得到的 Processor。

代码语言:javascript复制
ISourceStep.h
/// We collect processors got after pipeline transformation.
Processors processors;

后续的算子Limit、Expression会进入ITransformingStep::updatePipeline()方法。

ITransformingStep::updatePipeline()源码:

代码语言:c复制
QueryPipelineBuilderPtr ITransformingStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings)
{
    if (collect_processors)
    {
        QueryPipelineProcessorsCollector collector(*pipelines.front(), this);
        // 1.虚方法,将当前算子对应的 transformer 添加到 pipeline.
        transformPipeline(*pipelines.front(), settings);
        // 2.收集 processor.
        processors = collector.detachProcessors();
    }
    else
        transformPipeline(*pipelines.front(), settings);

    return std::move(pipelines.front());
}

通过上面流程就可以根据 QueryPlan 构建出一个完整的 QueryPipeline(也就是最后返回的 last_pipeline)。

第一个算子的updatePipeline()方法中会调用initializePipeline(),后续的算子的updatePipeline()方法会调用transformPipeline()方法。想想也合理,第一个算子初始化 Pipeline,后续算子只需要把 Transformer 往 Pipeline 里面拼接就行了。

ReadFromMergeTree这个 Storage 为例,它会在它的initializePipeline()实现方法中,调用Pipe ReadFromMergeTree::read()方法,在进一步调用ReadFromMergeTree::readInOrder()方法。

ReadFromMergeTree::readInOrder()源码(省略部分代码):

代码语言:javascript复制
Pipe ReadFromMergeTree::readInOrder(
    RangesInDataParts parts_with_range,
    Names required_columns,
    ReadType read_type,
    bool use_uncompressed_cache,
    UInt64 limit)
{
    Pipes pipes;

    for (const auto & part : parts_with_range)
    {
        auto source = read_type == ReadType::InReverseOrder
                    ? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block)
                    : createSource<MergeTreeInOrderSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block);

        pipes.emplace_back(std::move(source));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));

    return pipe;
}

在该方法中,会将MergeTreeInOrderSelectProcessor这个实现了 IProcessor 接口的 ISource 放入 Pipe 中,然后返回。紧接着会调用到第二个算子LimitStep,我们直接来到它的transformPipeline()方法的具体实现。

LimitStep::transformPipeline()源码:

代码语言:javascript复制
void LimitStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    auto transform = std::make_shared<LimitTransform>(
        pipeline.getHeader(), limit, offset, pipeline.getNumStreams(), always_read_till_end, with_ties, description);

    pipeline.addTransform(std::move(transform));
}

介个方法灰常简单,就是创建了算子对应的 Transformer[LimitTransform],然后把它加入到 Pipeline 就完事了。

至此,一个 SelectQuery 的 QueryPipeline 就构建出来了,可能看到这里对于 CK Pipeline 的 Pull/Push 模式有点懵,下一篇进一步学习 QueryPipeline 是如何在 ClickHouse 内部被调度运转的,它的数据是如何流动的。

0 人点赞