一、Flink简介与数据同步需求
1、Flink简介
Flink是新型的计算框架,具有分布式、低延迟、高吞吐和高可靠的特性。其支持多种部署方式:local(单机)、standalone模式,也可以基于yarn,mesos或者k8s做资源调度。Flink提供了比较高级的API,我们能比较方便地扩展现有的API来满足一些特殊需求,此外Flink提供了完整的状态管理体系(checkpoint),可以基于这个机制实现断点续传。
2、数据同步需求
这里主要说明的是离线数据同步。实时数据同步相对而言没有周期性的资源调度问题,原生的Flink框架结合其丰富的connector即可满足大部分需求。
1)支持多种部署模式
开发测试时可以单机部署,在生产环境支持分布式部署。
2)分布式资源调度能力
如果可以基于yarn,mesos或者k8s等做资源调度,可以极大提高资源利用率、提升运营效率!
3)支持断点续传
在大数据量的传输场景下,当网络出现抖动DB抖动等情况时任务可能会失败。那这个时候重跑任务耗时耗力(血与泪)。此时就需要能从失败的点继续跑,也就是断点续传。
3、常见异构数据同步工具对比
Flinkx是袋鼠云开源的一款基于Flink的分布式数据同步工具(框架)。
工具 | flinkx | datax | logkit | sqoop |
---|---|---|---|---|
架构 | 分布式 | 单机(分布式版本未开源) | 单机 | 分布式 |
同步速度控制 | 支持 | 支持 | 支持 | 不支持 |
脏数据管理 | 支持 | 支持 | 不支持 | 不支持 |
插件化 | 支持 | 支持 | 支持 | 不支持 |
断点续传 | 支持 | 不支持 | 不支持 | 不支持 |
配置方式 | json | json | web界面 | 脚本 |
经过对比不难发现:少数据量、简单数据源下,阿里开源的datax等已经可以满足需求。但在我们复杂环境下Flinkx凭借其分布式架构与断点续传特点,具有明显优势,比较符合我们的需求。
二、Flinkx框架实现与原理
1、实现概览
FlinkX采用了一种插件式的架构来实现多种异构数据源之间的数据同步:
- 不同的源数据库被抽象成不同的Reader插件;
- 不同的目标数据库被抽象成不同的Writer插件;
理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
2、任务自动组装
Template模块根据同步任务的配置信息加载源数据库和目的数据库对应的Reader插件和Writer插件;
- Reader插件实现了InputFormat接口,从源数据库中获取DataStream对象;
- Writer插件实现了OutputFormat接口,将目的数据库与DataStream对象相关联;
Template模块通过DataStream对象将Reader和Writer串接在一起,组装成一个Flink任务,并提交到Flink集群上执行。
3、并发同步实现分析
这里以MySQL为例:
1)配置文件
配置文件中mysql reader部分如下:
代码语言:txt复制"reader": {
"name": "mysqlreader",
"parameter": {
"username": "user",
"password": "password",
"connection": [
{
"table": [
"data_test"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true"
]
}
],
"splitPk": "id"
}
}
配置全文见:
https://github.com/DTStack/flinkx/blob/1.8.2/docs/rdbreader.md
2)并发读取分析
实际读取数据时首先要构造数据分片,构造数据分片就是根据通道索引和checkpoint记录的位置构造查询sql,sql模板如下:
代码语言:txt复制select * from data_test
where id mod ${channel_num}=${channel_index}
and id > ${offset}
在没有出现下一节所说的断点续传问题时,多个通道对应的任务即为:
代码语言:txt复制-- 通道一
select * from data_test
where id mod 2=0;
-- 通道二
select * from data_test
where id mod 2=1;
通过对splitPk字段取模,就可以生成多个SQL并发地从db里面拉取数据,实现并发读取。
4、断点续传
1) 概念解读
断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可(类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行),可以大大节省时间和计算资源。
2) Flink checkpoint机制
Flinkx框架的断点续传是基于Flink的checkpoint机制实现,所以我们首先了解一下Flink的checkpoint机制实现:
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。
当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
3) MySQL同步时的断点续传
checkpoint触发后,两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。Snapshot生成之后向数据流里面插入barrier,barrier随数据流向Writer。以Writer_0为例,Writer_0接收Reader_0和Reader_1发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0停止写出数据,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达之后再将Buffer里的数据全部写出,然后生成Writer的Snapshot,整个checkpoint结束后,记录的任务状态为:
- Reader_0:id=12
- Reader_1:id=11
任务如果异常结束恢复后,任务会给把各个通道记录的状态赋值给offset,再次读取数据时构造的sql为:
代码语言:txt复制-- 第一个通道
select * from data_test
where id mod 2=0
and id > 12;
-- 第二个通道
select * from data_test
where id mod 2=1
and id > 11;
这样就可以从上一次失败的位置继续读取数据了。
4) 断点续传不是万能的
- 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键或者日期类型的字段。同步过程中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据;
- 数据源必须支持数据过滤。如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复;
- 目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持。
三、开发一个Flinkx插件
这里我们以改造后的stream writer插件为例(框架自带的stream插件不支持自定义分隔符和batch写入)
1、基础功能实现
1) 命名约束
flinkx框架本身对命名有一些约束:
- module命名:
<module>flinkx-stream</module>
- package命名:com.dtstack.flinkx.stream.writercom.dtstack.flinkx.classloader.PluginUtil getPluginClassName 限制
- class命名:├── StreamOutputFormat.java ├── StreamOutputFormatBuilder.java └── StreamWriter.java当命名不符合框架的约束时,会出现找不到对应class的异常。
2) 配置获取
首先StreamOutputFormatBuilder需要定义相关的set函数,如:
代码语言:txt复制public void setWriteDelimiter(String writeDelimiter) {
format.writeDelimiter = writeDelimiter;
}
然后StreamWriter获取json里的配置,并由builder实例调用set函数,如:
代码语言:txt复制writeDelimiter = config.getJob().getContent().get(0).getWriter().getParameter().getStringVal("writeDelimiter", "|");
代码语言:txt复制builder.setWriteDelimiter(writeDelimiter);
3) 构建数据输出
StreamOutputFormat继承自RichOutputFormat,主要需要自定义以下逻辑:
- openInternal:进行一些初始化的处理逻辑
- writeSingleRecordInternal:定义单条数据处理逻辑
- writeMultipleRecordsInternal:定义批量数据处理逻辑(见下一小节)
- closeInternal:进行一些关闭的处理逻辑
4) row处理
writeSingleRecordInternal数据处理的对象是row,row是flink原生的结构org.apache.flink.types.Row,本质上是一个Arrays,主要使用如下方法:
- getArity:row的长度
- getField:获取指定位置的值
- setField:修改指定位置的值
- toString:将数组内的值以","分隔转成String
2、batch功能实现
在一些场景下,我们还需要进行batch处理以提升传输的性能,此时我们开发插件时需要启动writeMultipleRecordsInternal部分。
1) setBatchInterval
builder实例需要setBatchInterval
代码语言:txt复制builder.setBatchInterval(batchInterval);
当batchInterval>1,才会调用writeMultipleRecordsInternal
2) rows处理
rows是前文提到的ArrayList<Row>
结构,进行简单遍历即可处理。
注意:不同数据源batchInterval设置值是不同,并且有些数据源不支持batch操作。
四、Flinkx on yarn部署
1、环境依赖
我们需要一个Flink on yarn的集群:
- flink:1.8.x
- hadoop:2.8.5
在实际测试中,我们hadoop版本过低时会出现各种异常,建议使用 2.8.5版本。
2、部署细则
- node manager 所有节点需要拷贝Flink相关文件到yarn客户机相同目录,包括:bin、lib、plugins
- 启动一个session
yarnPer模式下可以不用手动启动session
- 提交flinkx任务bin/flinkx -mode yarn -job mysql_2_xx.json -pluginRoot /data/home/xx/flinkx/plugins -flinkconf /data/clusterserver/flink-1.8.1/conf/ -yarnconf /data/clusterserver/hadoop/etc/hadoop/ -confProp '{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/tmp/checkpoint_dir"}' -s /tmp/checkpoint_dir/xx/chk-*
- 去flink ui和各自数据系统上验收相关数据了。
五、其他
1、补充
- Flinkx目前官方文档较少,大部分时候需要阅读其源码才能解决问题。
- 小数据场景下,Flinkx优势不是很大,毕竟集群启动任务调度等均需要时间。不同场景下的详细分析报告,敬请期待!
2、参考
flink官网:https://flink.apache.org/
flinkx官网:https://github.com/DTStack/flinkx
logkit:https://github.com/qiniu/logkit/blob/master/READMECN.md