前言
项目地址
https://gitee.com/dtstack_dev_0/chunjun
下载地址
https://github.com/DTStack/chunjun/releases
解压
代码语言:javascript复制mkdir -p /data/tools/bigdata/taier/chunjun
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C /data/tools/bigdata/taier/chunjun
配置环境变量
创建配置文件
代码语言:javascript复制vi /etc/profile.d/chunjun.sh
加入:
代码语言:javascript复制export CHUNJUN_HOME=/data/tools/bigdata/taier/chunjun
export PATH=$CHUNJUN_HOME/bin:$PATH
配置立即生效
代码语言:javascript复制source /etc/profile
查看ZK_HOME
代码语言:javascript复制echo $CHUNJUN_HOME
提交任务
Local
本地提交
Local 模式不依赖Flink环境和Hadoop环境,在本地环境启动一个JVM进程执行纯钧任务。
代码语言:javascript复制sh $CHUNJUN_HOME/bin/chunjun-local.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json
Standalone
Standalone模式依赖Flink Standalone环境,不依赖Hadoop环境。
将依赖文件复制到Flink lib目录下,例如
代码语言:javascript复制cp -r chunjun-dist $FLINK_HOME/lib
注意: 这个复制操作需要在所有Flink cluster机器上执行,否则部分任务会出现类找不到的错误。
启动Flink Standalone环境
代码语言:javascript复制sh $FLINK_HOME/bin/start-cluster.sh
运行
代码语言:javascript复制sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
Yarn Session
Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME
和FLINK_HOME
我们需要使用yarn-session -t参数上传chunjun-dist
代码语言:javascript复制$FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_HOME -d
提交任务
Yarn监控页面查询:
http://192.168.7.102:8088/cluster
通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID
,进入到本地chunjun-dist目录,执行命令
yarn.application.id
也可以在 flink-conf.yaml 中设置;提交成功之后,可以通过 yarn web ui 上观察任务情况。
成功后会打印
代码语言:javascript复制JobManager Web Interface: http://hadoop01:45685
2022-11-14 14:17:52,433 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1667981758965_0017
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1667981758965_0017
Note that killing Flink might not clean up all job artifacts and temporary files.
我们可以看到
- 网页查看地址:http://hadoop01:45685 可以内网中用http://192.168.7.101:45685
yarn.application.id
为application_1667981758965_0017
- 杀掉命令:
yarn application -kill application_1667981758965_0017
运行
代码语言:javascript复制bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json -confProp {"yarn.application.id":"application_1667981758965_0017"}
如果有正在运行的可以这样杀掉
代码语言:javascript复制yarn application -kill application_1667981758965_0017
Yarn Per-Job
Yarn Per-Job 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME
和FLINK_HOME
。
提交步骤
Yarn Per-Job 提交任务配置正确即可提交。
进入本地chunjun-dist目录,执行命令提交任务。
代码语言:javascript复制bash $CHUNJUN_HOME/bin/chunjun-yarn-perjob.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json
提交成功之后,可以通过 yarn web ui 上观察任务情况
Yarn-Per-Job和Yarn-Session对比
Flink on Yarn-Per Job
Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之后资源就会被释放。
在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解了,具体如下:
- 首先 Client 提交 Yarn App,比如 JobGraph 或者 JARs。
- 接下来 Yarn 的 ResourceManager 会申请第一个 Container。这个 Container 通过 Application Master 启动进程,Application Master 里面运行的是 Flink 程序,即 Flink-Yarn ResourceManager 和 JobManager。
- 最后 Flink-Yarn ResourceManager 向 Yarn ResourceManager 申请资源。当分配到资源后,启动 TaskManager。 TaskManager 启动后向 Flink-Yarn ResourceManager 进行注册,注册成功后 JobManager 就会分配具体的任务给 TaskManager 开始执行。
Flink on Yarn-Session
在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。
而 Session 模式则不一样,它的 Dispatcher 和 ResourceManager 是可以复用的。
Session 模式下,当 Dispatcher 在收到请求之后,会启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会启动 JobManager(B) 和对应的 TaskManager 的运行。
当 A、B 任务运行完成后,资源并不会释放。
Session 模式也称为多线程模式,其特点是资源会一直存在不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN ResourceManager。
应用场景
Session 模式和 Per Job 模式的应用场景不一样。
Per Job 模式比较适合那种对启动时间不敏感,运行时间较长的任务。
Seesion 模式适合短时间运行的任务,一般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。
配置
mysql to hive
代码语言:javascript复制{
"job": {
"content": [
{
"reader": {
"parameter" : {
"username" : "username",
"password" : "password",
"cat" : "insert,delete,update",
"jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
"host" : "ip",
"port" : 3308,
"start" : {
},
"table" : [ "tudou.kudu" ],
"splitUpdate" : false,
"pavingData" : true
},
"name" : "binlogreader"
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
"username" : "",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"compress" : "",
"charsetName" : "UTF-8",
"maxFileSize" : 1073741824,
"analyticalRules" : "test_${schema}_${table}",
"schema" : "tudou",
"tablesColumn" : "{"kudu":[{"comment":"","type":"varchar","key":"type"},{"comment":"","type":"varchar","key":"schema"},{"comment":"","type":"varchar","key":"table"},{"comment":"","type":"bigint","key":"ts"},{"part":false,"comment":"","type":"INT","key":"before_id"},{"comment":"","type":"INT","key":"after_id","part":false},{"part":false,"comment":"","type":"VARCHAR","key":"before_name"},{"comment":"","type":"VARCHAR","key":"after_name","part":false},{"part":false,"comment":"","type":"INT","key":"before_age"},{"comment":"","type":"INT","key":"after_age","part":false}]}",
"partition" : "pt",
"partitionType" : "MINUTE",
"defaultFS" : "hdfs://ns",
"hadoopConfig" : {
"dfs.ha.namenodes.ns": "nn1,nn2",
"fs.defaultFS": "hdfs://ns",
"dfs.namenode.rpc-address.ns.nn2": "ip:9000",
"dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.namenode.rpc-address.ns.nn1": "ip:9000",
"dfs.nameservices": "ns",
"fs.hdfs.impl.disable.cache": "true",
"hadoop.user.name": "root",
"fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}