纯钧(ChunJun,原名FlinkX)框架学习

2022-11-13 13:23:19 浏览数 (1)

目录

一、背景

二、概念

三、特性

四、工作原理

五、快速开始

1.数据同步任务模版

kafka to kudu

mysql to hive 

2.数据同步执行命令

flinkx老版本命令参数:

flinkx老版本执行命令: 

chunjun新版本执行命令:(明显看出命令还是减少了很多的,更简便易用了)

六、dolphinscheduler集成chunjun


[CSDN话题挑战赛第2期](https://marketing.csdn.net/p/7b6697fd9dd3795a268d1a6f2fe75012) 参赛话题:[大数据学习成长记录](https://activity.csdn.net/creatActivity?id=10214)

一、背景

今天领导突然问dolphinscheduler能不能支持采集埋点数据实时写入kudu,datax是离线etl工具肯定不支持了,只能用flink sql或者FlinkX来实现了。但是FlinkX之前没听说过,新知识点呀,果断学起来!!!

二、概念

纯钧(ChunJun,原名FlinkX),是一款稳定、易用、高效、批流一体的数据集成框架, 是在是袋鼠云内部广泛使用的基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。 目前基于实时计算引擎Flink实现多种异构数据源之间的数据同步与计算,已在上千家公司部署且稳定运行。 官方网站:https://dtstack.github.io/chunjun/ git: chunjun: 基于flink的分布式数据同步框架 肖友/flinkx - Gitee.com

三、特性

纯钧(ChunJun)将不同的数据库抽象成了reader/source 插件,writer/sink 插件和lookup 维表插件,其具有以下特点:

  • 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
  • 支持分布式运行,支持flink-standalone、yarn-session、yarn-per job等多种提交方式;
  • 支持Docker一键部署,支持K8S 部署运行;
  • 支持多种异构数据源,可支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步与计算;
  • 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
  • 不仅仅支持全量同步,还支持增量同步、间隔轮训;
  • 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
  • 支持脏数据存储,并提供指标监控等;
  • 配合checkpoint实现断点续传;
  • 不仅仅支持同步DML数据,还支持Schema变更同步

四、工作原理

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:

五、快速开始

1.数据同步任务模版

flinkx使用和datax配置差不多,配置好输入输出的json

kafka to kudu

代码语言:javascript复制
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "name": "int_field",
                "type": "int"
              },
              {
                "name": "byte_field",
                "type": "byte"
              },
              {
                "name": "short_field",
                "type": "smallint"
              },
              {
                "name": "long_field",
                "type": "bigint"
              },
              {
                "name": "binary_field",
                "type": "binary"
              },
              {
                "name": "string_field",
                "type": "string"
              },
              {
                "name": "bool_field",
                "type": "boolean"
              },
              {
                "name": "float_field",
                "type": "float"
              },
              {
                "name": "double_field",
                "type": "double"
              }
            ],
            "sliceRecordCount": [
              100
            ]
          }
        },
        "writer": {
          "parameter": {
            "kerberos": {
              "keytab": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/hive3.keytab",
              "principal": "hive/eng-cdh3@DTSTACK.COM",
              "krb5Conf": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/krb5.conf"
            },
            "column": [
              {
                "name": "int_field",
                "type": "int32"
              },
              {
                "name": "byte_field",
                "type": "int8"
              },
              {
                "name": "short_field",
                "type": "int16"
              },
              {
                "name": "long_field",
                "type": "int64"
              },
              {
                "name": "binary_field",
                "type": "binary"
              },
              {
                "name": "string_field",
                "type": "string"
              },
              {
                "name": "bool_field",
                "type": "bool"
              },
              {
                "name": "float_field",
                "type": "float"
              },
              {
                "name": "double_field",
                "type": "double"
              }
            ],
            "masters": "eng-cdh1:7051",
            "table": "table_name",
            "flushMode": "manual_flush",
            "writeMode": "append",
            "batchSizeBytes": 1048576
          },
          "name": "kuduwriter"
        }
      }
    ],
    "setting": {
        "speed": {
               "channel": 1,
               "bytes": 0
                 },
        "errorLimit": {
               "record": 10000,
               "percentage": 100
                 },
        "dirty": {
                "path": "/tmp",
                "hadoopConfig": {
                    "fs.default.name": "hdfs://ns1",
                    "dfs.nameservices": "ns1",
                    "dfs.ha.namenodes.ns1": "nn1,nn2",
                    "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
                    "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
                    "dfs.ha.automatic-failover.enabled": "true",
                    "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "fs.hdfs.impl.disable.cache": "true"
                }
            },
        "restore": {
               "isRestore": false,
               "isStream": false
      }
    }
  }
}

