Spark 异常处理之 A master URL must be set in your configuration

2022-04-18 13:51:35 浏览数 (1)

全文的一场信息为 :

yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 12.0 failed 4 times, most recent failure: Lost task 1.3 in stage 12.0 (TID 42, dn2.qa): java.lang.ExceptionInInitializerError

......

Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration

异常的场景 :

SparkApp 是提交到yarn-cluster 上面执行的,并且在submit前指定了 Master -> "yarn-cluster"

从字面上来看,提示说我没有给master提供配置。其实出现这个异常的地方有很多,比如读取配置问题异常,比如sparkconf 初始化异常等等,这些都是显而易见的,但是,本文的异常并不是那么显而易见,而是需要了解一些分布式以及spark内部的运行机制,下面来一起看一下。

异常原因:

一个spark 应用对应了一个main函数,放在一个driver里,driver里有一个对应的实例(spark context).driver 负责向各个节点分发资源以及数据。那么如果你把创建实例放在了main函数的外面,driver就没法分发了。所以如果这样写在local模式下是可以成功的,在分布式就会报错。

在提交到yarn-cluster 分布式系统进行调度的时候,driver会将job分发到不同的work中执行,那么每一个分发job都是一个task,task是在work的executor中去执行,之所以会报这个异常,就是因为程序中关于StreamingContext的初始化在main函数的外面,如代码:

代码语言:javascript复制
 val sc = new SparkConf()
 val ssc = new StreamingContext(sc, Seconds(15))
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map(
      "group.id" -> "group-spark-streaming-LVaue",
      //"metadata.broker.list" -> "mq1.svc:9092,mq2.svc:9092,mq3.svc:9092",
      "metadata.broker.list" -> "das1.qa:9092,das2.qa:9092",
      "zookeeper.session.timeout.ms" -> "10000",
      "zookeeper.connection.timeout.ms" -> "10000",
      "zookeeper.sync.time.ms" -> "2000",
      "rebalance.max.retries" -> "5",
      "rebalance.backoff.ms" -> "3000")
    val s = new Properties
    s.setProperty("hosts", "redis.qa")
    s.setProperty("port", "6380")
    s.setProperty("dbIndex", "0")
    RedisPool.getInstance.init(s)
    val topics = Set("ras-topic")
    val zkHosts = "zk1.qa:2181,zk2.qa:2181,zk3.qa:2181"
    val zkPath = "/data/hadoop/zk/data"
    val kafkaStream = createCustomDirectKafkaStream(ssc, kafkaParams, zkHosts, zkPath, topics)
    val maps: scala.collection.mutable.Map[String, Set[String]] = scala.collection.mutable.Map()

如果StreamingContext是在main函数外面的话,work端在启动task的时候,就找不到关于StreamingContext的配置。也就无法启动。

0 人点赞