ClickHouse Lambda 组合拳

2022-03-08 20:32:57 浏览数 (2)

本文主要讲解 ClickHouse 中 Lambda 表达式的使用。

Cpp reference

https://en.cppreference.com/w/cpp/language/lambda

ClickHouse Lambda

1 定义：FormatFactory 中的类型与结构定义

代码语言:c 复制
// Shared-ownership handle to an input format; creator callbacks such as InputCreator return this type.
using InputFormatPtr = std::shared_ptr<IInputFormat>;
代码语言:c 复制
    // Type-erased creator callbacks (registered as lambdas, e.g. in registerInputFormatCSV),
    // stored per format name in Creators.
    // InputCreator builds an input format over the ReadBuffer `buf`.
    // Any callable with a compatible call signature can be stored here.
    using InputCreator = std::function<InputFormatPtr(
            ReadBuffer & buf,
            const Block & header,
            const RowInputFormatParams & params,
            const FormatSettings & settings)>;

    // OutputCreator builds an output format over the WriteBuffer `buf`.
    using OutputCreator = std::function<OutputFormatPtr(
            WriteBuffer & buf,
            const Block & sample,
            const RowOutputFormatParams & params,
            const FormatSettings & settings)>;

    // Everything registered for a single format name.
    // NOTE: keep the member order stable in case Creators is aggregate-initialized elsewhere.
    struct Creators
    {
        // Per-format callbacks and flags. A default-constructed std::function is empty,
        // so input_creator stays empty until registerInputFormat sets it.
        // The two bool flags default to false.
        InputCreator input_creator;
        OutputCreator output_creator;
        FileSegmentationEngine file_segmentation_engine;
        SchemaReaderCreator schema_reader_creator;
        ExternalSchemaReaderCreator external_schema_reader_creator;
        bool supports_parallel_formatting{false};
        bool is_column_oriented{false};
        NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
        AppendSupportChecker append_support_checker;
    };

    // Registry type: maps a format name (e.g. "CSV") to its Creators.
    using FormatsDictionary = std::unordered_map<String, Creators>;

    // Member variable: the registry itself.
    FormatsDictionary dict;

    // Registration entry points. registerInputFormat throws if an input creator
    // is already registered under `name`.
    void registerInputFormat(const String & name, InputCreator input_creator);
    void registerOutputFormat(const String & name, OutputCreator output_creator);

2 FormatFactory 注册接口的实现：向成员变量 dict 注册
代码语言:c 复制
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
    auto & target = dict[name].input_creator;
    if (target)
        throw Exception("FormatFactory: Input format "   name   " is already registered", ErrorCodes::LOGICAL_ERROR);
    target = std::move(input_creator);
    registerFileExtension(name, name);
}

3 调用注册接口 CSVRowInputFormat.h

代码语言:c 复制
/// Calls `register_func` three times: once for `base_format_name` itself, and once each
/// for its "WithNames" and "WithNamesAndTypes" variants. The two bool arguments are
/// (with_names, with_types): (false, false), (true, false), (true, true).
void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWithNamesAndTypesFunc register_func)
{
    // Invoke register_func for each variant.
    register_func(base_format_name, false, false);
    register_func(base_format_name + "WithNames", true, false);
    register_func(base_format_name + "WithNamesAndTypes", true, true);
}

/// Registers the CSV input format and its header variants with `factory`.
void registerInputFormatCSV(FormatFactory & factory)
{
    /// Registers one CSV variant under `format_name`. Capturing `factory` by reference is
    /// fine: this callback is only invoked synchronously by registerWithNamesAndTypes below.
    auto register_variant = [&factory](const String & format_name, bool with_names, bool with_types)
    {
        /// This lambda is what ends up stored as the format's InputCreator. It outlives the
        /// current scope, so the flags are captured by value.
        auto csv_creator = [with_names, with_types](
            ReadBuffer & buf,
            const Block & sample,
            IRowInputFormat::Params params,
            const FormatSettings & settings)
        {
            return std::make_shared<CSVRowInputFormat>(sample, buf, std::move(params), with_names, with_types, settings);
        };
        factory.registerInputFormat(format_name, std::move(csv_creator));
    };

    /// Base name "CSV"; registerWithNamesAndTypes derives the variant names from it.
    registerWithNamesAndTypes("CSV", register_variant);
}

4 CSVRowInputFormat 定义

代码语言:c 复制
class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
{
public:
    /** with_names - in the first line the header with column names
      * with_types - on the next line header with type names
      */
    CSVRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
                      bool with_names_, bool with_types_, const FormatSettings & format_settings_);

5 registerInputFormat 的 factory 参数是从哪里来的

代码语言:c 复制
/// Returns the single process-wide FormatFactory.
/// The function-local static is constructed on first use (initialization is
/// thread-safe since C++11) and destroyed at program exit.
FormatFactory & FormatFactory::instance()
{
    static FormatFactory factory_singleton;
    return factory_singleton;
}

- main 函数调用

代码语言:c 复制
int Server::main(const std::vector<std::string> & /*args*/)
{
    Poco::Logger * log = &logger();

    UseSSL use_ssl;

    MainThreadStatus::getInstance();

    registerFunctions();
    registerAggregateFunctions();
    registerTableFunctions();
    registerStorages();
    registerDictionaries();
    registerDisks();
    registerFormats();
    registerRemoteFileMetadatas();

  • registerFormats() 进而调用 registerInputFormat，把各个格式注册到 FormatFactory 中

6 Lambda 变形 ParallelParsingInputFormat.h

代码语言:c 复制
    // Creates a parser over a single ReadBuffer. FormatFactory::getInput supplies a lambda
    // that wraps the format's registered InputCreator.
    using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>;

    // Constructor arguments. NOTE: the field order is relied upon by aggregate
    // initialization (e.g. `Params params{buf, sample, parser_creator, ...}`), so do not reorder.
    struct Params
    {
        ReadBuffer & in;
        Block header;
        InternalParserCreator internal_parser_creator;
        FormatFactory::FileSegmentationEngine file_segmentation_engine;
        String format_name;
        size_t max_threads;
        size_t min_chunk_bytes;
        bool is_server;
    };

    // Member variable: the parser factory. parserThreadFunction calls it to build a parser
    // over each processing unit's segment.
    const InternalParserCreator internal_parser_creator;

    /// Builds the parallel parser from `params`.
    /// `params` is taken by value, so its std::function and string fields are moved into
    /// the members instead of copied. Only scalar fields are read after those moves.
    explicit ParallelParsingInputFormat(Params params)
        : IInputFormat(std::move(params.header), params.in)
        , internal_parser_creator(std::move(params.internal_parser_creator))
        , file_segmentation_engine(std::move(params.file_segmentation_engine))
        , format_name(std::move(params.format_name))
        , min_chunk_bytes(params.min_chunk_bytes)
        , is_server(params.is_server)
        , pool(params.max_threads)
    {
        // One unit for each thread, including segmentator and reader, plus a
        // couple more units so that the segmentation thread doesn't spuriously
        // bump into reader thread on wraparound.
        processing_units.resize(params.max_threads + 2);

        LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
    }

7 注册完成后如何使用

通过调用 FormatFactory::getInput 来使用，这里第一个参数 name 即为 "CSV"。

代码语言:c 复制
/// Creates an input format for format `name` (e.g. "CSV") over `buf`.
/// In the parallel-parsing branch it looks up the registered InputCreator, wraps it in a
/// lambda matching InternalParserCreator, and returns a ParallelParsingInputFormat.
/// Throws (via getCreators) if `name` is not registered.
/// NOTE(review): this is an abridged excerpt. parallel_parsing, format_settings, settings
/// and file_segmentation_engine are not declared in the visible code, and the
/// non-parallel path falls off the end without a return (undefined behavior as written).
/// Presumably the full source computes these names and handles the sequential case.
InputFormatPtr FormatFactory::getInput(
    const String & name,
    ReadBuffer & buf,
    const Block & sample,
    ContextPtr context,
    UInt64 max_block_size,
    const std::optional<FormatSettings> & _format_settings) const
{

    if (parallel_parsing)
    {
      
        const auto & input_getter = getCreators(name).input_creator;

        RowInputFormatParams row_input_format_params;
        row_input_format_params.max_block_size = max_block_size;
        row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
        row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
        row_input_format_params.max_execution_time = settings.max_execution_time;
        row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;

        /// Const reference is copied to lambda.
        // This adapts the four-argument InputCreator into a one-argument InternalParserCreator:
        // everything except the ReadBuffer is captured by copy.
        auto parser_creator = [input_getter, sample, row_input_format_params, format_settings]
            (ReadBuffer & input) -> InputFormatPtr
            {return input_getter(input, sample, row_input_format_params, format_settings); };
        // For "CSV", input_getter is the creator registered by registerInputFormatCSV,
        // which returns std::make_shared<CSVRowInputFormat>(...).
      
      
        // Bundle the parser factory, file_segmentation_engine and thread settings into
        // ParallelParsingInputFormat::Params (aggregate initialization, field order matters).
        ParallelParsingInputFormat::Params params{
            buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing,
               context->getApplicationType() == Context::ApplicationType::SERVER};
      
      
        return std::make_shared<ParallelParsingInputFormat>(params);
    }
}


/// Returns the Creators registered for format `name`.
/// Throws Exception(UNKNOWN_FORMAT) if no format with that name has been registered.
const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const
{
    if (auto it = dict.find(name); it != dict.end())
        return it->second;
    throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}

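为什么这里的语法能编译通过呢？因为 parser_creator 是一个签名为 InputFormatPtr(ReadBuffer &) 的 lambda，而 std::function 可以由任何调用签名兼容的可调用对象构造，所以它可以直接作为 InternalParserCreator 放进 ParallelParsingInputFormat::Params。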

即完成了 InputCreator -> InternalParserCreator 的转换。

8 最终的频繁调用

代码语言:c 复制
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
    if (thread_group)
        CurrentThread::attachToIfDetached(thread_group);

    const auto parser_unit_number = current_ticket_number % processing_units.size();
    auto & unit = processing_units[parser_unit_number];

    try
    {
        setThreadName("ChunkParser");

        /*
         * This is kind of suspicious -- the input_process_creator contract with
         * respect to multithreaded use is not clear, but we hope that it is
         * just a 'normal' factory class that doesn't have any state, and so we
         * can use it from multiple threads simultaneously.
         */
        ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);

        InputFormatPtr input_format = internal_parser_creator(read_buffer);
        input_format->setCurrentUnitNumber(current_ticket_number);
        InternalParser parser(input_format);

        unit.chunk_ext.chunk.clear();
        unit.chunk_ext.block_missing_values.clear();

        /// Propagate column_mapping to other parsers.
        /// Note: column_mapping is used only for *WithNames types
        if (current_ticket_number != 0)
            input_format->setColumnMapping(column_mapping);

        // We don't know how many blocks will be. So we have to read them all
        // until an empty block occurred.
        Chunk chunk;
        while (!parsing_finished && (chunk = parser.getChunk()) != Chunk())
        {
            /// Variable chunk is moved, but it is not really used in the next iteration.
            /// NOLINTNEXTLINE(bugprone-use-after-move)
            unit.chunk_ext.chunk.emplace_back(std::move(chunk));
            unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
        }

0 人点赞