1. 案例说明
案例:每分钟统计一次用户的个数
在MySQL中有一张用户表users:
代码语言:javascript复制mysql> select * from users;
----------
| username |
----------
| Tony |
| Tom |
| Jack |
----------
需求是,使用spark程序读用户表,统计用户个数,保存到结果表user_count:
代码语言:javascript复制mysql> select * from user_count;
------------ ------------
| tp | user_count |
------------ ------------
| 2021-01-29 | 3 |
------------ ------------
然后用DolphinScheduler进行调度,每分钟调度一次
2. 代码
代码语言:javascript复制import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkDemo {
def main(args: Array[String]): Unit = {
if (args.length != 5) {
System.err.println("Please input 5 args: ")
System.err.println("1. JDBC URL")
System.err.println("2. JDBC driver name")
System.err.println("3. MySQL table name")
System.err.println("4. Username")
System.err.println("5. Password")
}
// 1、SparkSession
val spark = SparkSession.builder()
// .master("local[1]")
.master("yarn")
.appName("SparkDemo")
.getOrCreate()
// 2、从MySQL表读取数据到DataFrame
val userDF = spark.read
.format("jdbc")
.option("url", args(0)) // jdbc连接的地址
.option("driver", args(1)) // 驱动
.option("dbtable", args(2)) // 用户信息表
.option("user", args(3)) // 用户名
.option("password", args(4)) // 密码
.load()
// userDF.show
// 3、基于DataFrame创建临时视图(临时表)
userDF.createOrReplaceTempView("users")
// 4、执行SQL查询
val resultDF = spark.sql("select current_date as tp , count(*) as user_count from users")
// 5、打印结果
resultDF.show
// 6、保存结果到结果表
resultDF.write
.format("jdbc")
.option("url", args(0)) // jdbc连接的地址
.option("driver", args(1)) // 驱动
.option("dbtable", "user_count") // 指标表
.option("user", args(3)) // 用户名
.option("password", args(4)) // 密码
.mode(SaveMode.Append)
.save
// 7、关闭session
spark.close
}
}
代码测试后打成jar包。
3. 环境准备
大数据平台:
启动DolphinScheduler并登陆
4. DolphinScheduler操作
参考官网:https://dolphinscheduler.apache.org/zh-cn/docs/1.3.4/user_doc/system-manual.html
创建队列
- 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。
- 管理员进入安全中心->队列管理页面,点击“创建队列”按钮,创建队列。
注意:这里的队列就是Yarn中的队列,Yarn中的队列默认叫做default,在DS中要提交一个任务到Yarn的队列中,在这里要创建与Yarn队列同名的队列,并且Yarn上的队列要提前创建好
添加租户
- 租户对应的是Linux的用户,用于worker提交作业所使用的用户。如果linux没有这个用户,worker会在执行脚本的时候创建这个用户。
- 租户编码:租户编码是Linux上的用户,唯一,不能重复
- 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。
- 我们的案例是提交任务到yarn,所以需要使用hdfs用户来提交,所以创建的租户就是hdfs
创建告警组
- 告警组是在启动时设置的参数,在流程结束以后会将流程的状态和其他信息以邮件形式发送给告警组。
- 管理员进入安全中心->告警组管理页面,点击“创建告警组”按钮,创建告警组。
创建Worker分组
- 每个worker节点都会归属于自己的Worker分组,默认分组为default.
- 在任务执行时,可以将任务分配给指定worker分组,最终由该组中的worker节点执行该任务.
默认的default Worker分组中包括全部的Worker节点,是在安装DS的时候在配置文件中指定的:
代码语言:javascript复制# dolphinscheduler-1.3.3/conf/config/install_config.conf
workers="hdp01:default,hdp02:default,hdp03:default,hdp04:default"
使用此分组,代表提交的任务可以由这些节点来执行。
页面上没有提供创建Worker分组的操作,需要修改worker.properties配置文件,例如,我要让hdp02和hdp03这两个节点组成一个分组test来执行特定的任务,那么应该分别编辑hdp02和hdp03这两个节点下dolphinscheduler/conf/worker.properties:
代码语言:javascript复制worker.groups=default,test
然后重启DS,我们这里就使用默认分组来执行任务。
创建普通用户
- 用户是指登录、管理DS系统的用户,注意与租户区分,租户是Linux用户,用来执行任务
- 用户分为管理员用户和普通用户
- 管理员有授权和用户管理等权限,没有创建项目和工作流定义的操作的权限。
- 普通用户可以创建项目和对工作流定义的创建,编辑,执行等操作。 注意:如果该用户切换了租户,则该用户所在租户下所有资源将复制到切换的新租户下
- 创建之后,使用普通用户登录到DS
创建项目
点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。
上传Jar包
资源中心->文件管理->上传文件
创建工作流
点击项目,进入项目,工作流->工作流定义->创建工作流
运行工作流
调度工作流
可以看到任务已经开始调度执行了。