1) Setting SPARK_DAEMON_MEMORY=2g in spark-env.sh solves the problem. Before diving in, here is a diagram to get a feel for SHS:
2) How it works:
Spark History Server is an HTTP service built into Spark, started via sbin/start-history-server.sh. Once started, it listens on a port and launches two scheduled background threads: one parses eventLog files, the other cleans up expired eventLog files.
Once the History Server is up, you can open http://ip:port directly in a browser; the default port is 18080.
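For reference, the port and the two scheduled tasks are governed by these configuration keys; the values shown are the defaults documented for recent Spark releases:

spark.history.ui.port             18080   # port the HTTP service listens on
spark.history.fs.update.interval  10s     # how often the parsing thread checks for new or updated eventLogs
spark.history.fs.cleaner.enabled  false   # the cleanup thread is disabled by default
spark.history.fs.cleaner.interval 1d      # how often the cleaner looks for expired eventLogs
spark.history.fs.cleaner.maxAge   7d      # eventLogs older than this are deleted when the cleaner runs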
Here we run the service locally. First, create the /tmp/spark-events directory.
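To make applications actually write eventLogs into that directory, the usual spark-defaults.conf entries are as follows (/tmp/spark-events also happens to be the History Server's default log directory, so no extra SHS option is needed for this local setup):

mkdir -p /tmp/spark-events

# conf/spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir     file:/tmp/spark-events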
Then launch it with sbin/start-history-server.sh:
sandyshu@SANDYSHU-MB0 sbin % ./start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /Users/sandys/code/spark/logs/spark-sandyshu-org.apache.spark.deploy.history.HistoryServer-1-SANDYSHU-MB0.out
The output shows where the daemon log is written. Enter that directory:
/Users/sandyshu/code/spark/logs
The log reads as follows:
cat spark-sandyshu-org.apache.spark.deploy.history.HistoryServer-1-SANDYSHU-MB0.out
Spark Command: /Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home/bin/java -cp /Users/sandyshu/code/spark/conf/:/Users/sandyshu/code/spark/assembly/target/scala-2.12/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
22/09/23 15:03:01 INFO HistoryServer: Started daemon with process name: 60954@SANDYSHU-MB0
22/09/23 15:03:01 INFO SignalUtils: Registering signal handler for TERM
22/09/23 15:03:01 INFO SignalUtils: Registering signal handler for HUP
22/09/23 15:03:01 INFO SignalUtils: Registering signal handler for INT
22/09/23 15:03:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/23 15:03:02 INFO SecurityManager: Changing view acls to: sandyshu
22/09/23 15:03:02 INFO SecurityManager: Changing modify acls to: sandyshu
22/09/23 15:03:02 INFO SecurityManager: Changing view acls groups to:
22/09/23 15:03:02 INFO SecurityManager: Changing modify acls groups to:
22/09/23 15:03:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: sandyshu; groups with view permissions: EMPTY; users with modify permissions: sandyshu; groups with modify permissions: EMPTY
22/09/23 15:03:02 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
22/09/23 15:03:02 INFO JettyUtils: Start Jetty 10.91.80.101:18080 for HistoryServerUI
22/09/23 15:03:02 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
22/09/23 15:03:02 INFO HistoryServer: Bound HistoryServer to 10.91.80.101, and started at http://****:18080
The SHS service is now running; its web UI looks like this:
1) UI notes: /tmp/spark-events is the event-log path we configured at the beginning.
2) Reading the spark-history-server source shows that it relies heavily on memory.
Tip: SHS (Spark History Server) is in essence a standalone service: it can be started on its own, and it is memory-hungry.
The request-handling code looks like this:
private val loaderServlet = new HttpServlet {
  protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
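The snippet above is truncated. Roughly, doGet extracts the application ID from the request path, asks the history provider to load that application's UI if it is not cached yet, and then redirects the browser so the rebuilt UI serves the request. A simplified sketch under those assumptions (not the verbatim Spark source; attempt-ID handling and some error paths are omitted):

protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
  // The path looks like /history/<appId>/...; parts(1) is the application ID.
  val parts = Option(req.getPathInfo()).getOrElse("").split("/")
  if (parts.length > 1) {
    val appId = parts(1)
    // loadAppUi re-parses the eventLog and attaches a freshly built SparkUI
    // when the application is not already cached.
    if (loadAppUi(appId, None)) {
      // Redirect so the handler attached for this application serves the page.
      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
    } else {
      res.sendError(HttpServletResponse.SC_NOT_FOUND, s"No event logs found for $appId")
    }
  }
}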
The SHS initialization code:
/**
 * Initialize the history server.
 *
 * This starts a background thread that periodically synchronizes information displayed on
 * this UI with the event logs in the provided base directory.
 */
def initialize(): Unit = {
  attachPage(new HistoryPage(this))
  attachHandler(ApiRootResource.getServletHandler(this))
  addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
  val contextHandler = new ServletContextHandler
  contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
  contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
  attachHandler(contextHandler)
}
The key lines are these:
// Adds a ServletContextHandler to the handler list.
// It hosts jersey's ServletContainer, which provides the REST API:
// jersey automatically scans the classes under org.apache.spark.status.api.v1
// and dispatches matching requests to them.
val contextHandler = new ServletContextHandler
contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX) // UI_PATH_PREFIX = "/history"
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") // loaderServlet: the HttpServlet shown above
attachHandler(contextHandler)
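A quick way to confirm that the jersey-backed REST layer is actually serving is to fetch the application list from the v1 endpoint. A minimal Scala sketch, assuming the local SHS started above on localhost:18080:

import scala.io.Source

object ShsApiCheck {
  def main(args: Array[String]): Unit = {
    // /api/v1/applications is the REST endpoint served by the
    // org.apache.spark.status.api.v1 classes that jersey scans.
    val json = Source.fromURL("http://localhost:18080/api/v1/applications").mkString
    println(json)
  }
}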
Next, look at the class that defines attachHandler, together with its doc comment.
The attachHandler method:
/** Attaches a handler to this UI. */
def attachHandler(handler: ServletContextHandler): Unit = synchronized {
  handlers += handler
  serverInfo.foreach(_.addHandler(handler, securityManager))
  // handlers (defined below) is the collection that gathers the currently registered handlers.
}
where handlers is defined as:
protected val handlers = ArrayBuffer[ServletContextHandler]()
/**
 * To start SparkUI, Spark starts the Jetty Server first to bind the address.
 * After the Spark application is fully started, call [attachAllHandlers]
 * to start all existing handlers.
 */
override def bind(): Unit = { // binds the port
  assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
  try {
    val server = initServer()
    server.addHandler(initHandler, securityManager)
    serverInfo = Some(server)
  } catch {
    case e: Exception =>
      logError(s"Failed to bind $className", e)
      System.exit(1)
  }
}
Initialization registers the server with Jetty:
def initServer(): ServerInfo = {
  val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
  // The service is deployed on an embedded Jetty server.
  val server = startJettyServer(host, port, sslOptions, conf, name, poolSize)
  server
}
This is why Jetty-related lines appear in the startup log shown earlier.
Analysis of the SHS caching mechanism:
When we click into the run details of a job on the page, the History Server goes back and re-parses the corresponding eventLog file under the directory configured earlier (/tmp/spark-events/). At that point the entire eventLog file is parsed; its format is JSON.
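For reference, every line of an eventLog file is a single JSON object keyed by "Event". An abridged example (the field values here are made up):

{"Event":"SparkListenerApplicationStart","App Name":"demo","App ID":"app-20220923150301-0000","Timestamp":1663916581000,"User":"sandyshu"}
{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1663916582000,"Stage Infos":[ ... ]}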
When the event logs are stored on HDFS instead, the following kind of configuration is used:
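A typical setting looks like this; the namenode address below is a placeholder:

# conf/spark-defaults.conf
spark.eventLog.dir            hdfs://namenode:8020/spark-events
spark.history.fs.logDirectory hdfs://namenode:8020/spark-events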
The information built from the eventLog is then cached, using Guava's CacheLoader. The number of cached entries is bounded by spark.history.retainedApplications, which defaults to 50, so the information of up to 50 applications is held in memory.
While putting an application's information into the cache, the History Server also pre-builds the SparkUI (that is, the web UI) for each of the application's states; in fact the history UI is itself implemented by extending WebUI (see the code walkthrough above), and the ServletContextHandler is created at the same time (also shown above). A sketch of the caching pattern follows.
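A minimal, self-contained sketch of the Guava CacheLoader pattern just described; the types and loading logic here are illustrative stand-ins, not the exact Spark internals:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object AppCacheSketch {
  // Stand-in for the per-application UI state that SHS rebuilds from the eventLog.
  case class LoadedAppUI(appId: String)

  val retainedApplications = 50 // mirrors the spark.history.retainedApplications default

  val appCache: LoadingCache[String, LoadedAppUI] =
    CacheBuilder.newBuilder()
      .maximumSize(retainedApplications) // at most 50 applications kept in memory
      .build(new CacheLoader[String, LoadedAppUI] {
        // On a cache miss, the real SHS re-parses the whole eventLog file;
        // here we just construct a stub.
        override def load(appId: String): LoadedAppUI = LoadedAppUI(appId)
      })

  def main(args: Array[String]): Unit = {
    println(appCache.get("app-20220923150301-0000")) // miss -> load(); later gets hit the cache
  }
}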
Given how heavily SHS depends on memory, and given the documented default of 1g:
SPARK_DAEMON_MEMORY: Memory to allocate to the Spark master and worker daemons themselves (default: 1g).
it is advisable to raise it to 2g.
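Concretely, add the variable to conf/spark-env.sh and restart the service with the same sbin scripts used above:

# conf/spark-env.sh
export SPARK_DAEMON_MEMORY=2g

# then restart the History Server
sbin/stop-history-server.sh
sbin/start-history-server.sh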