大数据平台 —— 调度系统之Azkaban

2020-11-16 10:35:24 浏览数 (1)

Azkaban介绍

常见的开源调度框架:

  • Linux Crontab:针对个人用户及小任务量
  • Apache Oozie:功能强大,配置复杂的Hadoop任务调度框架
  • Azkaban:开源的工作流管理器,轻量级调度框架
  • AirFlow:基于Python开发的通用批处理调度框架
  • Zenus:阿里开源的基于Hadoop的工作流调度系统
  • EasyScheduler:国内开源的分布式工作流任务调度系统

开源调度框架对比:

Azkaban简介:

  • Linkedin公司开源的分布式批量工作流任务调度器
  • 通过简单的KV的方式,生成Job,并构建依赖关系
  • 通过插件化的任务提交模块,支持可扩展的多任务提交
  • 官方文档:https://azkaban.readthedocs.io/en/latest/

Azkaban优点:

  • 可通过job配置文件,快速建立任务和任务之间的依赖关系
  • 提供模块化和可插拔的插件机制,原生支持shell、 java、 hive等
  • 基于Java开发,提供Ajax Api,易于二次开发

Azkaban适用场景:

  • 通过Azkaban结合Datax实现定时的数据采集服务
  • 通过Azkaban调度执行Shell、Java、 Hive、 Hadoop等 任务
  • 开发可复用的程序,通过Azkaban编排成工作流,执行批处理任务
  • 对Azkaban进行二次开发通过接口创建任务、调度任务、管理任务
  • 将Azkaban作为数据平台的- -部分,提供任务调度的能力
  • 基于Azkaban的异常处理、监控报警、审计日志完善数据平台功能

Azkaban架构与调度流程

Azkaban架构图如下:

  • AzkabanServer:Azkaban的管理服务,提供WebUI,负责Project管理、权限管理、定时执行、跟踪进度、审计日志等等功能
  • AzkabanExecutor:负责工作流的提交和执行,搜集执行日志,也就是具体干活的节点
  • MySQL:存储工作流详情及节点和任务的状态信息等

其中AzkabanWebServer可以说是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。

同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。

并且Azkaban使用方便,Azkaban使用以.job为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip的方式通过Azkaban UI上传到Web服务器上。

Azkaban有三种部署模式:

  • Solo mode:内置数据库,Server和Executor在同一个 进程中
  • Two mode:基于Mysq|数据库,启动一个Server和一个Executor
  • Multi mode:分布式模式,一个Server和多个Executor

Azkaban执行流程图:

  1. 用户通过界面或者API提交任务到Webserver,Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个合适的executor下发工作流;
  2. executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;
  3. 分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;
  4. 被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。

Azkaban核心交互流程:

  1. AzkabanServer主动调用Executor的API获取状态信息
  2. 根据计算规则选择执行的Executor Server(任务数量、内存和CPU等资源、最近分配的时间)
  3. 调度WorkFlow到Executor执行,Executor执行并监控任务

Azkaban安装部署

  • Azkaban官网:https://azkaban.github.io
  • 软件下载地址:https://github.com/azkaban/azkaban
  • 官方插件地址:https://github.com/azkaban/azkaban-plugins
  • 官方文档地址:https://azkaban.readthedocs.io/en/latest/getStarted.html

这里采用的是Two mode部署模式,因为Multi mode只不过是在该基础上部署了多个ExecutorServer,也就是说在Two mode基础上增加ExecutorServer节点就是Multi mode了。

编译Azkaban源码

首先,准备好Java和Maven:

代码语言:javascript复制
[root@azkaban01 ~]# java -version
java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)
[root@azkaban01 ~]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 1.8.0_261, vendor: Oracle Corporation, runtime: /usr/local/jdk/1.8/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@azkaban01 ~]# 

安装一些工具:

代码语言:javascript复制
[root@azkaban01 ~]# yum install -y git gcc-c  

然后从GitHub上拉取Azkaban的源码:

代码语言:javascript复制
[root@hadoop01 ~]# cd /usr/local/src
[root@hadoop01 /usr/local/src]# git clone https://github.com/azkaban/azkaban.git

进入源码目录,在settings.gradle文件的开头增加插件仓库配置:

