ClickHouse For Kafka

2022-06-17 11:58:18 浏览数 (2)

为使用ClickHouse 消费Kafka 实时数据的同学提供一些参考

一 架构流程图:

可以看到ClickHouse 内置Kafka 消费引擎,不需要我们业务方写新的消费程序,再往ClickHouse 导入数据

二 前提条件:

  • 已创建Kafka集群,且在生产数据
  • 已创建云数据库 CDW-ClickHouse集群

三 使用限制:

Kafka集群和ClickHouse集群需要在同一VPC下。

四 操作步骤:

这里忽略Kafka 集群本身的一些操作,以上三个步骤是可以调整顺序的

  1. Kafka Table Engine: 在ClickHouse 内部创建Kafka消费表(这里可以理解为 消费了一部分Kafka 表的一个队列,存储消费Kafka Topic的一部分数据)
  2. MergeTree Table Engine: 在ClickHouse 内部创建 Kafka 数据存储表
  3. MATERIALIZED Table: 在ClickHouse 内部创建 Kafka 消费表,这里可以理解为它是一个搬运者,将 Kafka Table Engine 挪到 MergeTree Table Engine

五 操作步骤详解:

1 Kafka Table Engine
代码语言:javascript复制
CREATE TABLE IF NOT EXISTS data_sync.test_queue(
  name String,
  age int,
  gongzhonghao String,
  my_time DateTime64(3, 'UTC')
) ENGINE = Kafka
SETTINGS
kafka_broker_list = '172.16.16.4:9092',
kafka_topic_list = 'lemonCode',
kafka_group_name = 'lemonNan',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = 'n',
kafka_schema = '',
kafka_num_consumers = 1

名称

是否必选

说明

kafka_broker_list

Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。

kafka_topic_list

Kafka topic,多个 topic 用逗号分隔。

kafka_group_name

Kafka 的消费组名称。

kafka_format

Kafka 数据格式, ClickHouse 支持的 Format, 详见 文档 可选参数。

kafka_row_delimiter

行分隔符,用于分割不同的数据行。默认为“n”,您也可以根据数据写入的实际分割格式进行设置。

kafka_num_consumers

单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。

kafka_max_block_size

Kafka 数据写入目标表的 Block 大小,超过该数值后,就将数据刷盘;单位:Byte,默认值为65536 Byte。

kafka_skip_broken_messages

表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。

kafka_commit_every_batch

执行 Kafka commit 的频率,取值如下: 0:完全写入一整个Block数据块的数据后才执行commit; 1:每写完一个Batch批次的数据就执行一次commit。

kafka_auto_offset_reset

从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。

2 MergeTree Table Engine
代码语言:javascript复制
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
3 MATERIABLIZED Table Engine
代码语言:javascript复制
CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;

六 如何维护

1 停止消费Kafka 数据
代码语言:javascript复制
DETACH TABLE cppla.kafka_readings_view ;
2 恢复消费Kafka 数据
代码语言:javascript复制
ATTACH TABLE cppla.kafka_readings_view ;

七 分布式写入

1 写入ClickHouse 分布式表

2 Kafka Engine 消费不同分区

八 数据高可用方案

1 ClickHouse ReplicateMergeTree 内部机制保证:

2 ClickHouse 双写保证

九 更新

采用 MergeTree MATERIALIZED AggregatingMergeTree

1 MergeTree
代码语言:javascript复制
CREATE TABLE base
(
 `i` Int64,
 `s` String,
 `v` DateTime64,
 `id` Int64 DEFAULT 0,
 `tag` String DEFAULT '',
 `level` Int32 DEFAULT 0
)
ENGINE = MergeTree 
PARTITION BY (i % 256)
ORDER BY (i, s)
2 Aggregating MergeTree
代码语言:javascript复制
CREATE TABLE updating
(
 `i` Int64,
 `s` String,
 `version` AggregateFunction(max, DateTime64),
 `id` AggregateFunction(argMaxIf, Int64, DateTime64, UInt8),
 `tag` AggregateFunction(argMaxIf, String, DateTime64, UInt8),
 `level` AggregateFunction(argMaxIf, Int32, DateTime64, UInt8)
)
ENGINE = AggregatingMergeTree
PARTITION BY (i % 256)
ORDER BY (i, s)
3 MATERIALIZED
代码语言:javascript复制
CREATE MATERIALIZED VIEW mv TO updating AS
SELECT
 i,
 s,
 maxState(v) AS version,
 argMaxIfState(id, v, id != 0) AS id,
 argMaxIfState(tag, v, tag != '') AS tag,
 argMaxIfState(level, v, level != 0) AS level
FROM base
GROUP BY (i, s)
4 查询
代码语言:javascript复制
SELECT
 i,
 s,
 maxMerge(version),
 argMaxIfMerge(id),
 argMaxIfMerge(tag),
 argMaxIfMerge(level)
FROM updating
GROUP BY
 i,
 s

希望对阅读的您有所帮助

0 人点赞