大数据开发之ChunJun使用

2022-11-22 16:27:04 浏览数 (1)

前言

项目地址

https://gitee.com/dtstack_dev_0/chunjun

下载地址

https://github.com/DTStack/chunjun/releases

解压

代码语言:javascript复制
mkdir -p /data/tools/bigdata/taier/chunjun
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C /data/tools/bigdata/taier/chunjun

配置环境变量

创建配置文件

代码语言:javascript复制
vi /etc/profile.d/chunjun.sh

加入:

代码语言:javascript复制
export CHUNJUN_HOME=/data/tools/bigdata/taier/chunjun 
export PATH=$CHUNJUN_HOME/bin:$PATH

配置立即生效

代码语言:javascript复制
source /etc/profile

查看ZK_HOME

代码语言:javascript复制
echo $CHUNJUN_HOME

提交任务

Local

本地提交

Local 模式不依赖Flink环境和Hadoop环境,在本地环境启动一个JVM进程执行纯钧任务。

代码语言:javascript复制
sh $CHUNJUN_HOME/bin/chunjun-local.sh  -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json

Standalone

Standalone模式依赖Flink Standalone环境,不依赖Hadoop环境。

将依赖文件复制到Flink lib目录下,例如

代码语言:javascript复制
cp -r chunjun-dist $FLINK_HOME/lib

注意: 这个复制操作需要在所有Flink cluster机器上执行,否则部分任务会出现类找不到的错误。

启动Flink Standalone环境

代码语言:javascript复制
sh $FLINK_HOME/bin/start-cluster.sh

运行

代码语言:javascript复制
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json

Yarn Session

Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOMEFLINK_HOME

我们需要使用yarn-session -t参数上传chunjun-dist

代码语言:javascript复制
$FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_HOME -d

提交任务

Yarn监控页面查询:

http://192.168.7.102:8088/cluster

通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID,进入到本地chunjun-dist目录,执行命令

yarn.application.id 也可以在 flink-conf.yaml 中设置;提交成功之后,可以通过 yarn web ui 上观察任务情况。

成功后会打印

代码语言:javascript复制
JobManager Web Interface: http://hadoop01:45685
2022-11-14 14:17:52,433 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1667981758965_0017
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1667981758965_0017
Note that killing Flink might not clean up all job artifacts and temporary files.

我们可以看到

  • 网页查看地址:http://hadoop01:45685 可以内网中用http://192.168.7.101:45685
  • yarn.application.idapplication_1667981758965_0017
  • 杀掉命令:yarn application -kill application_1667981758965_0017

运行

代码语言:javascript复制
bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json -confProp {"yarn.application.id":"application_1667981758965_0017"}

如果有正在运行的可以这样杀掉

代码语言:javascript复制
yarn application -kill application_1667981758965_0017

Yarn Per-Job

Yarn Per-Job 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOMEFLINK_HOME

提交步骤

Yarn Per-Job 提交任务配置正确即可提交。

进入本地chunjun-dist目录,执行命令提交任务。

代码语言:javascript复制
bash $CHUNJUN_HOME/bin/chunjun-yarn-perjob.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json

提交成功之后,可以通过 yarn web ui 上观察任务情况

Yarn-Per-Job和Yarn-Session对比

Flink on Yarn-Per Job

Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之后资源就会被释放。

在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解了,具体如下:

  • 首先 Client 提交 Yarn App,比如 JobGraph 或者 JARs。
  • 接下来 Yarn 的 ResourceManager 会申请第一个 Container。这个 Container 通过 Application Master 启动进程,Application Master 里面运行的是 Flink 程序,即 Flink-Yarn ResourceManager 和 JobManager。
  • 最后 Flink-Yarn ResourceManager 向 Yarn ResourceManager 申请资源。当分配到资源后,启动 TaskManager。 TaskManager 启动后向 Flink-Yarn ResourceManager 进行注册,注册成功后 JobManager 就会分配具体的任务给 TaskManager 开始执行。

Flink on Yarn-Session

在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。

而 Session 模式则不一样,它的 Dispatcher 和 ResourceManager 是可以复用的。

Session 模式下,当 Dispatcher 在收到请求之后,会启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会启动 JobManager(B) 和对应的 TaskManager 的运行。

当 A、B 任务运行完成后,资源并不会释放。

Session 模式也称为多线程模式,其特点是资源会一直存在不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN ResourceManager。

应用场景

Session 模式和 Per Job 模式的应用场景不一样。

Per Job 模式比较适合那种对启动时间不敏感,运行时间较长的任务。

Seesion 模式适合短时间运行的任务,一般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。

配置

mysql to hive

代码语言:javascript复制
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter" : {
            "username" : "username",
            "password" : "password",
            "cat" : "insert,delete,update",
            "jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
            "host" : "ip",
            "port" : 3308,
            "start" : {
            },
            "table" : [ "tudou.kudu" ],
            "splitUpdate" : false,
            "pavingData" : true
          },
          "name" : "binlogreader"
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
            "username" : "",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "compress" : "",
            "charsetName" : "UTF-8",
            "maxFileSize" : 1073741824,
            "analyticalRules" : "test_${schema}_${table}",
            "schema" : "tudou",
            "tablesColumn" : "{"kudu":[{"comment":"","type":"varchar","key":"type"},{"comment":"","type":"varchar","key":"schema"},{"comment":"","type":"varchar","key":"table"},{"comment":"","type":"bigint","key":"ts"},{"part":false,"comment":"","type":"INT","key":"before_id"},{"comment":"","type":"INT","key":"after_id","part":false},{"part":false,"comment":"","type":"VARCHAR","key":"before_name"},{"comment":"","type":"VARCHAR","key":"after_name","part":false},{"part":false,"comment":"","type":"INT","key":"before_age"},{"comment":"","type":"INT","key":"after_age","part":false}]}",
            "partition" : "pt",
            "partitionType" : "MINUTE",
            "defaultFS" : "hdfs://ns",
            "hadoopConfig" : {
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns",
              "dfs.namenode.rpc-address.ns.nn2": "ip:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "ip:9000",
              "dfs.nameservices": "ns",
              "fs.hdfs.impl.disable.cache": "true",
              "hadoop.user.name": "root",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

0 人点赞