如何在 ClickHouse 中实现数据更新是一个老生常谈的话题了,众所周知,
Replacing / Collapsing / VersionedCollapsing MergeTree 都能够支持数据更新,但是他们的更新触发时机只能发生在分区合并的时候 (不明白什么意思?请进传送门ClickHouse各种MergeTree的关系与作用),这是一种最终一致性的实现思路,所以在分区合并之前,可能会查询到多余的数据。
那么应该如何实现准实时的更新呢?一种常见的做法是在数据写入之后,按分区的粒度执行 OPTIMIZE FINAL 命令,刷新最近时间的分区。
今天我想从另一个角度,谈谈在 ClickHouse 中实现准实时更新的奇技婬巧。
这种思路主要使用 ReplacingMergeTree,因为它相比 CollapsingMergeTree 更加简单。CollapsingMergeTree 对数据的要求比较严格,不仅需要反位标记,而且需要保证正负标记号的个数对应。
首先准备一张 ReplacingMergeTree 测试表:
代码语言:javascript复制CREATE TABLE test_a(
user_id UInt64,
score String,
deleted UInt8 DEFAULT 0,
create_time DateTime DEFAULT toDateTime(0)
)ENGINE= ReplacingMergeTree(create_time)
ORDER BY user_id
其中:
user_id 是数据去重更新的标识;
create_time 是版本号字段,每组数据中 create_time 最大的一行表示最新的数据;
deleted 是自定的一个标记位,比如 0 代表未删除,1 代表删除数据。
首先写入 1000万 测试数据:
代码语言:javascript复制INSERT INTO TABLE test_a(user_id,score)
WITH(
SELECT ['A','B','C','D','E','F','G']
)AS dict
SELECT number AS user_id, dict[number%7 1] FROM numbers(10000000)
接着修改前 50万 行数据,修改内容包括 name 字段和 create_time 版本号字段:
代码语言:javascript复制INSERT INTO TABLE test_a(user_id,score,create_time)
WITH(
SELECT ['AA','BB','CC','DD','EE','FF','GG']
)AS dict
SELECT number AS user_id, dict[number%7 1], now() AS create_time FROM numbers(500000)
如果现在 COUNT 一下总数,由于还未触发分区合并,所以会发现有 50 万的重复数据:
代码语言:javascript复制ch7.nauu.com :) SELECT COUNT() FROM test_a;
SELECT COUNT()
FROM test_a
┌──COUNT()─┐
│ 10500000 │
└──────────┘
1 rows in set. Elapsed: 0.006 sec.
现在为查询语句加上 FINAL 关键字:
代码语言:javascript复制SELECT COUNT()
FROM test_a
FINAL
┌──COUNT()─┐
│ 10000000 │
└──────────┘
1 rows in set. Elapsed: 0.603 sec. Processed 10.50 million rows, 136.50 MB (17.42 million rows/s., 226.48 MB/s.)
这次的查询结果到是正确了,看看执行日志:
代码语言:javascript复制Expression
Expression
Aggregating
Concat
Expression
SourceFromInputStream
会发现,FINAL 查询通过 Concat 动作将查询流变成了串行过程,所以不适合 "重型" 查询。
那么除了使用 FINAL 之外,还有什么办法呢?现在轮到 argMax 函数登场了。
argMax 函数的参数如下所示,它能够按照 field2 的最大值取 field1 的值。
代码语言:javascript复制argMax(field1,field2)
看到这里,你应该能够想到它的用处了吧。
当我们更新数据时,会写入一行新的数据,通过查询最大的 create_time 得到修改后的字段值,例如通过下面的语句可以得到最新的 score :
代码语言:javascript复制argMax(score, create_time) AS score
通过下面的语句,就能查询到最新的数据:
代码语言:javascript复制SELECT
user_id ,
argMax(score, create_time) AS score,
argMax(deleted, create_time) AS deleted,
max(create_time) AS ctime
FROM test_a
GROUP BY user_id
HAVING deleted = 0
现在为它建立一个视图:
代码语言:javascript复制CREATE VIEW view_test_a AS
SELECT
user_id ,
argMax(score, create_time) AS score,
argMax(deleted, create_time) AS deleted,
max(create_time) AS ctime
FROM test_a
GROUP BY user_id
HAVING deleted = 0
现在尝试修改 id 为 1 的数据:
代码语言:javascript复制INSERT INTO TABLE test_a(user_id,score,create_time) VALUES(0,'AAAA',now())
查询验证:
代码语言:javascript复制ch7.nauu.com :) SELECT * FROM view_test_a WHERE user_id = 0;
SELECT *
FROM view_test_a
WHERE user_id = 0
┌─user_id─┬─score─┬─deleted─┬───────────────ctime─┐
│ 0 │ AAAA │ 0 │ 2020-06-08 04:40:09 │
└─────────┴───────┴─────────┴─────────────────────┘
1 rows in set. Elapsed: 0.006 sec. Processed 16.39 thousand rows, 385.05 KB (2.65 million rows/s., 62.38 MB/s.)
可以发现数据被修改了,是不是有种在使用 OLTP 数据库的幻觉
现在删除这条数据,将 deleted 写成 1:
代码语言:javascript复制INSERT INTO TABLE test_a(user_id,score,deleted,create_time) VALUES(0,'AAAA',1,now())
再次查询,这条数据就看不到了:
代码语言:javascript复制ch7.nauu.com :) SELECT * FROM view_test_a WHERE user_id = 0;
SELECT *
FROM view_test_a
WHERE user_id = 0
Ok.
0 rows in set. Elapsed: 0.009 sec. Processed 16.39 thousand rows, 385.08 KB (1.76 million rows/s., 41.35 MB/s.)
当然,这行数据并没有被真正的删除,而是被过滤掉了。在一些合适的场景下,可以结合 表级别的 TTL 最终将物理数据删除。