为使用ClickHouse 消费Kafka 实时数据的同学提供一些参考
一 架构流程图:
可以看到ClickHouse 内置Kafka 消费引擎,不需要我们业务方写新的消费程序,再往ClickHouse 导入数据
二 前提条件:
- 已创建Kafka集群,且在生产数据
- 已创建云数据库 CDW-ClickHouse集群
三 使用限制:
Kafka集群和ClickHouse集群需要在同一VPC下。
四 操作步骤:
这里忽略Kafka 集群本身的一些操作,以上三个步骤是可以调整顺序的
- Kafka Table Engine: 在ClickHouse 内部创建Kafka消费表(这里可以理解为 消费了一部分Kafka 表的一个队列,存储消费Kafka Topic的一部分数据)
- MergeTree Table Engine: 在ClickHouse 内部创建 Kafka 数据存储表
- MATERIALIZED Table: 在ClickHouse 内部创建 Kafka 消费表,这里可以理解为它是一个搬运者,将 Kafka Table Engine 挪到 MergeTree Table Engine
五 操作步骤详解:
1 Kafka Table Engine
代码语言:javascript复制CREATE TABLE IF NOT EXISTS data_sync.test_queue(
name String,
age int,
gongzhonghao String,
my_time DateTime64(3, 'UTC')
) ENGINE = Kafka
SETTINGS
kafka_broker_list = '172.16.16.4:9092',
kafka_topic_list = 'lemonCode',
kafka_group_name = 'lemonNan',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = 'n',
kafka_schema = '',
kafka_num_consumers = 1
名称 | 是否必选 | 说明 |
---|---|---|
kafka_broker_list | 是 | Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。 |
kafka_topic_list | 是 | Kafka topic,多个 topic 用逗号分隔。 |
kafka_group_name | 是 | Kafka 的消费组名称。 |
kafka_format | 是 | Kafka 数据格式, ClickHouse 支持的 Format, 详见 文档 可选参数。 |
kafka_row_delimiter | 否 | 行分隔符,用于分割不同的数据行。默认为“n”,您也可以根据数据写入的实际分割格式进行设置。 |
kafka_num_consumers | 否 | 单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。 |
kafka_max_block_size | 否 | Kafka 数据写入目标表的 Block 大小,超过该数值后,就将数据刷盘;单位:Byte,默认值为65536 Byte。 |
kafka_skip_broken_messages | 否 | 表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。 |
kafka_commit_every_batch | 否 | 执行 Kafka commit 的频率,取值如下: 0:完全写入一整个Block数据块的数据后才执行commit; 1:每写完一个Batch批次的数据就执行一次commit。 |
kafka_auto_offset_reset | 否 | 从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。 |
2 MergeTree Table Engine
代码语言:javascript复制CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
3 MATERIABLIZED Table Engine
代码语言:javascript复制CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;
六 如何维护
1 停止消费Kafka 数据
代码语言:javascript复制DETACH TABLE cppla.kafka_readings_view ;
2 恢复消费Kafka 数据
代码语言:javascript复制ATTACH TABLE cppla.kafka_readings_view ;
七 分布式写入
1 写入ClickHouse 分布式表
2 Kafka Engine 消费不同分区
八 数据高可用方案
1 ClickHouse ReplicateMergeTree 内部机制保证:
2 ClickHouse 双写保证
九 更新
采用 MergeTree MATERIALIZED AggregatingMergeTree
1 MergeTree
代码语言:javascript复制CREATE TABLE base
(
`i` Int64,
`s` String,
`v` DateTime64,
`id` Int64 DEFAULT 0,
`tag` String DEFAULT '',
`level` Int32 DEFAULT 0
)
ENGINE = MergeTree
PARTITION BY (i % 256)
ORDER BY (i, s)
2 Aggregating MergeTree
代码语言:javascript复制CREATE TABLE updating
(
`i` Int64,
`s` String,
`version` AggregateFunction(max, DateTime64),
`id` AggregateFunction(argMaxIf, Int64, DateTime64, UInt8),
`tag` AggregateFunction(argMaxIf, String, DateTime64, UInt8),
`level` AggregateFunction(argMaxIf, Int32, DateTime64, UInt8)
)
ENGINE = AggregatingMergeTree
PARTITION BY (i % 256)
ORDER BY (i, s)
3 MATERIALIZED
代码语言:javascript复制CREATE MATERIALIZED VIEW mv TO updating AS
SELECT
i,
s,
maxState(v) AS version,
argMaxIfState(id, v, id != 0) AS id,
argMaxIfState(tag, v, tag != '') AS tag,
argMaxIfState(level, v, level != 0) AS level
FROM base
GROUP BY (i, s)
4 查询
代码语言:javascript复制SELECT
i,
s,
maxMerge(version),
argMaxIfMerge(id),
argMaxIfMerge(tag),
argMaxIfMerge(level)
FROM updating
GROUP BY
i,
s
希望对阅读的您有所帮助