代码语言:javascript复制
[root@hadoop01 /usr/local/src]# cd azkaban
[root@azkaban01 /usr/local/src/azkaban]# vim settings.gradle 
pluginManagement {
    repositories {
        maven {
            url 'https://maven.aliyun.com/repository/gradle-plugin'
        }
        gradlePluginPortal()
    }
}

...

然后修改build.gradle文件中的仓库配置:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# vim build.gradle
buildscript {
  repositories {
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
    maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
  }
  ...
}

...

allprojects {
  apply plugin: 'jacoco'

  repositories {
    mavenLocal()
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
    maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
  }
}

gradle/wrapper/gradle-wrapper.properties文件中会定义从远程下载gradle,如果下载不下来的话,可以通过别的方式下载,然后上传到相应的目录下,并在该文件指定从本地文件系统中加载gradle的安装包:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# vim gradle/wrapper/gradle-wrapper.properties
distributionUrl=file:///usr/local/src/gradle-4.6-all.zip

完成以上的修改后,就可以执行如下命令开始编译安装了:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test

打包编译的过程中,有可能会报如下错误:

代码语言:javascript复制
FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':azkaban-web-server:nodeSetup'.
> Could not resolve all files for configuration ':azkaban-web-server:detachedConfiguration1'.
   > Could not download node-linux-x64.tar.gz (org.nodejs:node:8.10.0)
      > Could not get resource 'https://nodejs.org/dist/v8.10.0/node-v8.10.0-linux-x64.tar.gz'.
         > Read timed out

这是因为系统中没有安装NodeJS,而azkaban-web-server这个模块需要用到NodeJS来编译web代码。由于无法通过远程下载NodeJS的安装包就会报这个错。解决方式也简单,在系统中安装NodeJS即可。步骤如下:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# curl --silent --location https://rpm.nodesource.com/setup_14.x | bash -
[root@azkaban01 /usr/local/src/azkaban]# yum install -y nodejs
[root@azkaban01 /usr/local/src/azkaban]# npm -v
6.14.8
[root@azkaban01 /usr/local/src/azkaban]# node -v
v14.15.0
[root@azkaban01 /usr/local/src/azkaban]# 

设置npm使用淘宝镜像仓库:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# npm config set registry https://registry.npm.taobao.org
[root@azkaban01 /usr/local/src/azkaban]# npm config get registry 
https://registry.npm.taobao.org/
[root@azkaban01 /usr/local/src/azkaban]# 

打开azkaban-web-server模块下的build.gradle文件,修改原本的仓库配置,并注释掉node相关的配置。如下所示:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# vim azkaban-web-server/build.gradle
buildscript {
    repositories {
        maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
        maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
        maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
        maven { url 'https://maven.aliyun.com/repository/google' }
        maven { url 'https://maven.aliyun.com/repository/jcenter' }
        mavenCentral()
    }
    ...
}

...

//node {
    // Version of node to use.
    //version = '8.10.0'

    // Version of npm to use.
    //npmVersion = '5.6.0'

    // Base URL for fetching node distributions (change if you have a mirror).
    //distBaseUrl = 'https://nodejs.org/dist'

    // If true, it will download node using above parameters.
    // If false, it will try to use globally installed node.
    //download = true

    // Set the work directory for unpacking node
    //workDir = file("${project.buildDir}/nodejs")

    // Set the work directory where node_modules should be located
    //nodeModulesDir = file("${project.projectDir}")
//}

然后重新执行打包编译命令:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test

最终打包编译成功:

此时在核心组件的build/distributions目录下,可以看到打包好的安装包:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-exec-server/build/distributions/
azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz  azkaban-exec-server-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-web-server/build/distributions/
azkaban-web-server-0.1.0-SNAPSHOT.tar.gz  azkaban-web-server-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-db/build/distributions/
azkaban-db-0.1.0-SNAPSHOT.tar.gz  azkaban-db-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# 

安装部署Azkaban

解压安装包:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# mkdir /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-db/build/distributions/azkaban-db-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-exec-server/build/distributions/azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-web-server/build/distributions/azkaban-web-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban

为了查看方便,将解压后的目录重命名:

