本文主要讲解 ClickHouse 中 Lambda 表达式的使用
Cpp reference
https://en.cppreference.com/w/cpp/language/lambda
ClickHouse Lambda
1 定义 结构定义 FormatFactory
代码语言:c 复制using InputFormatPtr = std::shared_ptr<IInputFormat>;
代码语言:c 复制 //Lambda 表达式, 类型定义
// Callable type used to create an input format.
// It receives a ReadBuffer, a header Block, RowInputFormatParams and FormatSettings,
// and returns an InputFormatPtr. Lambdas with a compatible signature convert to it.
using InputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
// Callable type used to create an output format.
// It receives a WriteBuffer, a sample Block, RowOutputFormatParams and FormatSettings,
// and returns an OutputFormatPtr.
using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
// Data structure definition: the callbacks and flags stored for one format name
// (this is the mapped type of FormatsDictionary).
struct Creators
{
// Member variables.
// Input-format factory callback. registerInputFormat treats a non-empty value
// as "an input format is already registered under this name".
InputCreator input_creator;
// Output-format factory callback (see the OutputCreator alias).
OutputCreator output_creator;
// NOTE(review): the types of the next three members are declared outside this
// excerpt; their exact contracts are not visible here.
FileSegmentationEngine file_segmentation_engine;
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
// Both capability flags default to false.
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
// NOTE(review): checker types are declared outside this excerpt.
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
};
// Map from format name to the Creators record registered for it.
using FormatsDictionary = std::unordered_map<String, Creators>;
// Member variable: the registry of all formats.
FormatsDictionary dict;
// Registration functions: store a creator callback under the given format name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
2 FormatFactory 注册接口实现 dict 成员变量注册
代码语言:c 复制void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
auto & target = dict[name].input_creator;
if (target)
throw Exception("FormatFactory: Input format " name " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(input_creator);
registerFileExtension(name, name);
}
3 调用注册接口 CSVRowInputFormat.h
代码语言:c 复制void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWithNamesAndTypesFunc register_func)
{
//去掉用 register_func
register_func(base_format_name, false, false);
register_func(base_format_name "WithNames", true, false);
register_func(base_format_name "WithNamesAndTypes", true, true);
}
/// Registers the CSV input format and its WithNames / WithNamesAndTypes
/// variants in `factory`.
void registerInputFormatCSV(FormatFactory & factory)
{
    /// Called by registerWithNamesAndTypes once per variant name; each call
    /// registers one creator in the factory.
    auto register_variant = [&factory](const String & variant_name, bool has_names, bool has_types)
    {
        /// This lambda is what ends up stored as the InputCreator. The two
        /// flags are captured by value, so it does not depend on this scope.
        auto creator = [has_names, has_types](
            ReadBuffer & in,
            const Block & header,
            IRowInputFormat::Params row_params,
            const FormatSettings & format_settings)
        {
            return std::make_shared<CSVRowInputFormat>(
                header, in, std::move(row_params), has_names, has_types, format_settings);
        };

        factory.registerInputFormat(variant_name, std::move(creator));
    };

    /// "CSV" is the base name the variant names are derived from.
    registerWithNamesAndTypes("CSV", register_variant);
}
4 CSVRowInputFormat 定义
代码语言:c 复制class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
{
public:
/** with_names - in the first line the header with column names
* with_types - on the next line header with type names
*/
CSVRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
bool with_names_, bool with_types_, const FormatSettings & format_settings_);
5 registerInputFormat 的 factory 参数是从哪里来的
代码语言:c 复制FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;
return ret;
}
- main 函数调用
代码语言:c 复制int Server::main(const std::vector<std::string> & /*args*/)
{
Poco::Logger * log = &logger();
UseSSL use_ssl;
MainThreadStatus::getInstance();
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerFormats();
registerRemoteFileMetadatas();
- 调用 registerInputFormat
6 Lambda 变形 ParallelParsingInputFormat.h
代码语言:c 复制 using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>;
// Construction arguments for ParallelParsingInputFormat, bundled into one struct.
struct Params
{
// Source buffer; forwarded to the IInputFormat base constructor.
ReadBuffer & in;
// Header block; moved into the IInputFormat base constructor.
Block header;
// Creates an input format for a given ReadBuffer (see InternalParserCreator).
InternalParserCreator internal_parser_creator;
// NOTE(review): type is declared in FormatFactory, outside this excerpt.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
String format_name;
// Passed to the thread pool and used to size processing_units in the constructor.
size_t max_threads;
size_t min_chunk_bytes;
// FormatFactory::getInput sets this to true when the application type is SERVER.
bool is_server;
};
// Member variable: copy of Params::internal_parser_creator, set in the constructor.
const InternalParserCreator internal_parser_creator;
/// Builds the parallel parsing format from a Params bundle.
///
/// The header and source buffer go to the IInputFormat base; the remaining
/// fields are copied into members, and `params.max_threads` sizes the thread pool.
explicit ParallelParsingInputFormat(Params params)
    : IInputFormat(std::move(params.header), params.in)
    , internal_parser_creator(params.internal_parser_creator)
    , file_segmentation_engine(params.file_segmentation_engine)
    , format_name(params.format_name)
    , min_chunk_bytes(params.min_chunk_bytes)
    , is_server(params.is_server)
    , pool(params.max_threads)
{
    // One unit for each thread, including segmentator and reader, plus a
    // couple more units so that the segmentation thread doesn't spuriously
    // bump into reader thread on wraparound.
    processing_units.resize(params.max_threads + 2);

    LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
}
7 注册完成如何使用
通过调用 FormatFactory::getInput 来使用,这里第一个参数 name 即为 CSV
代码语言:c 复制InputFormatPtr FormatFactory::getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings) const
{
// Creates the input format registered under `name` for reading from `buf`.
// NOTE(review): this excerpt appears abridged — `parallel_parsing`, `format_settings`,
// `settings` and `file_segmentation_engine` are not declared in the visible lines, and
// there is no return statement for the non-parallel path. Confirm against the full source.
if (parallel_parsing)
{
// Look up the InputCreator that was registered for this format name.
const auto & input_getter = getCreators(name).input_creator;
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
/// Const reference is copied to lambda.
// This is effectively an adapter from InputCreator to InternalParserCreator: everything
// except the ReadBuffer is captured by value, leaving a callable that takes only the buffer.
auto parser_creator = [input_getter, sample, row_input_format_params, format_settings]
(ReadBuffer & input) -> InputFormatPtr
{return input_getter(input, sample, row_input_format_params, format_settings); };
// For CSV, calling input_getter runs the registered lambda, which does
// std::make_shared<CSVRowInputFormat>.
//file_segmentation_engine
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing,
context->getApplicationType() == Context::ApplicationType::SERVER};
return std::make_shared<ParallelParsingInputFormat>(params);
}
}
/// Looks up the Creators record registered under `name`.
///
/// @param name  Format name to search for in `dict`.
/// @return Reference to the stored Creators entry.
/// @throws Exception with ErrorCodes::UNKNOWN_FORMAT if `name` is not in `dict`.
const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const
{
    /// find() is used instead of operator[] so that a lookup never inserts an entry.
    if (auto it = dict.find(name); it != dict.end())
        return it->second;

    throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
为何这里的语法过了呢
8 最终的频繁调用
代码语言:c 复制void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
const auto parser_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[parser_unit_number];
try
{
setThreadName("ChunkParser");
/*
* This is kind of suspicious -- the input_process_creator contract with
* respect to multithreaded use is not clear, but we hope that it is
* just a 'normal' factory class that doesn't have any state, and so we
* can use it from multiple threads simultaneously.
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
InputFormatPtr input_format = internal_parser_creator(read_buffer);
input_format->setCurrentUnitNumber(current_ticket_number);
InternalParser parser(input_format);
unit.chunk_ext.chunk.clear();
unit.chunk_ext.block_missing_values.clear();
/// Propagate column_mapping to other parsers.
/// Note: column_mapping is used only for *WithNames types
if (current_ticket_number != 0)
input_format->setColumnMapping(column_mapping);
// We don't know how many blocks will be. So we have to read them all
// until an empty block occurred.
Chunk chunk;
while (!parsing_finished && (chunk = parser.getChunk()) != Chunk())
{
/// Variable chunk is moved, but it is not really used in the next iteration.
/// NOLINTNEXTLINE(bugprone-use-after-move)
unit.chunk_ext.chunk.emplace_back(std::move(chunk));
unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
}