1、背景
大数据平台的采集功能是从外部数据源采集数据存储到hive,采集方式分为全量采集、增量采集,增量采集适用于数据规模较大情况,有很多使用场景,但是在增量采集时,平台只能感知数据新增、更新,无法感知到数据删除,为了解决这个问题,本文选用了常用的外部数据源mysql为例进行binlog采集方案介绍。
2、方案
针对mysql数据源,可以通过拉取binlog来回放每条SQL语句,这样不管是新增、更新、删除都能进行处理,但是前提条件是需要mysql服务开启binlog,并且模式为row,因为row模式会存储每条记录的数据变化,能保证数据一致性。大数据平台针对mysql的处理方案流程如图1所示。
方案整体流程主要是通过记录binlog水位,从水位处拉取binlog数据并提取出对应的SQL,然后将SQL应用到存量hive数据上,首次采集时因为水位不存在会通过select进行全量采集。
3、select采集
首次采集时没有binlog水位,会通过select进行全量采集。这里会涉及到一个问题,就是记录新的binlog水位和全量select之间会有一个先后顺序,这两个步骤中间可能会产生新的binlog,比如:1、先记录新binlog水位,再读取数据,会导致下次读取binlog水位时会有重复的SQL操作;2、先读取数据,再记录binlog水位,会导致漏掉部分SQL操作,并且读取数据时间越长,漏掉的概率越大。因为漏数据是无法容忍的,因此平台选择1,为了避免重复的SQL操作,平台增加了约束:采集的mysql表需要包含主键或唯一键,这个约束正常情况下都是完全可以满足的。当mysql表包含主键或唯一键后,即便出现重复SQL操作也不会有问题,比如重复的新增、更新操作在写入hive表时会先根据主键或唯一键删除旧数据,然后使用新数据替换,重复的删除操作相当于删除一个不存在的数据,没有任何效果。因此包含主键或唯一键情况时工作正常。
4、binlog采集
Binlog采集涉及到几个主要步骤:解析binlog、生成重放记录、更新hive表。
4.1 解析binlog
读取到binlog数据后,需要根据操作类型分为INSERT、UPDATE、DELETE,然后将操作类型、主键值或唯一键值写入本地文件raw,文件格式为:
代码语言:javascript复制INSERT,{uid},...
UPDATE,{uid},...
DELETE,{uid},...
其中uid为主键或唯一键算出的md5值,例如某次采集过程中raw文件:
代码语言:javascript复制INSERT,2383c7d07bce3c82e6da7741782de416,"20001","name1",11
INSERT,66df243d406353d0e9db6c5dd027d2d6,"20002","name2",12
UPDATE,9cdf26568d166bc6793ef8da5afa0846,"10007","name777",777
UPDATE,9103c8c82514f39d8360c7430c4ee557,"10002","name222",222
DELETE,6eb887126d24e8f1cd8ad5033482c781,"10005","name5",15
DELETE,d89f3a35931c386956c1a402a8e09941,"10001","name1",10
DELETE,670eca4ad5de0e0cfcc60ab3dd008095,"10008","name8",22
4.2 生成重放记录
上一步的raw文件内容是按照binlog的先后顺序生成的,实际重放时需要按照binlog逆序并过滤掉无效记录。
首先将raw文件逆序内容写入到文件reverse中,reverse文件格式与raw一样,例如某次采集过程中的reverse文件:
代码语言:javascript复制DELETE,670eca4ad5de0e0cfcc60ab3dd008095,"10008","name8",22
DELETE,d89f3a35931c386956c1a402a8e09941,"10001","name1",10
DELETE,6eb887126d24e8f1cd8ad5033482c781,"10005","name5",15
UPDATE,9103c8c82514f39d8360c7430c4ee557,"10002","name222",222
UPDATE,9cdf26568d166bc6793ef8da5afa0846,"10007","name777",777
INSERT,66df243d406353d0e9db6c5dd027d2d6,"20002","name2",12
INSERT,2383c7d07bce3c82e6da7741782de416,"20001","name1",11
获取reverse文件后,依次处理每条记录,过滤掉其中的无效记录,将有效记录存储到文件,因为insert、update操作都可以理解为用新数据替换旧数据,所以将这两个操作的有效记录合并写入到upsert文件,将delete操作的有效记录写入到delete文件。
什么是无效记录?比如当先后两次对同一条记录执行过update操作,实际上只需要保留后一条,前一条记录相当于无效记录。那如何过滤无效记录?因为reverse文件中的记录都是按照binlog逆序,可以在遍历每条数据时根据主键或唯一键记录遇到的操作类型,用来判断后续数据有效性,处理方式为:
- 遇到insert:记录操作,假如之前遇到过同记录update、delete操作,该insert记录不写入upsert文件,否则写入upsert文件。
- 遇到update:记录操作,假如之前遇到过同记录update、delete操作,该update记录不写入upsert文件,否则写入upsert文件。
- 遇到delete:记录操作,假如之前遇到过同记录insert操作,该delete记录不写入delete文件,否则写入delete文件。
4.3 更新hive表
由于hive表不支持delete、update操作,更新hive表时需要通过其他等价方式进行操作。上一步生成的upsert、delete文件分别包含了需要更新的数据和需要删除的数据,可以将这两个文件映射为hive表,然后和存量hive表进行join操作可以得到更新后的hive表。
假设原表为:origin,更新数据表为:incr_upsert,删除数据表为:incr_delete,处理步骤依次为:
- 过滤原表中未删除的数据
origin_remain = origin left join incr_delete on {主键或唯一键} where incr_delete.{主键或唯一键} is null
- 过滤原表中未更新的数据
origin_unchange = origin_remain left join incr_upsert on {主键或唯一键} where incr_upsert.{主键或唯一键} is null
- 合并更新数据
origin_new = origin_unchange union incr_upsert
5、实际效果
Mysql初始数据
初始采集后hive数据
Mysql更新后数据
Binlog采集后hive数据