温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github:https://github.com/fayson/cdhproject
1.文档编写目的
在使用CDH集群中经常会有一些特定顺序的作业需要在集群中运行,对于需要多个作业顺序执行的情况下,如何能够方便的构建一个完整的工作流在CDH集群中执行,前面Fayson也讲过关于Hue创建工作流的一系列文章具体可以参考《如何使用Hue创建Spark1和Spark2的Oozie工作流》、《如何使用Hue创建Spark2的Oozie工作流(补充)》、《如何在Hue中创建Ssh的Oozie工作流》。本篇文章主要讲述如何使用Hue创建一个以特定顺序运行的Oozie工作流。本文工作流程如下:
- 内容概述
1.作业描述
2.使用Hue创建Oozie工作流
3.工作流测试
- 测试环境
1.CM和CDH版本为5.11.2
2.采用sudo权限的ec2-user用户操作
3.集群已启用Kerberos
- 前置条件
1.集群已安装Hue服务
2.集群已安装Oozie服务
2.创建一个Parquet格式的Hive表
创建一个Hive表,该表用于Spark作业保存数据,注意这里创建的Parquet格式的表
create table testaaa (
`_age` bigint COMMENT_ '',
`_id` bigint COMMENT_ '',
`_name`_ string COMMENT ''
) STORED AS parquet
3.Sqoop抽数作业
这里的Sqooop抽数以MySQL为例。
1.创建一个MySQL的测试账号及准备测试数据
CREATE USER 'testuser'@'%' IDENTIFIED BY 'password';
GRANT ALL PRIVILEGES ON *.* TO 'testuser'@'%';
FLUSH PRIVILEGES;
准备测试表和数据
代码语言:txt复制create database test_db default character set utf8;
创建表并插入测试数据
代码语言:txt复制create table user(
代码语言:txt复制 id int(10) primary key not null auto_increment,
代码语言:txt复制 name varchar(20) not null,
代码语言:txt复制 age int(10) not null
代码语言:txt复制) ENGINE=InnoDB DEFAULT CHARSET=UTF8;
向表中插入数据
代码语言:txt复制insert into test_user (name,age) values('user1', 12);
insert into test_user (name,age) values('user2', 13);
insert into test_user (name,age) values('user3', 14);
insert into test_user (name,age) values('user4', 15);
insert into test_user (name,age) values('user5', 16);
insert into test_user (name,age) values('user6', 17);
insert into test_user (name,age) values('user7', 18);
insert into test_user (name,age) values('user8', 19);
2.Sqoop抽数脚本
代码语言:txt复制sqoop import --connect jdbc:mysql://ip-172-31-22-86.ap-southeast-1.compute.internal/test_db --username testuser --password password --table test_user --target-dir /tmp/sqoop -m 1
4.Spark ETL作业
将Sqoop抽取的数据通过Python的Spark作业进行ETL操作写入Hive表中
1.编写Spark脚本
代码语言:txt复制#!/usr/local/anaconda3/bin/python
#coding:utf-8
# 初始化sqlContext
from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext,Row
conf=(SparkConf().setAppName('PySparkETL'))
sc=SparkContext(conf=conf)
sqlContext = HiveContext(sc)
# 加载文本文件并转换成Row.
lines = sc.textFile("/tmp/sqoop/part-*")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(id=int(p[0]),name=p[1], age=int(p[2])))
# 将DataFrame注册为table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")
sqlContext.cacheTable("people")
# 执行sql查询,查下条件年龄在13岁到16岁之间
teenagers = sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 16")
teenagers.write.saveAsTable("testaaa", mode="append")
5.Hive查询作业
将Spark作业处理后的数据写入hive表中,使用Hive对表进行查询操作
编写hive-query.sql文件,内容如下:
select * from testaaa where age>=10 and age<=15
6.创建工作流
- 进入Hue界面,选择”Workflows” => “Editors”=> “Workflows”
- 在以下界面中点击“Create”按钮创建工作流
3.然后进入WrokSpace
将工作流相关的JDBC驱动包、ETL和Hive脚本放在当前WorkSpace的lib目录下
4.在工作流中添加Sqoop抽数作业
5.添加PySpark ETL工作流
5.添加Hive工作流
如下是一个完成的工作流
点击保存,完成工作流定义。
7.工作流运行
1.工作流保存成功后,点击运行
2.Oozie调度任务执行成功
8.作业运行结果查看
Sqoop抽数结果查看
Spark ETL执行成功查看Hive表testaaa数据
Hive作业执行结果查看
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。