History Server概述
Flink有一个History Server,可以用来在相应的Flink集群关闭后查询已完成作业的统计信息。例如有个批处理作业是凌晨才运行的,并且我们都知道只有当作业处于运行中的状态,才能够查看到相关的日志信息和统计信息。所以如果作业由于异常退出或者处理结果有问题,我们又无法及时查看(凌晨运行的)作业的相关日志信息。那么History Server就显得十分重要了,因为通过History Server我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。
此外,它对外提供了REST API,它接受HTTP请求并使用JSON数据进行响应。Flink任务停止后,JobManager会将已经完成任务的统计信息进行存档,History Server进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的checkpoint、任务运行时的相关配置。
官方文档:
- History Server
History Server的使用
History Server允许查询由JobManager归档的已完成作业的状态和统计信息。已完成作业的归档在JobManager上进行,JobManager会将归档的作业信息upload到文件系统目录,这个文件系统可以是本地文件系统、HDFS、H3等,这个目录是可以在配置文件中指定的。然后还需要配置History Server去扫描这个目录,并且可以配置扫描的间隔时间。
因此,我们在使用History Server之前需要配置一下这几个配置项:
代码语言:javascript复制[root@hadoop01 /usr/local/flink]# vim conf/flink-conf.yaml
# 指定由JobManager归档的作业信息所存放的目录,这里使用的是HDFS
jobmanager.archive.fs.dir: hdfs://hadoop01:8020/completed-jobs/
# 指定History Server扫描哪些归档目录,多个目录使用逗号分隔
historyserver.archive.fs.dir: hdfs://hadoop01:8020/completed-jobs/
# 指定History Server间隔多少毫秒扫描一次归档目录
historyserver.archive.fs.refresh-interval: 10000
# History Server所绑定的ip,0.0.0.0代表允许所有ip访问
historyserver.web.address: 0.0.0.0
# 指定History Server所监听的端口号
historyserver.web.port: 8082
- 关于History Server的配置可以参考官方文档:History Server Config
配置完成后,可以使用如下命令启动History Server:
代码语言:javascript复制[root@hadoop01 /usr/local/flink]# ./bin/historyserver.sh start
Starting historyserver daemon on host hadoop01.
[root@hadoop01 /usr/local/flink]#
检查一下是否启动成功:
代码语言:javascript复制[root@hadoop01 /usr/local/flink]# netstat -lntp |grep 8082
tcp6 0 0 :::8082 :::* LISTEN 3200/java
[root@hadoop01 /usr/local/flink]# jps |grep HistoryServer
3200 HistoryServer
[root@hadoop01 /usr/local/flink]#
提交一个作业跑一下,看看完成后是否会生成归档信息:
代码语言:javascript复制[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
作业跑完后,可以在HDFS中看到生成的归档目录:
代码语言:javascript复制[root@hadoop01 /usr/local/flink]# hadoop fs -ls /
Found 1 items
drwxr-xr-x - root supergroup 0 2020-09-30 11:00 /completed-jobs
[root@hadoop01 /usr/local/flink]#
然后使用浏览器访问8082端口可以在web界面上查看已运行完的作业信息:
点进去可以看到详细的统计信息:
这些信息都是以JSON的格式存放在归档目录下的文件中,文件以作业的id命名:
代码语言:javascript复制[root@hadoop01 /usr/local/flink]# hadoop fs -ls /completed-jobs
Found 1 items
-rw-r--r-- 1 root supergroup 31606 2020-09-30 11:00 /completed-jobs/3f9f7ec2a7a765660bdc09922d0b7d0f
[root@hadoop01 /usr/local/flink]#
History Server REST API使用
根据官方文档的描述,History Server提供了如下REST API,所有API的响应数据都是JSON格式:
/config
/jobs/overview
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/config
接口可以获取基础配置信息,请求示例:
[root@hadoop01 ~]# curl http://localhost:8082/config
{"refresh-interval":10000,"timezone-name":"中国时间","timezone-offset":28800000,"flink-version":"1.11.2","flink-revision":"DeadD0d0 @ 1970-01-01T01:00:00 01:00","features":{"web-submit":false}}
[root@hadoop01 ~]#
/config
接口可以获取已完成的job信息列表,请求示例:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/overview
{"jobs":[{"jid":"3f9f7ec2a7a765660bdc09922d0b7d0f","name":"Flink Java Job at Wed Sep 30 11:00:11 CST 2020","state":"FINISHED","start-time":1601434820548,"end-time":1601434826749,"duration":6201,"last-modification":1601434826749,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":3,"canceling":0,"canceled":0,"failed":0,"reconciling":0}}]}
[root@hadoop01 ~]#
/jobs/<jobid>
接口可以获取指定Job的详细信息,我们可以基于上一个接口返回的Job ID获取指定Job的详细信息,由于内容太多就不贴出来了:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f
/jobs/<jobid>/config
接口可以获取指定Job的配置信息:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/config
{"jid":"3f9f7ec2a7a765660bdc09922d0b7d0f","name":"Flink Java Job at Wed Sep 30 11:00:11 CST 2020","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{}}}
[root@hadoop01 ~]#
/jobs/<jobid>/exceptions
接口可以获取指定Job的异常信息:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/exceptions
{"root-exception":null,"timestamp":null,"all-exceptions":[],"truncated":false}
[root@hadoop01 ~]#
/jobs/<jobid>/accumulators
可以获取指定Job的计数器信息:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/accumulators
其余API也是类似的,这里就不逐一演示了。
Monitoring REST API
除了History Server REST API,Flink还提供了Monitoring REST API,该API也是RESTFul风格,接受HTTP请求,响应JSON数据。监控API可以用来查询正在运行的作业以及最近完成的作业的状态和统计信息。Flink自己的dashboard就是使用的这个监控API,并且该监控API也可以被自定义的监控工具使用,例如我们可以自己基于这些API开发属于自己的监控工具。官方文档:
- Monitoring REST API
监控API由web服务器支持作为 Dispatcher 的一部分运行。默认情况下,此服务监听在8081端口,可以在flink-conf.yaml
通过rest.port
进行配置。需要注意的是,目前监控API的web服务和仪表板的web服务是相同的,因此在同一端口上一起运行。不过,它们响应不同的HTTP Url。
在有多个 Dispatcher 的情况下(为了高可用性),每个 Dispatcher 将运行其自己的监控API实例,该实例提供有关已完成和正在运行的作业的信息,而该 Dispatcher 会被选为集群的leader。
官方文档中有详细列出所有的监控API,如果需要开发自己的监控平台,就可以深入了解下:
- API
Flink Metrics
Flink对外提供了一个度量(Metrics)系统,它允许收集和向外部系统提供度量信息。官方文档:
- Metrics
可以在任何继承了RichFunction
的用户函数内部调用 getRuntimeContext().getMetricGroup()
方法来访问度量系统。此方法返回一个MetricGroup
对象,你可以在该对象上创建和注册新的度量。如下示例:
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
// 注册一个计数器度量
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
value
}
}
默认情况下,Flink收集了几个可以深入了解当前状态的指标。官方文档对所有指标都有相应的描述:
- System metrics