- 数据分析实战:kafka clickhouse数据收集
- 简单实例
- 1. 创建数据库
- 2. kafka主题映射表
- 3. 创建数据表
- 4. 物化视图监控更改
- 使用虚拟列
- 暂停消费
- 简单实例
clickhouse是一款强大的数据仓库选择,不需要额外的依赖;兼容SQL
,还提供了许多引擎。我们考虑使用,kafka作为分析数据的收集,各个服务节点只要向kafka发送数据,而无需关心数据的落地。
而后,需要用到clickhouse提供的kafka()
表引擎,和物化视图进行落地数据。
简单实例
一个例子,包含kafka
表,MergeTree
数据表,以及物化视图。
1. 创建数据库
需要创建两个库,kafka
库用来映射kafka的主题,product
库保存实际的数据。
CREATE DATABASE kafka;;
CREATE DATABASE product;;
2. kafka主题映射表
代码语言:sql复制DROP TABLE IF EXISTS kafka.item_int;;
CREATE TABLE kafka.item_int (
`time_stamp` DateTime,
`uid` Int64,
`item_id` Int32,
`extra` String
) ENGINE = Kafka(
'172.31.1.1:9092,172.31.1.1:9093,172.31.1.1:9094',
'item_int',
'item_int',
'CSV'
);;
选用kafka
引擎,必要传入参数:
- 第一个参数:kafka集群的地址
- 第二个参数:消费的主题名
- 第三个参数:消费组id,如果想多个主题数据顺序,需要设置一样的组id
- 第四个参数:解析数据的格式,支持
CSV
和JSONEachRow
两种格式,默认都是要n
结束
3. 创建数据表
代码语言:sql复制CREATE TABLE product.item_int (
`time_stamp` DateTime,
`uid` Int64,
`item_id` Int32,
`extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;
数据表是实际保存数据的表,kafka
表只是一个数据的中转。
4. 物化视图监控更改
代码语言:sql复制CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int AS
SELECT
*
FROM
kafka.item_int;;
使用虚拟列
kafka
表提供了三个隐藏的虚拟列:
_topic
: String, 消费的kafka
主题名_offset
: UInt64, 消息的偏移量_partition
: UInt64, 消息消费的分区
使用也很简单,只需要再数据表中加入这三个字段:
代码语言:sql复制CREATE TABLE product.item_int (
`_topic` String,
`_offset` UInt64,
`_partition` UInt64,
`time_stamp` DateTime,
`uid` Int64,
`item_id` Int32,
`extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;
然后,修改物化视图即可:
代码语言:sql复制CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int AS
SELECT
_topic,_offset,_partition,*
FROM
kafka.item_int;;
暂停消费
在需要修改表结构时,需要暂停消费,不然会造成数据消费丢失(偏移量变了,而数据为落库)。方法只要detach
kafka表就可以了:
detach table kafka.item_int
重新启动:
代码语言:sql复制attach table kafka.item_int