代码语言:javascript复制
[root@azkaban01 /usr/local/src/azkaban]# cd /usr/local/azkaban/
[root@azkaban01 /usr/local/azkaban]# mv azkaban-db-0.1.0-SNAPSHOT/ azkaban-db
[root@azkaban01 /usr/local/azkaban]# mv azkaban-exec-server-0.1.0-SNAPSHOT/ azkaban-exec-server
[root@azkaban01 /usr/local/azkaban]# mv azkaban-web-server-0.1.0-SNAPSHOT/ azkaban-web-server

首先,到MySQL中创建azkaban数据库,然后将azkaban-db目录下的create-all-sql-0.1.0-SNAPSHOT.sql文件给导入到MySQL中:

代码语言:javascript复制
create database azkaban;
use azkaban;
source /usr/local/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql

然后配置azkaban-exec-server

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban]# cd azkaban-exec-server/
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim conf/azkaban.properties
# webserver的连接地址
azkaban.webserver.url=http://localhost:8081

database.type=mysql
mysql.port=3306
mysql.host=192.168.1.11
# MySQL8.x需要加时区参数,5.x则不需要
mysql.database=azkaban?serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=123456a.
mysql.numconnections=100

由于azkaban-exec-server默认使用的是5.x版本的MySQL驱动,而我这部署的MySQL是8.x版本的,所以还得替换一下MySQL驱动包:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# rm -rf lib/mysql-connector-java-5.1.28.jar 

启动azkaban-exec-server:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh

检查azkaban-exec-server进程是否正常运行:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# jps
2005 Jps
1982 AzkabanExecutorServer
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# netstat -lntp |grep 1982
tcp6       0      0 :::35195                :::*               LISTEN      1982/java           
tcp6       0      0 :::36304                :::*               LISTEN      1982/java           
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# 

通过API手动激活Executor Server:

代码语言:javascript复制
$ curl http://localhost:35195/executor?action=activate

接着配置azkaban-webserver

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# vim conf/azkaban.properties 
database.type=mysql
mysql.port=3306
mysql.host=192.168.1.11
# MySQL8.x需要加时区参数,5.x则不需要
mysql.database=azkaban?serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=123456a.
mysql.numconnections=100

替换MySQL驱动包:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# rm -rf lib/mysql-connector-java-5.1.28.jar

启动azkaban-webserver:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh

检查azkaban-webserver进程是否正常运行:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# jps
2201 Jps
2172 AzkabanWebServer
1982 AzkabanExecutorServer
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# netstat -lntp |grep 2172
tcp6       0      0 :::46136                :::*              LISTEN      2172/java           
tcp6       0      0 :::8081                 :::*              LISTEN      2172/java           
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# 

使用浏览器访问webserver的页面,会进入到登录页,默认的用户名和密码都是azkaban

  • webserver的用户相关配置可以在conf/azkaban-users.xml文件中修改

登录成功进入到首页,如下:


提交Azkaban任务

关于Job的官方文档:

  • https://azkaban.readthedocs.io/en/latest/createFlows.html#job-config

Azkaban工作流:

  • Project:Azkaban的抽象概念,项目。一个Project包括多个Flow
  • Flow:流程,一个Flow包含多个Job及Job的依赖关系
  • Job:具体的任务,有command、java、hive、hadoopJava等 类型

Azkaban任务类型:

  • Azkaban拥有独立的plugins仓库,需对其进行编译
  • 不同的Job plugin是建立在command的基础之.上
  • Command类型是万能的Azkaban任务类型,因为通过command调用shell脚本,就可以在shell脚本里实现任意操作

单个任务

我们来通过WebServer的可视化界面提交一个最简单的command任务,首先创建任务定义文件:

代码语言:javascript复制
$ vim cmd_test.job
type=command
command=sh job1.sh

编写一个简单的shell脚本:

代码语言:javascript复制
$ vim job1.sh
#!/bin/sh
echo "hello azkaban"

将这两个文件打成一个zip包:

到WebServer页面上创建一个Project:

上传压缩包:

上传成功后,点击“Execute Flow” -> “Schedule”,通过配置crontab表达式定义调度的时间:

配置好表达式点击“Schedule”后,可以在“Scheduling”看到正在调度的任务:

点击“Flow”下的“cmd_test”,可以查看该任务的执行情况:

多个任务

以上演示了单个任务的定义、提交和调度,接下来演示下多个任务的定义、提交和调度,并且这多个任务之间还存在依赖关系,也就是任务之间的调度存在先后顺序。首先,创建任务文件:

