数据知行合一
知:掌握数据建设方法论、技术体系;
行:将数据建设方法论、技术体系与业务场景结合落地
关注“数据万有引力”公众号
正文共:3198字 11图 | 预计阅读时间:8分钟
承接上个专题 clickhosue准实时数仓能力探索 留下问题“上游实时数据怎么sink到clickhouse?”,在这里一起探索 CDC ChangeLog Stream实时流sink 到CLICKHOUSE最佳姿势。
在进行技术选型、方案设计与实操之前,先简单概述下数据库变更日志是怎么流入click house的:
CDC技术通过实时捕捉数据变更日志作为流计算引擎(如flink,spark)
数据源,这些实时流数据源ChangeLog Stream由包含变更操作列(用于插入、删除、更新(先前)、更新(新)标识)的行和实际的元数据列组成,流入flink引擎。flink再将ChangeLog Stream转换为Dynamic Table的Append或Retract或Upsert模式,然后再sink到外部系统,如:clickhouse
这里涉及到几个术语解释:
- Dynamic Table & ChangeLog Stream
- Append-only stream
- Retract stream
- Upsert stream
术语解释
01. Dynamic Table & ChangeLog Stream
Dynamic table在flink中是一个逻辑概念,。下图是ChangeLog Stream和dynamic table转换关系,先将ChangeLog Stream转化为dynamic table,再基于dynamic table进行SQL操作生成新的dynamic table。
- Dynamic Table 就是 Flink SQL 定义的动态表,动态表和流的概念是对等的。参照上图,流可以转换成动态表,动态表也可以转换成流。
- 在 Flink SQL中,数据在从一个算子流向另外一个算子时都是以 Changelog Stream 的形式,任意时刻的 Changelog Stream 可以翻译为一个表,也可以翻译为一个流。
上游CDC技术,实时捕捉数据库变更日志,flink实时消费日志,数据库中的变更日志作为flink流的数据源(Changelog Stream),如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源
在将Changelog Stream转换为Dynamic Table或将其写入外部系统时,Flink 根据数据变化类型提供三种结果的输出模式。
02. Append-only stream
Append-only stream:A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows. flink-docs-release-1.15
Append-only 是最为简单的输出模式,只支持追加结果记录的操作。结果一旦输出以后便不会再有变更,Append 输出模式的最大特性是不可变性(immutability)
通常来说,Append 模式会用于写入不方便做撤回或者删除操作的存储系统的场景,比如 Kafka 等 MQ 或者打印到控制台。
03. Retract stream
Retract stream: A retract stream is a stream with two types of messages, add and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row, and an additional message for the updating (new) row. flink-docs-release-1.15
retract 流包含两种类型的 message:add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。
如上图,在mysql执行update操作
代码语言:javascript复制update inventory.`debezium_products` set weight=180 where id=101;
ChangeLog转为Retract stream会在dynamic table写入以下数据
op | id | name | description | weight |
---|---|---|---|---|
-U | 101 | scooter | description | 80.000 |
U | 101 | scooter | description | 180.000 |
04. Upsert stream
Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. flink-docs-release-1.15
upsert 流包含两种类型的 message:upsert messages 和_delete messages_。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message 。
如上图,在mysql执行update操作
代码语言:javascript复制update inventory.`debezium_products` set weight=180 where id=101;
ChangeLog转为Retract stream会在dynamic table写入以下数据
op | id | name | description | weight |
---|---|---|---|---|
-D | 101 | scooter | description | 80.000 |
I | 101 | scooter | description | 180.000 |
ChangeLog Stream 写入clickhosue方案
由于clickhosue以下特性,ChangeLog Stream 写入clickhosue需要相应解决方案
- clickhosue不适合大量单条数据的写请求,因为写入过快时后台合并不过来,会报Too many parts等错误
- clickhosue不适合高频繁的数据更新和删除操作,因为变更数据的聚合处理需要时间,短期内可能出现数据不准的现象,同时clickhosue对事务支持也不够完善。
通过以下方案,解决ChangeLog Stream 写入clickhosue存在以上局限问题
- 使用CollapsingMergeTree表承接Retract stream与Upsert stream写入。
- 使用VersionedCollapsingMergeTree表承接Versioned Table 的Retract stream写入。
- 使用ReplacingMergeTree表承接Append-only stream或特定场景(没有物理删除)Retract stream写入。
- 某些数据量小,变更不频繁维表的Upsert stream可以使用other engines of this family (*MergeTree)表承接写入
- 按数据批次大小以及批次间隔两个条件控制写入频率,在part merge压力和数据实时性两方面取得平衡。
基于以上解决方案,flink-connector-clickhouse设计如下图 ,
扫下面二维码或搜一搜“数据万有引力”关注公众号获取 “flink-connector-clickhouse.jar”,私信获取源码
- 首先考虑到ClickHouse擅长大批量写入的特点,通过batch option 可以支持攒批写入,避免频繁写入造成的性能下降问题;同时通过batchTime option兼顾数据实时性;两个option,只要其中一个满足条件就触发sink,从而在part merge压力和数据实时性两方面取得平衡。
- 其次是ChangeLog Stream包含大量的更新和删除操作。为了支持频繁变更的数据,将Flink的Retract Stream(回撤流)、Upsert Stream(更新-插入流)含有状态标记的数据流,写入到ClickHouse的 CollapsingMergeTree引擎表中。
- 然后是Versioned Table(dynamic table with a PRIMARY KEY constraint and time attribute),通过将event time生成version,根据状态标记生成sign,再将数据流写入ClickHouse的VersionedCollapsingMergeTree引擎表中。
CDC技术选型
在flink cdc connector与flink Debezium Format对CDC技术进行选型,通过上图架构与对比
- flink cdc connector需要维护组件更少,实时链路更简单,部署成本低;全量阶段支持数据并发读取,并且支持全量阶段checkpoint;可以不需要对库或表加锁来保证数据一致性。
- Debezium的使用人数多,社区活跃,框架也比较成熟,技术更稳定;在保证数据一致性时,需要对读取的库或表加锁;全量阶段读取阶段,只支持单并发。
虽然flink cdc 有很多亮点能力,不过项目还在孵化阶段,有些操作不是很丝滑;如果有功力深厚的技术架构团队来驾驭它(陪社区一起成长,拥抱社区并与之合作),flink cdc 可以覆盖业务场景会更深。
如果业务场景对稳定要求比较高,同时又不想投入高成本驾驭技术,其实Debezium已经可以覆盖很多场景了。
可以将Debezium作为Flink的嵌入式引擎,作为一个依赖包嵌入到代码库,而不用通过kafka connector运行,同样也可以不再需要直接与 MySQL 服务器通信,不需要处理复杂快照、GTID、锁等等优点。同时简化
全过程解决方案
根据上面探索,最终CDC ChangeLog Stream实时流sink 到CLICKHOUSE全过程解决方案如上图
- flink cdc connector实时捕捉数据变更日志,实现数据全量与实时增量采集。
- 自研flink-connector-clickhouse实现不同ChangeLog Stream模式(appendretractupsert)输出到相应的clickhosue表引擎。
- 在clickhouse使用相应表引擎承接上游数据输入。