数据分析实战:kafka+clickhouse数据收集

2022-04-20 23:26:04 浏览数 (2)

  • 数据分析实战:kafka clickhouse数据收集
    • 简单实例
      • 1. 创建数据库
      • 2. kafka主题映射表
      • 3. 创建数据表
      • 4. 物化视图监控更改
    • 使用虚拟列
    • 暂停消费

clickhouse是一款强大的数据仓库选择,不需要额外的依赖;兼容SQL,还提供了许多引擎。我们考虑使用,kafka作为分析数据的收集,各个服务节点只要向kafka发送数据,而无需关心数据的落地。

而后,需要用到clickhouse提供的kafka()表引擎,和物化视图进行落地数据。

简单实例

一个例子,包含kafka表,MergeTree数据表,以及物化视图。

1. 创建数据库

需要创建两个库,kafka库用来映射kafka的主题,product库保存实际的数据。

代码语言:sql复制
CREATE DATABASE kafka;;

CREATE DATABASE product;;

2. kafka主题映射表

代码语言:sql复制
DROP TABLE IF EXISTS kafka.item_int;;
CREATE TABLE kafka.item_int (
    `time_stamp` DateTime,
    `uid` Int64,
    `item_id` Int32,
    `extra` String
  
) ENGINE = Kafka(
  '172.31.1.1:9092,172.31.1.1:9093,172.31.1.1:9094',
  'item_int',
  'item_int',
  'CSV'
);;

选用kafka引擎,必要传入参数:

  • 第一个参数:kafka集群的地址
  • 第二个参数:消费的主题名
  • 第三个参数:消费组id,如果想多个主题数据顺序,需要设置一样的组id
  • 第四个参数:解析数据的格式,支持CSVJSONEachRow两种格式,默认都是要n结束

3. 创建数据表

代码语言:sql复制
CREATE TABLE product.item_int (
    `time_stamp` DateTime,
    `uid` Int64,
    `item_id` Int32,
    `extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
  toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;

数据表是实际保存数据的表,kafka表只是一个数据的中转。

4. 物化视图监控更改

代码语言:sql复制
CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int  AS
SELECT
  *
FROM
  kafka.item_int;;

使用虚拟列

kafka表提供了三个隐藏的虚拟列:

  • _topic: String, 消费的kafka主题名
  • _offset: UInt64, 消息的偏移量
  • _partition: UInt64, 消息消费的分区

使用也很简单,只需要再数据表中加入这三个字段:

代码语言:sql复制
CREATE TABLE product.item_int (
    `_topic` String,
    `_offset` UInt64,
    `_partition` UInt64,
    `time_stamp` DateTime,
    `uid` Int64,
    `item_id` Int32,
    `extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
  toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;

然后,修改物化视图即可:

代码语言:sql复制
CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int AS
SELECT
  _topic,_offset,_partition,*
FROM
  kafka.item_int;;

暂停消费

在需要修改表结构时,需要暂停消费,不然会造成数据消费丢失(偏移量变了,而数据为落库)。方法只要detachkafka表就可以了:

代码语言:sql复制
detach table kafka.item_int

重新启动:

代码语言:sql复制
attach table kafka.item_int

0 人点赞