客快物流大数据项目(五十四):初始化Spark流式计算程序

2022-03-04 10:02:51 浏览数 (1)

目录

初始化Spark流式计算程序

一、SparkSql参数调优设置 

1、设置会话时区

2、​​​​​​​设置读取文件时单个分区可容纳的最大字节数

3、设置合并小文件的阈值

4、​​​​​​​设置 join 或aggregate洗牌(shuffle)数据时使用的分区数

5、​​​​​​​设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小

二、测试数据是否可以消费成功

初始化Spark流式计算程序

实现步骤:

  • etl模块realtime目录创建 App 单例对象,初始化 spark 运行环境
  • 创建main方法
  • 编写代码
    • 初始化spark环境参数
    • 消费kafka的ogg数据
    • 消费kafka的canal数据
    • 打印kafka的数据

参考代码:

代码语言:javascript复制
package cn.it.logistics.etl.realtime

import cn.it.logistics.common.Configuration
import org.apache.commons.lang.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 测试消费kafka的数据
 * 1)物流相关的数据
 * 2)客户关系管理系统的数据
 */
object App {
  /**
   * 入口函数
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化spark的运行环境
     * 2)判断当前的运行环境(local/linux运行环境)
     * 3)创建sparkSession对象
     * 4)初始化物流topic数据的连接参数
     * 5)初始化客户关系系统topic数据的连接参数
     * 6)消费oracle->ogg->kafka的topic数据
     * 7)消费mysql->canal->kafka的topic数据
     * 8)启动运行等待停止
     */
    //1)初始化spark的运行环境
    val conf: SparkConf = new SparkConf()
      //设置应用的名称
      .set("spark.app.name", this.getClass.getSimpleName)
      //设置时区
      .set("spark.sql.session.timeZone", "Asia/Shanghai")
      //设置单个分区可容纳的最大字节数,默认是128M, 等同于block块的大小
      .set("spark.sql.files.maxPartitionBytes", "134217728")
      //设置合并小文件的阈值,避免每个小文件占用一个分区的情况
      .set("spark.sql.files.openCostInBytes", "134217728")
      //设置join或者shuffle的时候使用的分区数,默认情况下分区数是200
      .set("spark.sql.shuffle.partitions", "600")
      //设置join操作时可以广播到worker节点的最大字节大小,可以避免shuffer操作
      .set("spark.sql.autoBroadcastJoinThreshold", "67108864")

    //2)判断当前的运行环境(local/linux运行环境)
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
      //本地环境LOCAL_HADOOP_HOME
      System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
      //设置运行环境和checkpoint路径
      conf.set("spark.master", "local[*]").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppWinCheckpointDir)
    } else {
      //生产环境
      conf.set("spark.master", "yarn").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppDfsCheckpointDir)
    }

    //3)创建sparkSession对象
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //4)初始化物流topic数据的连接参数
    val logisticsKafkaParams: Map[String, String] = Map[String, String](
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
      "subscribe" -> Configuration.kafkaLogisticsTopic,
      "group.id" -> "logistics",
      //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
      "failOnDataLoss" -> "false"
    )

    //5)初始化客户关系系统topic数据的连接参数
    val crmKafkaParams: Map[String, String] = Map[String, String](
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
      "subscribe" -> Configuration.kafkaCrmTopic,
      "group.id" -> "logistics",
      //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
      "failOnDataLoss" -> "false"
    )

    //导入隐士转换
    import sparkSession.implicits._

    //6)消费oracle->ogg->kafka的topic数据
    val logisticsDF: DataFrame = sparkSession.readStream.format("kafka").options(logisticsKafkaParams).load().selectExpr("CAST(value AS STRING)").as[String].toDF()

    //7)消费mysql->canal->kafka的topic数据
    val crmDF: DataFrame = sparkSession.readStream.format("kafka").options(crmKafkaParams).load().selectExpr("CAST(value AS STRING)").as[String].toDF()

    //输出数据
   logisticsDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
    crmDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }
}

一、SparkSql参数调优设置 

1、​​​​​​​设置会话时区

会话本地时区的ID

代码语言:javascript复制
.set("spark.sql.session.timeZone", "Asia/Shanghai")

会话时区使用配置'spark.sql.session.timeZone'设置,如果未设置,将默认为JVM系统本地时区

2、​​​​​​​设置读取文件时单个分区可容纳的最大字节数

读取文件时单个分区可容纳的最大字节数,默认128M,等同于Block块大小

代码语言:javascript复制
.set("spark.sql.files.maxPartitionBytes", "134217728")

3、设置合并小文件的阈值

用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。

该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度),默认是4M

说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并,防止太多单个小文件占一个分区情况。

代码语言:javascript复制
.set("spark.sql.files.openCostInBytes", "134217728")

4、​​​​​​​设置 join 或aggregate洗牌(shuffle)数据时使用的分区数

对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle(掉队)

代码语言:javascript复制
.set("spark.sql.shuffle.partitions", "600")

5、​​​​​​​设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小

对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。

代码语言:javascript复制
.set("spark.sql.autoBroadcastJoinThreshold", "67108864")

否则会报如下错误:

Exception in thread “broadcast-exchange-0” java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes 

原因:

从问题来分析说是内存溢出了,也就是说明广播内存不够用,即使不断设整任务的内存资源,无论是executor还是driver的内存都分配多一倍了,但是还是不起作用。

所以这个配置的最大字节大小是用于当执行连接时,该表将广播到所有工作节点。通过将此值设置为-1,广播可以被禁用。

二、测试数据是否可以消费成功

测试步骤:

  • 启动docker并启动Order和Mysql数据库(包含OGG服务和Canal-server服务
  • 启动造数程序(位于logistics-generate项目下的cn.it.logistics.generate.App类
  • 启动App单例对象

0 人点赞