目录
初始化Spark流式计算程序
一、SparkSql参数调优设置
1、设置会话时区
2、设置读取文件时单个分区可容纳的最大字节数
3、设置合并小文件的阈值
4、设置 join 或aggregate洗牌(shuffle)数据时使用的分区数
5、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小
二、测试数据是否可以消费成功
初始化Spark流式计算程序
实现步骤:
- 在etl模块的realtime目录创建 App 单例对象,初始化 spark 运行环境
- 创建main方法
- 编写代码
- 初始化spark环境参数
- 消费kafka的ogg数据
- 消费kafka的canal数据
- 打印kafka的数据
参考代码:
代码语言:javascript复制package cn.it.logistics.etl.realtime
import cn.it.logistics.common.Configuration
import org.apache.commons.lang.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 测试消费kafka的数据
* 1)物流相关的数据
* 2)客户关系管理系统的数据
*/
object App {
/**
* 入口函数
*
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化spark的运行环境
* 2)判断当前的运行环境(local/linux运行环境)
* 3)创建sparkSession对象
* 4)初始化物流topic数据的连接参数
* 5)初始化客户关系系统topic数据的连接参数
* 6)消费oracle->ogg->kafka的topic数据
* 7)消费mysql->canal->kafka的topic数据
* 8)启动运行等待停止
*/
//1)初始化spark的运行环境
val conf: SparkConf = new SparkConf()
//设置应用的名称
.set("spark.app.name", this.getClass.getSimpleName)
//设置时区
.set("spark.sql.session.timeZone", "Asia/Shanghai")
//设置单个分区可容纳的最大字节数,默认是128M, 等同于block块的大小
.set("spark.sql.files.maxPartitionBytes", "134217728")
//设置合并小文件的阈值,避免每个小文件占用一个分区的情况
.set("spark.sql.files.openCostInBytes", "134217728")
//设置join或者shuffle的时候使用的分区数,默认情况下分区数是200
.set("spark.sql.shuffle.partitions", "600")
//设置join操作时可以广播到worker节点的最大字节大小,可以避免shuffer操作
.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
//2)判断当前的运行环境(local/linux运行环境)
if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
//本地环境LOCAL_HADOOP_HOME
System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
//设置运行环境和checkpoint路径
conf.set("spark.master", "local[*]").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppWinCheckpointDir)
} else {
//生产环境
conf.set("spark.master", "yarn").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppDfsCheckpointDir)
}
//3)创建sparkSession对象
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//4)初始化物流topic数据的连接参数
val logisticsKafkaParams: Map[String, String] = Map[String, String](
"kafka.bootstrap.servers" -> Configuration.kafkaAddress,
"subscribe" -> Configuration.kafkaLogisticsTopic,
"group.id" -> "logistics",
//表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
"failOnDataLoss" -> "false"
)
//5)初始化客户关系系统topic数据的连接参数
val crmKafkaParams: Map[String, String] = Map[String, String](
"kafka.bootstrap.servers" -> Configuration.kafkaAddress,
"subscribe" -> Configuration.kafkaCrmTopic,
"group.id" -> "logistics",
//表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
"failOnDataLoss" -> "false"
)
//导入隐士转换
import sparkSession.implicits._
//6)消费oracle->ogg->kafka的topic数据
val logisticsDF: DataFrame = sparkSession.readStream.format("kafka").options(logisticsKafkaParams).load().selectExpr("CAST(value AS STRING)").as[String].toDF()
//7)消费mysql->canal->kafka的topic数据
val crmDF: DataFrame = sparkSession.readStream.format("kafka").options(crmKafkaParams).load().selectExpr("CAST(value AS STRING)").as[String].toDF()
//输出数据
logisticsDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
crmDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()
//8)启动运行等待停止
val stream = sparkSession.streams
//stream.active:获取当前活动流式查询的列表
stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
//线程阻塞,等待终止
stream.awaitAnyTermination()
}
}
一、SparkSql参数调优设置
1、设置会话时区
会话本地时区的ID
代码语言:javascript复制.set("spark.sql.session.timeZone", "Asia/Shanghai")
会话时区使用配置'spark.sql.session.timeZone'设置,如果未设置,将默认为JVM系统本地时区
2、设置读取文件时单个分区可容纳的最大字节数
读取文件时单个分区可容纳的最大字节数,默认128M,等同于Block块大小
代码语言:javascript复制.set("spark.sql.files.maxPartitionBytes", "134217728")
3、设置合并小文件的阈值
用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。
该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度),默认是4M
说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并,防止太多单个小文件占一个分区情况。
代码语言:javascript复制.set("spark.sql.files.openCostInBytes", "134217728")
4、设置 join 或aggregate洗牌(shuffle)数据时使用的分区数
对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle(掉队)
代码语言:javascript复制.set("spark.sql.shuffle.partitions", "600")
5、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小
对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。
代码语言:javascript复制.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
否则会报如下错误:
Exception in thread “broadcast-exchange-0” java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes
原因:
从问题来分析说是内存溢出了,也就是说明广播内存不够用,即使不断设整任务的内存资源,无论是executor还是driver的内存都分配多一倍了,但是还是不起作用。
所以这个配置的最大字节大小是用于当执行连接时,该表将广播到所有工作节点。通过将此值设置为-1,广播可以被禁用。
二、测试数据是否可以消费成功
测试步骤:
- 启动docker并启动Order和Mysql数据库(包含OGG服务和Canal-server服务)
- 启动造数程序(位于logistics-generate项目下的cn.it.logistics.generate.App类)
- 启动App单例对象