代码语言:javascript复制
$ vim job1.job
type=command
command=sh job1.sh

----------

$ vim job2.job
type=command
command=sh job2.sh
# 依赖job1,当job1调度执行完才会执行job2
dependencies=job1

----------

$ vim job3.job
type=command
command=sh job3.sh
dependencies=job1

----------

$ vim job4.job
type=command
command=sh job4.sh
dependencies=job2,job3

编写与任务对应的shell脚本:

代码语言:javascript复制
$ vim job1.sh
#!/bin/sh
echo "job1 exec over"

----------

$ vim job2.sh
#!/bin/sh
echo "job2 exec over"

----------

$ vim job3.sh
#!/bin/sh
echo "job3 exec over"

----------

$ vim job4.sh
#!/bin/sh
echo "job4 exec over"

同样,将这些文件打成一个压缩包:

在WebServer上新建一个Project,并将压缩包上传:

此时展开job可以看到一个树状结构:

点击“Execute Flow”,可以看到任务之间的依赖图:

点击“Execute”执行任务,该方式是单次执行,不会调度执行。所有任务节点均执行成功,图中的节点都是绿色的:

在“Job List”可以看到任务列表,以及一些执行信息:


Azkaban用户代理

Azkaban代理用户:

  • Azkaban可以代理其他linux用户执行命令
  • 通过代理用户模式可以实现Hadoop的权限控制

编译用户代理模块:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# mkdir extlib
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# gcc /usr/local/src/azkaban/az-exec-util/src/main/c/execute-as-user.c -o extlib/execute-as-user
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# chmod 6050 extlib/execute-as-user

创建配置文件:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim plugins/jobtypes/commonprivate.properties
execute.as.user=true
azkaban.native.lib=/usr/local/azkaban/azkaban-exec-server/extlib/
azkaban.group.name=root

重启ExecuteServer:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/shutdown-exec.sh 
Killing executor. [pid: 1982], attempt: 1
shutdown succeeded
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh 

激活ExecutorServer:

代码语言:javascript复制
$ curl http://localhost:46176/executor?action=activate
  • Tips:ExecutorServer每次重启后端口都不一样

重启WebServer:

代码语言:javascript复制
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/shutdown-web.sh 
Killing web-server. [pid: 2172], attempt: 1
shutdown succeeded
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh

接下来提交任务测试一下,创建任务定义文件:

代码语言:javascript复制
$ vim proxy.job
type=command
command=sh test.sh

编写对应的shell脚本:

代码语言:javascript复制
$ vim test.sh
#!/bin/sh
echo "----------------"
whoami
echo "----------------"

将其打成压缩包:

创建“Project”,并上传压缩包:

然后点击“Execute Flow” -> “Execute”执行该任务,此时会发现执行失败了:

查看日志可以看到不允许代理‘azkaban’用户:

到操作系统上,新建一个用户:

代码语言:javascript复制
$ useradd hadoop
  • Tips:Azkaban默认是禁止代理root用户的

修改任务配置文件,指定代理用户,如下所示:

代码语言:javascript复制
$ vim proxy.job
type=command
command=sh test.sh
user.to.proxy=hadoop

然后重新打包上传,重新执行该任务。这次任务执行成功,输出的日志如下:

以上的示例都是简单的执行一个shell脚本,如果想真正调度起一个MR任务其实也很简单,就只需要配置执行相应的命令就可以了。如下示例:

代码语言:javascript复制
type=command
command=yarn jar /soft/home/hadoop-2.8.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.5.jar pi 16 1000
user.to.proxy=hadoop

关于Java操作Azkaban Api

除了可以在可视化的Azkaban WebServer界面上进行项目的创建、任务的上传/提交等操作外,Azkaban还支持通过HTTP API来完成这些操作。因为我们如果要开发自己的大数据平台,可能并不会使用Azkaban WebServer的可视化界面,而是希望在自己的大数据平台界面去与Azkaban进行交互,完成任务的调度管理。所以Azkaban提供了HTTP Api的支持,让我们可以轻松实现与自研平台的整合。

关于Azkaban Api的官方文档地址如下:

  • https://azkaban.readthedocs.io/en/latest/ajaxApi.html

我这里准备了一个示例代码仓库,可以简单参考下:

  • https://gitee.com/Zero-One/azkaban-api-demo

0 人点赞