最近笔者在使用Clickhouse的过程中,用到了Optimize Table命令,而在业务开发过程中,由于不了解Optimize Table命令的明确行为,中间出了很多岔子,在查问题的过程中,也发现网上关于Optimize Table命令的介绍资料很少,因此笔者决定结合源码,全面解析下Optimize Table命令。
Optimize Table命令的功能
Clickhouse作为一款OLAP数据库,对数据更新的支持比较弱,而且并不支持标准的SQL update/delete语法;它提供的alter table ...... update/delete 语法也是异步的,即收到命令后先返回给客户端成功,至于什么时候数据真正更新成功是不确定的。
因此在业务需要数据更新的场景下(如Mysql同步到Clickhouse),通常会使用ReplacingMergeTree或CollapsingMergeTree的数据合并逻辑绕行实现异步更新,这样一方面可以保证数据的最终一致性,另一方面Clickhouse性能开销也会比alter table小。但这种方式有一个缺点是MergeTree引擎的数据合并过程(merge)是Clickhouse基于策略控制的,执行时间比较随机,因此数据一致性缺少时间保证,极端情况下数据过了一天也没有完全合并。
而Optimize Table这个命令可以强制触发MergeTree引擎的数据合并,可以用来解决数据合并时间不确定的问题。
Optimize Table执行过程源码解析
Clickhouse在收到一个SQL语句后,会通过如下的流程执行SQL:Parser(解析SQL语法,转化为AST)-> Interpreter(优化生成执行计划 RBO)-> Interpreter::executeImpl(通过Block Stream读取或写入数据)[1]。Optimize Table语句也不例外,只不过Optimize语句没有复杂的执行计划。
Clickhouse收到Optimize Table命令后会调用到ParserOptimizeQuery::parseImpl()解析命令。
代码语言:c 复制bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_optimize_table("OPTIMIZE TABLE");
ParserKeyword s_partition("PARTITION");
ParserKeyword s_final("FINAL");
ParserKeyword s_deduplicate("DEDUPLICATE");
ParserKeyword s_by("BY");
......
}
可以看到Optimize Table语句中主要解析了以下几个关键词:“OPTIMIZE TABLE”、“PARTITION”、“FINAL”、“DEDUPLICATE”、“BY”。官方文档这样介绍这些关键词的作用[2]:
1. “OPTIMIZE TABLE”:指定需要Optimize的表,只支持MergeTree引擎。
2. “PARTITION”:若指定了分区,则只会对指定的分区触发合并任务。
3. “FINAL”:即使只有一个文件块也执行合并,即使有并行的正在执行的合并,也会强制执行这一次合并。
4. “DEDUPLICATE”:去重,若没有后续的“BY”子句,则按照行完全相同去重(所有字段值相同)。
5. “BY”:配合“DEDUPLICATE”关键词使用,指定依据哪些列去重。
接下来对照源码,看看这些关键词如何控制合并执行。
进入InterpreterOptimizeQuery::execute(),先校验了“DEDUPLICATE BY”的列中是否包含了表的分区键、主键,若未包含则直接抛出异常。Clickhouse的数据存储依据分区键划分文件块,每个文件块中的数据按照主键排序,因此在去重时若包含了分区键、主键,Clickhouse可以只对相邻的行进行去重,而不需要另外构造哈希表,可以极大的提升执行效率。
代码语言:c 复制BlockIO InterpreterOptimizeQuery::execute()
{
......
// Empty list of names means we deduplicate by all columns, but user can explicitly state which columns to use.
Names column_names;
if (ast.deduplicate_by_columns)
{
......
metadata_snapshot->check(column_names, NamesAndTypesList{}, table_id);
Names required_columns;
{
required_columns = metadata_snapshot->getColumnsRequiredForSortingKey();
const auto partitioning_cols = metadata_snapshot->getColumnsRequiredForPartitionKey();
required_columns.reserve(required_columns.size() partitioning_cols.size());
required_columns.insert(required_columns.end(), partitioning_cols.begin(), partitioning_cols.end());
}
for (const auto & required_col : required_columns)
{
// Deduplication is performed only for adjacent rows in a block,
// and all rows in block are in the sorting key order within a single partition,
// hence deduplication always implicitly takes sorting keys and partition keys in account.
// So we just explicitly state that limitation in order to avoid confusion.
if (std::find(column_names.begin(), column_names.end(), required_col) == column_names.end())
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
"DEDUPLICATE BY expression must include all columns used in table's"
" ORDER BY, PRIMARY KEY, or PARTITION BY but '{}' is missing."
" Expanded DEDUPLICATE BY columns expression: ['{}']",
required_col, fmt::join(column_names, "', '"));
}
}
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());
return {};
}
校验了去重列之后,就调用了表的optimize()方法。事实上只有MergeTree和ReplicatedMergeTree实现了optimize()方法,其他的存储引擎调用optimize()方法都会直接抛出异常。
进入InterpreterOptimizeQuery::optimize(),在未指定“PARTITION”并且使用了“FINAL”时,会遍历表的所有分区,并对每一个分区执行合并逻辑;如果指定了分区,就不再关注“FINAL”关键词了,都是对该分区执行合并;如果即没有指定分区,也没有使用“FINAL”的情况下,代码中的partition_id就为空,在merge()方法中对这种情况做了特殊的处理。
代码语言:c 复制bool StorageMergeTree::optimize(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
ContextPtr local_context)
{
......
String disable_reason;
if (!partition && final)
{
DataPartsVector data_parts = getDataPartsVector();
std::unordered_set<String> partition_ids;
for (const DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
{
if (!merge(
true,
partition_id,
true,
deduplicate,
deduplicate_by_columns,
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{......}
}
}
else
{
String partition_id;
if (partition)
partition_id = getPartitionIDFromQuery(partition, local_context);
if (!merge(
true,
partition_id,
final,
deduplicate,
deduplicate_by_columns,
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{......}
}
return true;
}
InterpreterOptimizeQuery::merge()方法逻辑很简单,选取要合并的文件块 -> 合并选择的文件块。
代码语言:c 复制bool StorageMergeTree::merge(
bool aggressive,
const String & partition_id,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
String * out_disable_reason,
bool optimize_skip_merged_partitions)
{
......
{
merge_mutate_entry = selectPartsToMerge(
metadata_snapshot,
aggressive,
partition_id,
final,
out_disable_reason,
table_lock_holder,
lock,
optimize_skip_merged_partitions,
&select_decision);
}
......
return mergeSelectedParts(metadata_snapshot, deduplicate, deduplicate_by_columns, *merge_mutate_entry, table_lock_holder);
}
进入StorageMergeTree::selectPartsToMerge(),在partition_id为空时(只有既不指定分区,又不使用“FINAL”时,才会为空),会执行selectPartsToMerge()依据策略选择一些文件块来执行合并,而在partition_id非空时,则是执行selectAllPartsToMergeWithinPartition()将分区下所有的文件块全部合并。所以,在既不指定分区,也不使用“FINAL”关键词的情况下,Optimize Table命令并不能保证数据最终会变为完全合并的状态。
代码语言:c 复制std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
const StorageMetadataPtr & metadata_snapshot,
bool aggressive,
const String & partition_id,
bool final,
String * out_disable_reason,
TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & lock,
bool optimize_skip_merged_partitions,
SelectPartsDecision * select_decision_out)
{
......
if (partition_id.empty())
{
......
if (max_source_parts_size > 0)
{
select_decision = merger_mutator.selectPartsToMerge(
future_part,
aggressive,
max_source_parts_size,
can_merge,
merge_with_ttl_allowed,
out_disable_reason);
}
else if (out_disable_reason)
*out_disable_reason = "Current value of max_source_parts_size is zero";
}
else
{
while (true)
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, disk_space, can_merge, partition_id, final, metadata_snapshot, out_disable_reason, optimize_skip_merged_partitions);
auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds();
auto timeout = std::chrono::milliseconds(timeout_ms);
/// If final - we will wait for currently processing merges to finish and continue.
if (final
&& select_decision != SelectPartsDecision::SELECTED
&& !currently_merging_mutating_parts.empty()
&& out_disable_reason
&& out_disable_reason->empty())
{
LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
currently_merging_mutating_parts.size());
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(lock, timeout))
{
*out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
break;
}
}
else
break;
}
}
......
}
另外,在使用了“FINAL”关键词的情况下,Optimize Table命令会等待正在执行的合并任务结束,再执行该次合并,所以在指定分区的情况下,使用“FINAL”关键词的响应会慢一些。
InterpreterOptimizeQuery::mergeSelectedParts()的逻辑就比较复杂了,这里就不具体介绍了,但总体逻辑就是把选择的文件块都读出来,然后执行数据合并,形成一个新的文件块写入磁盘。因此在数据量比较大的情况下,这其实是一个很重的操作,因为不论是否真的有数据需要合并,都需要将全量数据读出来,重新写一份到磁盘上。在执行Optimize之后,生成了新的文件块,但是老的文件块并不会立刻消失,而是会异步删除,因此在执行大表的Optimize之后会看到数据存储容量有短暂的上升。
针对有些分区不需要合并的情况,Clickhouse 21.1版本做了一个优化,在系统变量(system.settings表)里增加了optimize_skip_merged_partitions参数,这个参数开启,在selectAllPartsToMergeWithinPartition()中会排除掉只有一个文件块且level>0的分区(这样的分区意味着该分区之前已经合并过)。
实验验证
为了验证上面代码逻辑,笔者在Clickhouse 20.3 版本(没有optimize_skip_merged_partitions参数)上进行了一些实验。
1. Optimize Partition
图二是执行Optimize Table ...... Partition 20210209的执行效果,可以看到执行后20210209这个分区中的2个文件块(Parts)被合并成了一个文件块,其level为3,而其他的分区并没有发生合并。当然图中展示的是Optimize最终的效果,在刚执行完该命令时,原有的20210209_84_94_2、20210209_95_95_0文件夹并不会立刻消失,而是过了几分钟才被删除掉。
2. Optimize Final
图三是Optimize Table ...... Final的执行效果,可以看到经过执行Optimize Final命令之后,20211013这个分区的多个文件块合并成了一个文件块;同时,其他已经合并过的分区(如20210729)会被重新写一份,其level由5变为了7(因为中间执行了2次Optimize Final语句)。
3. Optimize
最后再看下简单Optimize的效果,如图四所示。可以看到Clickhouse只是依据策略选取了某一个分区的某些文件块进行合并(20211013_0_231_28、20211013_232_410_30、20211013_411_432_10三个文件块合并为了20211013_0_432_31文件块),这样并不能保证最终数据被完全合并。
使用总结
在基于Clickhouse的数据仓库建设中,由于Clickhouse本身不支持完备的数据更新,数据的实时性和一致性存在trade-off,如果应用场景对数据一致性要求很高,在有数据更新的情况下,基本无法实时导入数据,只能周期性的离线导入以保证Clickhouse中的数据是某一时刻的完整切片。离线任务由于存在调度延时,一般来讲周期最小只能做到小时级,很难做到分钟级。如果应用场景更在意数据的实时性,就可以采用实时导入的方式,由于Clickhouse的Merge过程是基于策略调度的,因此在数据一致性上就会差一些(会查到本该被删除的数据)。
基于实时写入 定期Optimize的方式,可以通过改变Optimize周期,在性能、数据一致性之间做平衡。当数据一致性要求较高时,可以缩短Optimize周期,极端情况甚至可以每次写入都执行Optimize,这样可以将数据不一致的时间缩短到分钟级(当然这样对Clickhouse的性能要求比较严格);当数据量比较大时,可以半个小时左右执行一次Optimize,这样在保证Clickhouse集群性能的同时,也对数据不一致的时间有一个保障。在笔者的实际使用中,Clickhouse集群使用32核64G机器,单表原始数据量在1TB以内的情况下,Optimize执行周期在5min-10min都没什么压力。
参考文献
[1] ClickHouse 源码阅读 —— 详解查询SQL语句执行过程. https://nowjava.com/article/43828
[2] Clickhouse docs. https://clickhouse.com/docs/en/sql-reference/statements/optimize/