目录
一、背景
二、概念
三、特性
四、工作原理
五、快速开始
1.数据同步任务模版
kafka to kudu
mysql to hive
2.数据同步执行命令
flinkx老版本命令参数:
flinkx老版本执行命令:
chunjun新版本执行命令:(明显看出命令还是减少了很多的,更简便易用了)
六、dolphinscheduler集成chunjun
[CSDN话题挑战赛第2期](https://marketing.csdn.net/p/7b6697fd9dd3795a268d1a6f2fe75012) 参赛话题:[大数据学习成长记录](https://activity.csdn.net/creatActivity?id=10214)
一、背景
今天领导突然问dolphinscheduler能不能支持采集埋点数据实时写入kudu,datax是离线etl工具肯定不支持了,只能用flink sql或者FlinkX来实现了。但是FlinkX之前没听说过,新知识点呀,果断学起来!!!
二、概念
纯钧(ChunJun,原名FlinkX),是一款稳定、易用、高效、批流一体的数据集成框架, 是在是袋鼠云内部广泛使用的基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。 目前基于实时计算引擎Flink实现多种异构数据源之间的数据同步与计算,已在上千家公司部署且稳定运行。 官方网站:https://dtstack.github.io/chunjun/ git: chunjun: 基于flink的分布式数据同步框架 肖友/flinkx - Gitee.com
三、特性
纯钧(ChunJun)将不同的数据库抽象成了reader/source 插件,writer/sink 插件和lookup 维表插件,其具有以下特点:
- 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
- 支持分布式运行,支持flink-standalone、yarn-session、yarn-per job等多种提交方式;
- 支持Docker一键部署,支持K8S 部署运行;
- 支持多种异构数据源,可支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步与计算;
- 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
- 不仅仅支持全量同步,还支持增量同步、间隔轮训;
- 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
- 支持脏数据存储,并提供指标监控等;
- 配合checkpoint实现断点续传;
- 不仅仅支持同步DML数据,还支持Schema变更同步
四、工作原理
在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:
五、快速开始
1.数据同步任务模版
flinkx使用和datax配置差不多,配置好输入输出的json
kafka to kudu
代码语言:javascript复制{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"name": "int_field",
"type": "int"
},
{
"name": "byte_field",
"type": "byte"
},
{
"name": "short_field",
"type": "smallint"
},
{
"name": "long_field",
"type": "bigint"
},
{
"name": "binary_field",
"type": "binary"
},
{
"name": "string_field",
"type": "string"
},
{
"name": "bool_field",
"type": "boolean"
},
{
"name": "float_field",
"type": "float"
},
{
"name": "double_field",
"type": "double"
}
],
"sliceRecordCount": [
100
]
}
},
"writer": {
"parameter": {
"kerberos": {
"keytab": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/hive3.keytab",
"principal": "hive/eng-cdh3@DTSTACK.COM",
"krb5Conf": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/krb5.conf"
},
"column": [
{
"name": "int_field",
"type": "int32"
},
{
"name": "byte_field",
"type": "int8"
},
{
"name": "short_field",
"type": "int16"
},
{
"name": "long_field",
"type": "int64"
},
{
"name": "binary_field",
"type": "binary"
},
{
"name": "string_field",
"type": "string"
},
{
"name": "bool_field",
"type": "bool"
},
{
"name": "float_field",
"type": "float"
},
{
"name": "double_field",
"type": "double"
}
],
"masters": "eng-cdh1:7051",
"table": "table_name",
"flushMode": "manual_flush",
"writeMode": "append",
"batchSizeBytes": 1048576
},
"name": "kuduwriter"
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
},
"errorLimit": {
"record": 10000,
"percentage": 100
},
"dirty": {
"path": "/tmp",
"hadoopConfig": {
"fs.default.name": "hdfs://ns1",
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
"dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
"dfs.ha.automatic-failover.enabled": "true",
"dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"fs.hdfs.impl.disable.cache": "true"
}
},
"restore": {
"isRestore": false,
"isStream": false
}
}
}
}
mysql to hive
代码语言:javascript复制{
"job": {
"content": [
{
"reader": {
"parameter" : {
"username" : "username",
"password" : "password",
"cat" : "insert,delete,update",
"jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
"host" : "ip",
"port" : 3308,
"start" : {
},
"table" : [ "tudou.kudu" ],
"splitUpdate" : false,
"pavingData" : true
},
"name" : "binlogreader"
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
"username" : "",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"compress" : "",
"charsetName" : "UTF-8",
"maxFileSize" : 1073741824,
"analyticalRules" : "test_${schema}_${table}",
"schema" : "tudou",
"tablesColumn" : "{"kudu":[{"comment":"","type":"varchar","key":"type"},{"comment":"","type":"varchar","key":"schema"},{"comment":"","type":"varchar","key":"table"},{"comment":"","type":"bigint","key":"ts"},{"part":false,"comment":"","type":"INT","key":"before_id"},{"comment":"","type":"INT","key":"after_id","part":false},{"part":false,"comment":"","type":"VARCHAR","key":"before_name"},{"comment":"","type":"VARCHAR","key":"after_name","part":false},{"part":false,"comment":"","type":"INT","key":"before_age"},{"comment":"","type":"INT","key":"after_age","part":false}]}",
"partition" : "pt",
"partitionType" : "MINUTE",
"defaultFS" : "hdfs://ns",
"hadoopConfig" : {
"dfs.ha.namenodes.ns": "nn1,nn2",
"fs.defaultFS": "hdfs://ns",
"dfs.namenode.rpc-address.ns.nn2": "ip:9000",
"dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.namenode.rpc-address.ns.nn1": "ip:9000",
"dfs.nameservices": "ns",
"fs.hdfs.impl.disable.cache": "true",
"hadoop.user.name": "root",
"fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2.数据同步执行命令
flinkx老版本命令参数:
代码语言:javascript复制model
描述:执行模式,也就是flink集群的工作模式
local: 本地模式
standalone: 独立部署模式的flink集群
yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
必选:否
默认值:local
job
描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
必选:是
默认值:无
pluginRoot
描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
必选:是
默认值:无
flinkconf
描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
必选:否
默认值:无
yarnconf
描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
必选:否
默认值:无
flinkx老版本执行命令:
代码语言:javascript复制以本地模式启动数据同步任务
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
以standalone模式启动数据同步任务
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
以yarn模式启动数据同步任务
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
chunjun新版本执行命令:(明显看出命令还是减少了很多的,更简便易用了)
代码语言:javascript复制以本地模式启动数据同步任务
进入到chunjun-dist 目录,执行命令
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
以standalone模式启动数据同步任务
1. 启动Flink Standalone环境
sh $FLINK_HOME/bin/start-cluster.sh
启动成功后默认端口为8081,我们可以访问当前机器的8081端口进入standalone的flink web ui
2. 提交任务
进入到本地chunjun-dist目录,执行命令
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
以yarn模式启动数据同步任务
1. 启动Yarn Session环境
Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME和
FLINK_HOME,我们需要使用yarn-session -t参数上传chunjun-dist
cd $FLINK_HOME/bin
./yarn-session -t $CHUNJUN_HOME -d
2. 提交任务
通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID,进入到本地chunjun-dist目录,执行命令
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {"yarn.application.id":"SESSION_APPLICATION_ID"}
六、dolphinscheduler集成chunjun
dolphinscheduler工具栏集成chunjun,本来不支持的,7天前有位好心的大佬更新了相关代码