ClickHouse准实时数据更新的新思路

2020-06-15 11:36:26 浏览数 (1)

如何在 ClickHouse 中实现数据更新是一个老生常谈的话题了,众所周知,

Replacing / Collapsing / VersionedCollapsing MergeTree 都能够支持数据更新,但是他们的更新触发时机只能发生在分区合并的时候 (不明白什么意思?请进传送门ClickHouse各种MergeTree的关系与作用),这是一种最终一致性的实现思路,所以在分区合并之前,可能会查询到多余的数据。

那么应该如何实现准实时的更新呢?一种常见的做法是在数据写入之后,按分区的粒度执行 OPTIMIZE FINAL 命令,刷新最近时间的分区。

今天我想从另一个角度,谈谈在 ClickHouse 中实现准实时更新的奇技婬巧。

这种思路主要使用 ReplacingMergeTree,因为它相比 CollapsingMergeTree 更加简单。CollapsingMergeTree 对数据的要求比较严格,不仅需要反位标记,而且需要保证正负标记号的个数对应。

首先准备一张 ReplacingMergeTree 测试表:

代码语言:javascript复制
CREATE TABLE test_a(
  user_id UInt64,
  score String,
  deleted UInt8 DEFAULT 0,
  create_time DateTime DEFAULT toDateTime(0)
)ENGINE= ReplacingMergeTree(create_time)
ORDER BY user_id

其中:

user_id 是数据去重更新的标识;

create_time 是版本号字段,每组数据中 create_time 最大的一行表示最新的数据;

deleted 是自定的一个标记位,比如 0 代表未删除,1 代表删除数据。

首先写入 1000万 测试数据:

代码语言:javascript复制
INSERT INTO TABLE test_a(user_id,score)
WITH(
  SELECT ['A','B','C','D','E','F','G']
)AS dict
SELECT number AS user_id, dict[number%7 1] FROM numbers(10000000)

接着修改前 50万 行数据,修改内容包括 name 字段和 create_time 版本号字段:

代码语言:javascript复制
INSERT INTO TABLE test_a(user_id,score,create_time)
WITH(
  SELECT ['AA','BB','CC','DD','EE','FF','GG']
)AS dict
SELECT number AS user_id, dict[number%7 1], now() AS create_time FROM numbers(500000)

如果现在 COUNT 一下总数,由于还未触发分区合并,所以会发现有 50 万的重复数据:

代码语言:javascript复制
ch7.nauu.com :) SELECT COUNT() FROM test_a;

SELECT COUNT()
FROM test_a

┌──COUNT()─┐
│ 10500000 │
└──────────┘

1 rows in set. Elapsed: 0.006 sec.

现在为查询语句加上 FINAL 关键字:

代码语言:javascript复制
SELECT COUNT()
FROM test_a
FINAL

┌──COUNT()─┐
│ 10000000 │
└──────────┘

1 rows in set. Elapsed: 0.603 sec. Processed 10.50 million rows, 136.50 MB (17.42 million rows/s., 226.48 MB/s.)

这次的查询结果到是正确了,看看执行日志:

代码语言:javascript复制
Expression
 Expression
  Aggregating
   Concat
    Expression
     SourceFromInputStream

会发现,FINAL 查询通过 Concat 动作将查询流变成了串行过程,所以不适合 "重型" 查询。

那么除了使用 FINAL 之外,还有什么办法呢?现在轮到 argMax 函数登场了。

argMax 函数的参数如下所示,它能够按照 field2 的最大值取 field1 的值。

代码语言:javascript复制
argMax(field1,field2)

看到这里,你应该能够想到它的用处了吧。

当我们更新数据时,会写入一行新的数据,通过查询最大的 create_time 得到修改后的字段值,例如通过下面的语句可以得到最新的 score :

代码语言:javascript复制
argMax(score, create_time) AS score

通过下面的语句,就能查询到最新的数据:

代码语言:javascript复制
SELECT
  user_id ,
  argMax(score, create_time) AS score, 
  argMax(deleted, create_time) AS deleted,
  max(create_time) AS ctime 
FROM test_a 
GROUP BY user_id
HAVING deleted = 0

现在为它建立一个视图:

代码语言:javascript复制
CREATE VIEW view_test_a AS
SELECT
  user_id ,
  argMax(score, create_time) AS score, 
  argMax(deleted, create_time) AS deleted,
  max(create_time) AS ctime 
FROM test_a 
GROUP BY user_id
HAVING deleted = 0

现在尝试修改 id 为 1 的数据:

代码语言:javascript复制
INSERT INTO TABLE test_a(user_id,score,create_time) VALUES(0,'AAAA',now())

查询验证:

代码语言:javascript复制
ch7.nauu.com :) SELECT * FROM view_test_a WHERE user_id = 0;

SELECT *
FROM view_test_a
WHERE user_id = 0

┌─user_id─┬─score─┬─deleted─┬───────────────ctime─┐
│       0 │ AAAA  │       0 │ 2020-06-08 04:40:09 │
└─────────┴───────┴─────────┴─────────────────────┘

1 rows in set. Elapsed: 0.006 sec. Processed 16.39 thousand rows, 385.05 KB (2.65 million rows/s., 62.38 MB/s.)

可以发现数据被修改了,是不是有种在使用 OLTP 数据库的幻觉

现在删除这条数据,将 deleted 写成 1:

代码语言:javascript复制
INSERT INTO TABLE test_a(user_id,score,deleted,create_time) VALUES(0,'AAAA',1,now())

再次查询,这条数据就看不到了:

代码语言:javascript复制
ch7.nauu.com :) SELECT * FROM view_test_a WHERE user_id = 0;

SELECT *
FROM view_test_a
WHERE user_id = 0

Ok.

0 rows in set. Elapsed: 0.009 sec. Processed 16.39 thousand rows, 385.08 KB (1.76 million rows/s., 41.35 MB/s.)

当然,这行数据并没有被真正的删除,而是被过滤掉了。在一些合适的场景下,可以结合 表级别的 TTL 最终将物理数据删除。

0 人点赞