spark数据保存到mysql 通过Azkaban提交集群任务

2023-02-25 15:45:59 浏览数 (2)

spark数据保存到mysql 通过Azkaban提交集群任务

toMysql.job

代码语言:javascript复制
#toMysql.job
type = command
command = bash sparkToMysql.sh

sparkToMysql.sh

代码语言:javascript复制
#!/bin/bash

spark-submit 
--class AccessLogToMySql.AccessLogSpark 
# 集群地址 不写默认local
--master spark://master:7077 
# 执行器数量 不写默认
--num-executors 10 
# 执行器内存大小 不写默认
--executor-memory 3g 
# 执行器核数 不写默认
--executor-cores 8 
# mysql jar包地址
--driver-class-path /root/hd/apache-hive-2.3.4-bin/lib/mysql-connector-java-8.0.14.jar 
# 上传任务jar到位置
/root/job/toMysql.jar

toMysql.job 和 sparkToMysql.sh压缩上传Azkaban定时执行

AccessLogSpark

代码语言:javascript复制
// 获取sparksession
val spark = SparkSession.builder().appName("AccessLogSpark").getOrCreate()

// 引入隐式转换
import spark.sqlContext.implicits._

// 读取数据
val data = spark.sparkContext.textFile("hdfs://master/data/clickLog/20190211/xxxx_click_log_access.12019_02_11_16_49_24")

// 分割数据
val splitData = data.map(_.split(" "))

// 判断处理
val filtData = splitData.filter(x => x.length >= 11)

// 数据转化dataFrame
val logF = filtData.map(x => {LogTable(x(0), x(3), x(9).toLong)}).toDF()

// 创建视图
logF.createOrReplaceTempView("AccessTable")

// sql
val sql = "select ip, sum(upflow) as upSum from AccessTable group by ip order by upSum desc"

// 查询结果
val sumFlowDF = spark.sqlContext.sql(sql)

// 把结果保存在mysql表中
// 创建Properties对象,配置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user","hive")
prop.setProperty("password","hive")

// 写入数据库 追加模式 jdbc 表名 Properties
sumFlowDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://url:3306/sqoop_data", "iptop", prop)

// 停止
spark.stop()

生成jar toMysql.jar存放到服务器指定地址

0 人点赞