用过 HBase 的同学应该都知道,当批量导入数据的时候,可以利用 Spark 这样的计算引擎,直接将数据生成 HFile 一次性导入到 HBase,既有效地分离了 HBase 的计算压力,又实现了高效的数据导入。
我一直在想,在 ClickHouse 中有没有类似的方法,可以直接利用 Spark 生成 MergeTree 的数据文件,然后一次性导入到目标表。
今天就来秀一秀奇技淫巧。
我在书里面曾经介绍过, ClickHouse 自带了一个叫做 clickhouse-local 的二进制工具,可以简单理解成一个单机版的 ClickHouse。
它有这么几个鲜明的特点:
1. 实现了大部分 ClickHouse Server 的功能 (表引擎、函数、查询等)
2. 不需要依赖 ClickHouse Server ,能够的独立运行
是不是嗅到了什么?
我们来试一下吧,执行下面的语句:
代码语言:javascript复制echo -e "1n2n3" | ./clickhouse-local -S "id Int64" -N "tmp_table" -q "SELECT * FROM tmp_table;"
1
2
3
关于 clickhouse-local 具体的用法我在这就不赘述了,不了解的可以去看书的第三章。
简单来说:
1. -N 创建了名为 tmp_table 的表,默认是 File 引擎
2. -S 是表字段
3. 读取了 echo 流的数据,写入 tmp_table
4. -q 是 SQL 语句,可以看到查询返回了 1 2 3
如果把 echo 替换成文件流或是或者是其他的数据流,是不是可以利用 clickhouse-local 帮我们生成数据文件呢?
思路上可行,但是这里还存在一个问题,我们在刚才的语句上添加 --logger.console 参数再看一次:
代码语言:javascript复制echo -e "1n2n3" | ./clickhouse-local -S "id Int64" -N "tmp_table" -q "SELECT * FROM tmp_table;" --logger.console
加入这个参数后可以显示日志,我们观察一下日志信息:
代码语言:javascript复制 <Debug> Application: Working directory created: /var/folders/n6/cq8_zr4n0hg0nzt9yhzy0b580000gp/T/clickhouse-local-65488-1620309903-3668076629011319556
...
...
<Debug> Application: Removing temporary directory: /var/folders/n6/cq8_zr4n0hg0nzt9yhzy0b580000gp/T/clickhouse-local-65488-1620309903-3668076629011319556
默认情况下,clickhouse-local 每次执行,都会在操作系统临时目录(例如 centos 的 /tmp) 生成一个唯一的 clickhouse-local-xxxx 临时工作空间,当 SQL 执行完后这个目录是会被删除。
悲剧啊。。目录都被删除了,那怎么拿数据文件呢?
莫要担心,我们可以在刚才的命令后面加上一个 --path参数,手动指定工作目录,这样就不用担心删除的问题了。
现在用一个完整的示例,演示整个核心过程:
首先在 ClickHouse Server 里面创建我们模拟的目标表:
代码语言:javascript复制CREATE TABLE test_batch (
id Int64,
EventTime Date
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventTime)
ORDER BY id
写入 1w 条测试数据:
代码语言:javascript复制INSERT INTO TABLE test_batch SELECT number,'2021-04-05' FROM `system`.numbers LIMIT 10000
查看这张表的分区信息,目前只有一个 202104 分区,1w 行数据:
代码语言:javascript复制SELECT
partition,
name,
rows
FROM system.parts
WHERE table = 'test_batch'
Query id: 90bbe5aa-30e8-43e6-91be-8c70869e7a11
┌─partition─┬─name─────────┬──rows─┐
│ 202104 │ 202104_1_1_0 │ 10000 │
└───────────┴──────────────┴───────┘
1 rows in set. Elapsed: 0.003 sec.
现在,把我的笔记本想象成 Spark 环境,直接生成 test_batch 的数据文件:
代码语言:javascript复制% echo -e "1n2n3" | ./clickhouse-local -S "id Int64" -N "tmp_table" -q "CREATE TABLE test_batch (id Int64,EventTime Date) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventTime) ORDER BY id;INSERT INTO TABLE test_batch SELECT id,'2021-05-05' FROM tmp_table;" --logger.console -- --path /clickhouse/ch9-data/local-data
Logging trace to console
2021.05.06 23:15:56.656071 [ 2835492 ] {} <Debug> Access(user directories): Added users.xml access storage 'users.xml', path:
2021.05.06 23:15:56.667644 [ 2835492 ] {} <Debug> Application: Loading metadata from /clickhouse/ch9-data/local-data/
...
...
上面的写法和之前类似:
- 首先利用了名为 tmp_table 的 File 表引擎,保存输入流的数据
- 然后新建了一张 MergeTree
- 通过 INSERT SELECT 将数据从 tmp_table 转到 MergeTree
- 增加了 -- --path 参数,手动指定了保存的目录
观察日志,我们进入到这次查询的工作空间目录:
代码语言:javascript复制% cd /clickhouse/ch9-data/local-data/data/_local/test_batch
% ls -l
total 8
drwxr-xr-x 11 nauu staff 352 5 6 22:20 202105_1_1_0
drwxr-xr-x 2 nauu staff 64 5 6 22:20 detached
-rw-r--r-- 1 nauu staff 1 5 6 22:20 format_version.txt
可以看到, MergeTree 的分区数据文件已经生成好了。
现在手动把 202105_1_1_0 文件夹,拷贝到 ClickHouse Server 所在服务器,/data/default/test_batch/detached 目录下。
然后执行 ATTACH 加载分区:
代码语言:javascript复制ALTER TABLE test_batch ATTACH PART '202105_1_1_0'
Query id: a67bd99e-e147-453d-8e2f-61b1b8a0ea85
0 rows in set. Elapsed: 0.004 sec.
执行之后,再次观察 test_batch 的分区信息:
代码语言:javascript复制SELECT
partition,
name,
rows
FROM system.parts
WHERE table = 'test_batch'
Query id: 92bf6364-c829-4f6b-ad5d-b00b27d7ac8e
┌─partition─┬─name─────────┬──rows─┐
│ 202104 │ 202104_1_1_0 │ 10000 │
│ 202105 │ 202105_2_2_0 │ 3 │
└───────────┴──────────────┴───────┘
2 rows in set. Elapsed: 0.004 sec.
可以看到,数据已经被一次性加载进去了。
需要注意的是,当手动指定 --path 参数之后,如果再次写入,会出现目录已存在的错误:
代码语言:javascript复制Directory for table data data/_local/test_batch/ already exists
所以我们需要添加一些机制确保目录名不会重复,无用的工作目录也需要进行清理。
还有 clickhouse-local 生成的数据文件也需要实现自动的分发。
当前这个方案比较简陋,还有一些需要完善和自动化的地方。但不妨碍作为抱砖引玉,为大家提供一个新的思路。