mysql to hive

代码语言:javascript复制
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter" : {
            "username" : "username",
            "password" : "password",
            "cat" : "insert,delete,update",
            "jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
            "host" : "ip",
            "port" : 3308,
            "start" : {
            },
            "table" : [ "tudou.kudu" ],
            "splitUpdate" : false,
            "pavingData" : true
          },
          "name" : "binlogreader"
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
            "username" : "",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "compress" : "",
            "charsetName" : "UTF-8",
            "maxFileSize" : 1073741824,
            "analyticalRules" : "test_${schema}_${table}",
            "schema" : "tudou",
            "tablesColumn" : "{"kudu":[{"comment":"","type":"varchar","key":"type"},{"comment":"","type":"varchar","key":"schema"},{"comment":"","type":"varchar","key":"table"},{"comment":"","type":"bigint","key":"ts"},{"part":false,"comment":"","type":"INT","key":"before_id"},{"comment":"","type":"INT","key":"after_id","part":false},{"part":false,"comment":"","type":"VARCHAR","key":"before_name"},{"comment":"","type":"VARCHAR","key":"after_name","part":false},{"part":false,"comment":"","type":"INT","key":"before_age"},{"comment":"","type":"INT","key":"after_age","part":false}]}",
            "partition" : "pt",
            "partitionType" : "MINUTE",
            "defaultFS" : "hdfs://ns",
            "hadoopConfig" : {
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns",
              "dfs.namenode.rpc-address.ns.nn2": "ip:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "ip:9000",
              "dfs.nameservices": "ns",
              "fs.hdfs.impl.disable.cache": "true",
              "hadoop.user.name": "root",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

2.数据同步执行命令

flinkx老版本命令参数:

代码语言:javascript复制
model
描述:执行模式,也就是flink集群的工作模式
local: 本地模式
standalone: 独立部署模式的flink集群
yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
必选:否
默认值:local

job
描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
必选:是
默认值:无

pluginRoot
描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
必选:是
默认值:无

flinkconf
描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
必选:否
默认值:无

yarnconf
描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
必选:否
默认值:无

flinkx老版本执行命令: 

代码语言:javascript复制
以本地模式启动数据同步任务
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

以standalone模式启动数据同步任务
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json  -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

以yarn模式启动数据同步任务
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json  -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

chunjun新版本执行命令:(明显看出命令还是减少了很多的,更简便易用了)

代码语言:javascript复制
以本地模式启动数据同步任务
进入到chunjun-dist 目录,执行命令
sh bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json


以standalone模式启动数据同步任务
1. 启动Flink Standalone环境
sh $FLINK_HOME/bin/start-cluster.sh
启动成功后默认端口为8081,我们可以访问当前机器的8081端口进入standalone的flink web ui
2. 提交任务
进入到本地chunjun-dist目录,执行命令
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json

以yarn模式启动数据同步任务
1. 启动Yarn Session环境
Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME和
FLINK_HOME,我们需要使用yarn-session -t参数上传chunjun-dist
cd $FLINK_HOME/bin
./yarn-session -t $CHUNJUN_HOME -d
2. 提交任务
通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID,进入到本地chunjun-dist目录,执行命令
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {"yarn.application.id":"SESSION_APPLICATION_ID"}

六、dolphinscheduler集成chunjun

dolphinscheduler工具栏集成chunjun,本来不支持的,7天前有位好心的大佬更新了相关代码

0 人点赞