spark数据保存到mysql 通过Azkaban提交集群任务
toMysql.job
代码语言:javascript复制#toMysql.job
type = command
command = bash sparkToMysql.sh
sparkToMysql.sh
代码语言:javascript复制#!/bin/bash
spark-submit
--class AccessLogToMySql.AccessLogSpark
# 集群地址 不写默认local
--master spark://master:7077
# 执行器数量 不写默认
--num-executors 10
# 执行器内存大小 不写默认
--executor-memory 3g
# 执行器核数 不写默认
--executor-cores 8
# mysql jar包地址
--driver-class-path /root/hd/apache-hive-2.3.4-bin/lib/mysql-connector-java-8.0.14.jar
# 上传任务jar到位置
/root/job/toMysql.jar
toMysql.job 和 sparkToMysql.sh压缩上传Azkaban定时执行
AccessLogSpark
代码语言:javascript复制// 获取sparksession
val spark = SparkSession.builder().appName("AccessLogSpark").getOrCreate()
// 引入隐式转换
import spark.sqlContext.implicits._
// 读取数据
val data = spark.sparkContext.textFile("hdfs://master/data/clickLog/20190211/xxxx_click_log_access.12019_02_11_16_49_24")
// 分割数据
val splitData = data.map(_.split(" "))
// 判断处理
val filtData = splitData.filter(x => x.length >= 11)
// 数据转化dataFrame
val logF = filtData.map(x => {LogTable(x(0), x(3), x(9).toLong)}).toDF()
// 创建视图
logF.createOrReplaceTempView("AccessTable")
// sql
val sql = "select ip, sum(upflow) as upSum from AccessTable group by ip order by upSum desc"
// 查询结果
val sumFlowDF = spark.sqlContext.sql(sql)
// 把结果保存在mysql表中
// 创建Properties对象,配置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user","hive")
prop.setProperty("password","hive")
// 写入数据库 追加模式 jdbc 表名 Properties
sumFlowDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://url:3306/sqoop_data", "iptop", prop)
// 停止
spark.stop()
生成jar toMysql.jar存放到服务器指定地址