大数据平台之binlog采集方案

2021-11-23 13:22:43 浏览数 (2)

1、背景

大数据平台的采集功能是从外部数据源采集数据存储到hive,采集方式分为全量采集、增量采集,增量采集适用于数据规模较大情况,有很多使用场景,但是在增量采集时,平台只能感知数据新增、更新,无法感知到数据删除,为了解决这个问题,本文选用了常用的外部数据源mysql为例进行binlog采集方案介绍。

2、方案

针对mysql数据源,可以通过拉取binlog来回放每条SQL语句,这样不管是新增、更新、删除都能进行处理,但是前提条件是需要mysql服务开启binlog,并且模式为row,因为row模式会存储每条记录的数据变化,能保证数据一致性。大数据平台针对mysql的处理方案流程如图1所示。

图1 采集流程图1 采集流程

方案整体流程主要是通过记录binlog水位,从水位处拉取binlog数据并提取出对应的SQL,然后将SQL应用到存量hive数据上,首次采集时因为水位不存在会通过select进行全量采集。

3、select采集

首次采集时没有binlog水位,会通过select进行全量采集。这里会涉及到一个问题,就是记录新的binlog水位和全量select之间会有一个先后顺序,这两个步骤中间可能会产生新的binlog,比如:1、先记录新binlog水位,再读取数据,会导致下次读取binlog水位时会有重复的SQL操作;2、先读取数据,再记录binlog水位,会导致漏掉部分SQL操作,并且读取数据时间越长,漏掉的概率越大。因为漏数据是无法容忍的,因此平台选择1,为了避免重复的SQL操作,平台增加了约束:采集的mysql表需要包含主键或唯一键,这个约束正常情况下都是完全可以满足的。当mysql表包含主键或唯一键后,即便出现重复SQL操作也不会有问题,比如重复的新增、更新操作在写入hive表时会先根据主键或唯一键删除旧数据,然后使用新数据替换,重复的删除操作相当于删除一个不存在的数据,没有任何效果。因此包含主键或唯一键情况时工作正常。

4、binlog采集

Binlog采集涉及到几个主要步骤:解析binlog、生成重放记录、更新hive表。

4.1 解析binlog

读取到binlog数据后,需要根据操作类型分为INSERT、UPDATE、DELETE,然后将操作类型、主键值或唯一键值写入本地文件raw,文件格式为:

代码语言:javascript复制
INSERT,{uid},...
UPDATE,{uid},...
DELETE,{uid},...

其中uid为主键或唯一键算出的md5值,例如某次采集过程中raw文件:

代码语言:javascript复制
INSERT,2383c7d07bce3c82e6da7741782de416,"20001","name1",11
INSERT,66df243d406353d0e9db6c5dd027d2d6,"20002","name2",12
UPDATE,9cdf26568d166bc6793ef8da5afa0846,"10007","name777",777
UPDATE,9103c8c82514f39d8360c7430c4ee557,"10002","name222",222
DELETE,6eb887126d24e8f1cd8ad5033482c781,"10005","name5",15
DELETE,d89f3a35931c386956c1a402a8e09941,"10001","name1",10
DELETE,670eca4ad5de0e0cfcc60ab3dd008095,"10008","name8",22

4.2 生成重放记录

上一步的raw文件内容是按照binlog的先后顺序生成的,实际重放时需要按照binlog逆序并过滤掉无效记录。

首先将raw文件逆序内容写入到文件reverse中,reverse文件格式与raw一样,例如某次采集过程中的reverse文件:

代码语言:javascript复制
DELETE,670eca4ad5de0e0cfcc60ab3dd008095,"10008","name8",22
DELETE,d89f3a35931c386956c1a402a8e09941,"10001","name1",10
DELETE,6eb887126d24e8f1cd8ad5033482c781,"10005","name5",15
UPDATE,9103c8c82514f39d8360c7430c4ee557,"10002","name222",222
UPDATE,9cdf26568d166bc6793ef8da5afa0846,"10007","name777",777
INSERT,66df243d406353d0e9db6c5dd027d2d6,"20002","name2",12
INSERT,2383c7d07bce3c82e6da7741782de416,"20001","name1",11

获取reverse文件后,依次处理每条记录,过滤掉其中的无效记录,将有效记录存储到文件,因为insert、update操作都可以理解为用新数据替换旧数据,所以将这两个操作的有效记录合并写入到upsert文件,将delete操作的有效记录写入到delete文件。

什么是无效记录?比如当先后两次对同一条记录执行过update操作,实际上只需要保留后一条,前一条记录相当于无效记录。那如何过滤无效记录?因为reverse文件中的记录都是按照binlog逆序,可以在遍历每条数据时根据主键或唯一键记录遇到的操作类型,用来判断后续数据有效性,处理方式为:

  • 遇到insert:记录操作,假如之前遇到过同记录update、delete操作,该insert记录不写入upsert文件,否则写入upsert文件。
  • 遇到update:记录操作,假如之前遇到过同记录update、delete操作,该update记录不写入upsert文件,否则写入upsert文件。
  • 遇到delete:记录操作,假如之前遇到过同记录insert操作,该delete记录不写入delete文件,否则写入delete文件。

4.3 更新hive表

由于hive表不支持delete、update操作,更新hive表时需要通过其他等价方式进行操作。上一步生成的upsert、delete文件分别包含了需要更新的数据和需要删除的数据,可以将这两个文件映射为hive表,然后和存量hive表进行join操作可以得到更新后的hive表。

假设原表为:origin,更新数据表为:incr_upsert,删除数据表为:incr_delete,处理步骤依次为:

  • 过滤原表中未删除的数据
代码语言:javascript复制
origin_remain = origin left join incr_delete on {主键或唯一键} where incr_delete.{主键或唯一键} is null
  • 过滤原表中未更新的数据
代码语言:javascript复制
origin_unchange = origin_remain left join incr_upsert on {主键或唯一键} where incr_upsert.{主键或唯一键} is null
  • 合并更新数据
代码语言:javascript复制
origin_new = origin_unchange union incr_upsert

5、实际效果

Mysql初始数据

初始采集后hive数据

Mysql更新后数据

Binlog采集后hive数据

0 人点赞