背景介绍
大数据平台的资源管理组件主要针对存储资源与计算资源进行分析优化。前文《大数据平台:资源管理及存储优化技术》主要介绍了存储资源优化,本文主要介绍大数据平台构建过程中,计算资源相关的优化技术。
优化技术
1. 配额管理
大数据平台作为SaaS(软件即服务)系统,会将应用开放给不同的使用方,属于多租户系统架构。多租户(Multi-tenancy)允许多个用户或用户组(称为“租户”)共享同一个系统或程序的实例,同时保持各自数据和配置的隔离性。具体的,大数据平台中,不同的租户是可以共享一套存储集群HDFS和计算资源YARN,但不同租户之间互不影响。
为保证多租户的资源隔离性,平台系统会提供多维度的配额管理,主要包括:
- HDFS资源配额:管理每个租户(如项目)可使用的HDFS集群资源
- YARN资源池分配:管理每个租户可使用的YARN计算资源
- HBase配额:管理每个租户可使用HBase空间,例如Namespace数量,表数量,访问流量等
HDFS资源配额
基于CMD命令执行的常用HDFS配额操作如下:
(1). 设置HDFS指定目录配额Quota
代码语言:txt复制cmd = "hdfs dfsadmin -setQuota {0} {1}".format(maxFileNum, directory)
(2). 清理分配Quota
代码语言:txt复制cmd = "hdfs dfsadmin -clrQuota {0}".format(directory)
(3). 设置磁盘空间
代码语言:txt复制cmd = "hdfs dfsadmin -setSpaceQuota {0} {1}".format(maxDiskSpace, directory)
(4). 清理磁盘空间
代码语言:txt复制cmd = "hdfs dfsadmin -clrSpaceQuota {0}".format(directory)
YARN资源池分配
基于scheduler.xml加载更新YARN资源池
代码语言:txt复制os.path.join(hadoop_conf_dir, 'fair-scheduler.xml') --更新配额文件
cmd = "yarn rmadmin -refreshQueues"
fair-scheduler.xml 公平调度示例XML文件:
代码语言:xml复制<allocations>
<queue name="root">
<weight>0.0</weight>
<queue name="default">
<weight>0.56</weight>
</queue>
<queue name="project1">
<maxResources>81920 mb, 40 vcores</maxResources>
<weight>1.0</weight>
</queue>
<queue name="res_01">
<maxResources>81920 mb, 40 vcores</maxResources>
<weight>1.0</weight>
</queue>
</queue>
<queueMaxAppsDefault>100</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
</allocations>
HBase配额
HBase可基于配额管理实现针对Namespace和 Table 级别的的rpc请求的限制,限制读写次数和流量。其中,Namespace支持三种容量的管理:(1).table的最大数目,(2). region的最大数目,(3).namespace占用的文件系统空间。HBase限流设置的原理是在:在hbase:quota 进行元数据管理。
设置Namespace的配额,需要hbase-site添加配置文件:
代码语言:xml复制hbase.quota.enabled=true
设置限流的API可参考示例:TestQuotaTableUtil
Java API设置Namespace配额:
代码语言:java复制connection = ConnectionFactory.createConnection(conf);
Admin admin = this.connection.getAdmin();
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("myns").build();
namespaceDescriptor.setConfiguration(
"hbase.namespace.quota.maxtables", "10");
namespaceDescriptor.setConfiguration(
"hbase.namespace.quota.maxregions", "100");
admin.createNamespace(namespaceDescriptor);
admin.close();
开启基于Region Server的多租户,需修改hbase-site.xml文件
代码语言:xml复制<property>
<name>hbase.coprocessor.master.classes</name>
<value>org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint</value>
</property>
<property>
<name>hbase.master.loadbalancer.class</name>
<value>org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer</value>
</property>
HBase shell配置设置文档:commands
(1). 基于set_quota 设置流量
代码语言:shell复制hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => READ, USER => 'u1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE
hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, TABLE => 't1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, TABLE => 't1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => NONE
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => NONE
(2). Namesapce设置Table数量控制
代码语言:xml复制create_namespace 'ns1', {'hbase.namespace.quota.maxtables'=>'5'}
alter_namespace 'ns1', {METHOD => 'set', 'hbase.namespace.quota.maxtables'=>'8'}
(3). Namesapce设置Region数量控制
代码语言:txt复制create_namespace 'ns1', {'hbase.namespace.quota.maxregions'=>'5'}
alter_namespace 'ns1', {METHOD => 'set', 'hbase.namespace.quota.maxregions'=>'8'}
2. 调度优先级
YARN调度器采用主资源公平调度算法(Dominant Resource Fairness,DRF),该算法扩展最大最小公平算法(max-min fairness),使其可以支持多维资源调度。DRF中将所需份额(比例)最大的资源称为主资源,将最大最小公平算法应用在主资源上,将多维资源调度问题转化为单资源调度问题。
资源调度器中,每个队列可以设置一个最小和最大资源量,在极端情况下,最小资源量是每个队列需要保证的资源量,最大资源量是不能超过的资源量。资源抢占发生的原因在于最小资源量的设置,资源调度器(包括Capacity Scheduler和Fair Scheduler) 会将负载较轻队列的资源暂时分配给负载较重的队列。特别的,最小资源量并不是硬资源保证,当队列负载低,也会暂时将空闲资源分配给其他有需求的队列。对于暂时分配出去的资源,在需要使用时会"告知"资源回收并等待一段时间,若超时则强制回收进行资源抢占。
YARN任务的优先级有支持两个维度
- 全局最大优先级:yarn.cluster.max-application-priority,设置全局默认最大优先级,系统将根据优先级从高到低调度
- 队列默认优先级:yarn.scheduler.capacity.{leaf-queue-path}.default-application-priority,设置指定队列默认优先级
YARN任务的优先级规则:
- 设置优先级的数值越大,则调度优先级越高
- 任务提交时,如果没有指定优先级,使用提交队列的队列默认优先级
- 指定的优先级超过全局配置的优先级,则使用全局配置的优先级作为任务的优先级
3. 作业参数调优
作业参数调优是指在大数据运行作业(如MapReduce作业、Spark作业等)中,调整各种配置参数以优化作业的执行效率、减少资源消耗和提高系统的整体性能。常用作业参数调优:内存设置、并行度设置、I/O设置。参数调优可以分为事前、事中、事后不同形式:
- 事前:任务执行前,了解数据读写模式,理解作业特性,识别作业瓶颈,进行定向调整。如CPU密集型,则增加算力资源与并行度。
- 事中:计算引擎内置能力,如Spark支持AQE(Adaptive Query Execution) 进行动态的查询优化调整
- 事后:基于UI工具、监控指标、日志等工具,找出性能瓶颈并调整
4. 监控与分析
监控与分析是指使用各种工具和技术来跟踪和评估大数据系统的性能和资源使用情况。目的是为了发现性能瓶颈、资源瓶颈、异常行为或者效率低下的地方,并基于这些信息进行优化。
常用的实现方式:
- 监控工具
- 内置监控工具:基于大数据系统自身内置的指标数据和监控工具,例如Spark History UI
- 外置监控工具:采集大数据系统的指标数据,使用Ganglia、Prometheus等系统级监控工具来收集集群的性能指标
- 日志分析:可以基于ELK进行可视化日志数据管理
- 调优工具:可以基于Dr. Elephant工具,自动分析作业指标并提供调优建议,下面将对作业指标分析进行详细展开。
作业指标诊断
Dr. Elephant 由 LinkedIn 于 2016 年开源,是一个 Hadoop 和 Spark 的性能监控和调优工具。通过自动化收集所有作业运行指标,进行数据分析并基于UI界面化方式展示。
整体架构如图所示,包括三部分:
- 数据采集(Fetcher):自动采集执行成功的计算任务
- 内置诊断(Rule):基于内置规则,启发式进行作业诊断
- 存储及展示(DB):将分析结果保存在DB持久化,根据作业诊断分级在UI界面展示
MapReduce
1. 采集作业详情
基于JobHistory rest api调用,前缀:http://{historyServer}:{port}
(1). Job概要详情
代码语言:txt复制GET /ws/v1/history/mapreduce/jobs/{jobId}
返回结果参数说明:
字段 | 说明 |
---|---|
id | 作业的JobID |
submitTime | 作业提交时间 |
startTime | 作业启动时间 |
finishTime | 作业结束时间 |
name | 作业名称 |
queue | 作用所属资源队列 |
user | 作业提交用户 |
state | 作业状态 |
mapsTotal | MapReduce作业Mapper阶段总个数 |
mapsCompleted | Mapper阶段总完成数 |
reducesTotal | MapReduce作业Reducer阶段总个数 |
reducesCompleted | Reducer阶段总完成数 |
uberized | 是否启动uber mode,若开启(true),则MapReduce任务在同一个JVM上运行 |
diagnostics | 诊断信息 |
avgMapTime | Mapper阶段平均耗时,单位毫秒(ms) |
avgReduceTime | Reducer阶段平均耗时,单位毫秒(ms) |
avgShuffleTime | 数据Shuffle平均耗时,单位毫秒(ms) |
avgMergeTime | 数据Merge平均耗时,单位毫秒(ms) |
failedMapAttempts | Mapper阶段失败尝试次数 |
killedMapAttempts | Mapper阶段被kill次数 |
successfulMapAttempts | Mapper阶段成功执行次数 |
failedReduceAttempts | Reducer阶段失败尝试次数 |
killedReduceAttempts | Reducer阶段被kill次数 |
successfulReduceAttempts | Reducer阶段成功执行次数 |
(2).Job CounterGroup
汇总作业运行的内置Counters信息(执行计数器),对MapReduce进行作业详情统计,counter主要包括:counter的Group类型,counter名称,counter总值,counter mapper数值,counter reducer数值。
代码语言:txt复制GET /ws/v1/history/mapreduce/jobs/{jobId}/counters
返回结果参数说明:
FileSystemCounter级别
指标 | 描述 |
---|---|
FILE_BYTES_READ | 从本地文件系统读取的总字节数 |
FILE_BYTES_WRITTEN | 向本地文件系统写入的总字节数 |
FILE_READ_OPS | 本地文件系统的读操作次数 |
FILE_WRITE_OPS | 本地文件系统的写操作次数 |
FILE_LARGE_READ_OPS | 读取大文件的操作次数 |
HDFS_BYTES_READ | 从HDFS读取的总字节数 |
HDFS_BYTES_WRITTEN | 向HDFS写入的总字节数 |
HDFS_READ_OPS | HDFS的读操作次数 |
HDFS_WRITE_OPS | HDFS的写操作次数 |
HDFS_LARGE_READ_OPS | 读取HDFS上大文件的操作次数 |
JobCounter级别
指标 | 描述 |
---|---|
TOTAL_LAUNCHED_MAPS | 启动的Map任务总数 |
TOTAL_LAUNCHED_REDUCES | 启动的Reduce任务总数 |
DATA_LOCAL_MAPS | 数据本地化的Map作业数 |
SLOTS_MILLIS_MAPS | 所有Map任务在Slots的总耗时(单位:ms) |
SLOTS_MILLIS_REDUCES | 所有Reduce任务在Slots的总耗时(单位:ms) |
MILLIS_MAPS | 所有Map任务的总耗时(单位:ms) |
MILLIS_REDUCES | 所有Reduce任务的总耗时(单位:ms) |
VCORES_MILLIS_MAPS | 所有Map任务的总核数消耗(单位:vcore-ms) |
VCORES_MILLIS_REDUCES | 所有Reduce任务的总核数消耗(单位:vcore-ms) |
MB_MILLIS_MAPS | 所有Map任务的总内存消耗(单位:mb-ms) |
MB_MILLIS_REDUCES | 所有Reduce任务的总内存消耗(单位:mb-ms) |
TaskCounter级别
指标 | 描述 |
---|---|
MAP_INPUT_RECORDS | 所有Map输入记录数 |
MAP_OUTPUT_RECORDS | Map任务产生的输出记录数 |
MAP_OUTPUT_BYTES | Map任务产生的输出字节数 |
MAP_OUTPUT_MATERIALIZED_BYTES | Map输出后写入到磁盘的字节数 |
SPLIT_RAW_BYTES | Mao读取的输入-分片对象的字节数 |
COMBINE_INPUT_RECORDS | Combiner处理的输入记录数 |
COMBINE_OUTPUT_RECORDS | Combiner产生的输出记录数 |
REDUCE_INPUT_GROUPS | Reduce处理的不同分组的个数 |
REDUCE_SHUFFLE_BYTES | Reduce任务通过Shuffle接收的字节数 |
REDUCE_INPUT_RECORDS | Reduce任务处理的输入记录数 |
REDUCE_OUTPUT_RECORDS | Reduce任务产生的输出记录数 |
SPILLED_RECORDS | 作业中所有任务溢出到磁盘的记录数 |
SHUFFLED_MAPS | 通过Shuffle从Map传输到Reduce的记录数 |
FAILED_SHUFFLE | 失败的Shuffle操作数 |
MERGED_MAP_OUTPUTS | 合并的Map输出数 |
GC_TIME_MILLIS | 垃圾回收消耗的时间(单位:ms) |
CPU_MILLISECONDS | 总计的CPU时间(单位:ms) |
PHYSICAL_MEMORY_BYTES | 物理内存字节数 |
VIRTUAL_MEMORY_BYTES | 虚拟内存字节数 |
COMMITTED_HEAP_BYTES | JVM中的总有效的堆内存量 |
(3).Job Tasks信息
获取作业下的所有task信息及对应的各个task的统计汇总信息和任务执行信息。
(3.1). Task列表
代码语言:txt复制GET /ws/v1/history/mapreduce/jobs/{jobID}/tasks
参数说明:
字段 | 说明 |
---|---|
startTime | Task任务的启动时间 |
finishTime | Task任务的结束时间 |
elapsedTime | Task任务的耗时 |
id | Task任务ID |
state | Task状态 |
type | Task类型:MAP、REDUCE |
successfulAttempt | 任务成功执行的Attempt信息 |
(3.2). Task Counters,获取指定task的counter统计信息
代码语言:txt复制GET /ws/v1/history/mapreduce/jobs/{jobID}/tasks/{taskId}/counters
(3.3). Task Attempts,获取指定task的attempts重试执行信息
代码语言:txt复制GET /ws/v1/history/mapreduce/jobs/{jobID}/tasks/{taskId}/attempts
参数说明:
字段 | 说明 |
---|---|
startTime | Task任务的启动时间 |
finishTime | Task任务的结束时间 |
rack | Task任务执行机架信息 |
nodeHttpAddress | Task任务执行的node节点信息 |
assignedContainerId | Task任务被分配的containerId |
2. Mapper/Reducer GC
指标说明:分别对Mapper、Reducer类型的Task任务进行分析,分析Task的GC效率,GC/CPU的使用比例、任务运行时间分布,分析指标:
指标 | 说明 | 计算 |
---|---|---|
Number of tasks | 对应Tasks的数量 | |
Avg task runtime (ms) | 对应Tasks的运行时间平均值 | avg(TaskAttempt.finishTime - TaskAttempt.startTime) |
Avg task CPU time (ms) | 对应Tasks CPU耗时平均值, TaskCounter.CPU_MILLISECONDS 获取对应CPU耗时 | avg(TaskCounter.CPU_MILLISECONDS) |
Avg task GC time (ms) | 对应Tasks GC耗时平均值, TaskCounter.GC_TIME_MILLIS 获取对应GC耗时 | avg(TaskCounter.GC_TIME_MILLIS) |
Task GC/CPU ratio | GC/CPU耗时比例,计算GC效率 | avg(TaskCounter.GC_TIME_MILLIS)/avg(TaskCounter.CPU_MILLISECONDS) |
指标建议:如果Task GC/CPU ratio 过高,则说明对应GC耗时比例过多,应该检查代码进行优化,减少GC耗时,Task运行时间过长,则说明该阶段Task任务过多,需重点关注,返回最严重的指标建议。
指标 | LOW | MODERATE | SEVERE | CRITICAL |
---|---|---|---|---|
GC Ratio | 0.01 | 0.02 | 0.03 | 0.04 |
Task Runtime(单位:分钟) | 5 | 10 | 12 | 15 |
3. Mapper/Reducer Memory
指标说明:分别对Mapper、Reducer类型的Task任务进行分析,分析Task的内存使用率,分析指标如下:
指标 | 说明 | 计算 |
---|---|---|
Number of tasks | 对应Tasks的数量 | |
Avg Physical Memory (MB) | 对应Tasks消耗内存平均值,单位MB | avg(TaskCounter.PHYSICAL_MEMORY_BYTES)/MB |
Max Physical Memory (MB) | Tasks中最大的内存消耗,单位MB | max(TaskCounter.PHYSICAL_MEMORY_BYTES)/MB |
Min Physical Memory (MB) | Tasks中最小的内存消耗,单位MB | min(TaskCounter.PHYSICAL_MEMORY_BYTES)/MB |
Avg Virtual Memory (MB) | 对应Tasks消耗虚拟内存平均值,单位MB | avg(TaskCounter.VIRTUAL_MEMORY_BYTES)/MB |
Requested Container Memory | 配置参数中执行Map/Reduce任务container的配置额 | mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,如果没有配置默认是2G |
指标建议:根据分析Task内存使用情况,判断Container分配的内存是否合理,如果使用的内存远远小于container配额内存,则说明container内存配额过大;根据内存的统计值信息(平均值、最大值、最小值)判断是否有数据倾斜问题。
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Physical Memory Ratio | 0.6 | 0.5 | 0.4 | 0.3 | Physical Memor/Container Memory |
Container配额 | 1.1 | 1.5 | 2.0 | 2.5 | 默认2G,对应倍数 |
4. Mapper/Reducer Skew
指标说明:分别对Mapper、Reducer类型的Task任务进行分析,统计分析任务的数据倾斜指标,将Tasks任务数据分为两组,其中一组(Group A)是小于平均值Tasks分析,另一组(Group B)是大于平均值Tasks分析,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Data skew (Number of tasks) | 对应Tasks的数量 | |
Data skew (Group A) | 获取Group A的Tasks数量和平均输入值,计算所有Tasks的数据量平均值,过滤Task 数据量小于平均值的为Group A | avg(GroupA(FileSystemCounter.DATA_BYTES)) |
Data skew (Group B) | 获取Group B的Tasks数量和平均输入值,计算所有Tasks的数据量平均值,过滤Task 数据量大于平均值的为Group B | avg(GroupB(FileSystemCounter.DATA_BYTES)) |
Time skew (Number of tasks) | 对应Tasks的数量 | |
Time skew (Group A) | 获取Group A的Tasks数量和平均运行耗时,计算所有Tasks的运行耗时平均值,过滤Task 运行耗时小于平均值的为Group A | avg(GroupA(TaskAttempt.finishTime - TaskAttempt.startTime)) |
Time skew (Group B) | 获取Group B的Tasks数量和平均运行耗时,计算所有Tasks的运行耗时平均值,过滤Task 运行耗时大于平均值的为Group B | avg(GroupB(TaskAttempt.finishTime - TaskAttempt.startTime)) |
指标建议:若Group A和 Group B统计数值相差较大,则说明存在数据倾斜,对于Map任务应尽量减少过多小文件输入,对于Reduce任务尽量减少根据Key的聚合操作(如 GroupByKey、JoinKey)。
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Data Severity | 2 | 4 | 8 | 16 | (avgDataGroupB/avgDataGroupA) - 1 |
Time Severity | 2 | 4 | 8 | 16 | (avgTimeGroupB/avgTimeGroupA) - 1 |
Data GroupA TasksNum Severity | 10 | 50 | 100 | 200 | DataGroupA.size |
Time GroupA TasksNum Severity | 10 | 50 | 100 | 200 | TimeGroupA.size |
5. Mapper/Reducer Time
指标说明:分别对Mapper、Reducer类型的Task任务进行分析,对Task任务运行耗时进行统计分析,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Number of tasks | 对应Tasks的数量 | |
Average task runtime | 对应Tasks的运行耗时平均值 | avg(TaskAttempt.finishTime - TaskAttempt.startTime) |
Max task runtime | 对应Tasks的运行耗时最大值 | max(TaskAttempt.finishTime - TaskAttempt.startTime) |
Min task runtime | 对应Tasks的运行耗时最小值 | min(TaskAttempt.finishTime - TaskAttempt.startTime) |
Average task input size | Map Task中平均数据值(仅针对Map任务) | avg(FileSystemCounter.HDFS_BYTES_READ) |
指标建议:根据Task数量与Task的运行耗时统计数据,判断Tasks数量是否合理,输入文件大小是否合理。
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Short Runtime Severity | 10min | 4min | 2min | 1min | avg(TaskAttempt.finishTime - TaskAttempt.startTime) |
Long Runtime Severity | 15min | 30min | 60min | 120min | avg(TaskAttempt.finishTime - TaskAttempt.startTime) |
Task Num Severity | 50 | 100 | 500 | 1000 | tasks.size |
6. Mapper Speed
指标说明:对Mapper类型的Task任务进行分析,以指标反映Map人物的运行速率,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Number of tasks | 对应Tasks的数量 | |
Median task input size | Map Task中Input数据量中位值 | median(FileSystemCounter.HDFS_BYTES_READ) |
Median task runtime | Tasks的运行耗时中位值 | median(TaskAttempt.finishTime - TaskAttempt.startTime) |
Median task speed | Tasks运行速率中位值 | median((1000 * inputBytes) / (runtimeMs)) |
Total input size in MB | Map Task中Input总数据量 | sum(FileSystemCounter.HDFS_BYTES_READ) |
指标建议:分析Map任务的每秒运行效率,判断Map任务是否是CPU密集型,并进行代码优化
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Speed Severity | 1/2 * 100M | 1/4 * 100M | 1/8 * 100M | 1/32 * 100M | median((1000 * inputBytes) / (runtimeMs)) |
Runtime Severity | 5min | 10min | 15min | 30min | median(TaskAttempt.finishTime - TaskAttempt.startTime) |
7. Mapper Spill
指标说明:对Mapper类型的Task任务进行分析,磁盘IO持久化性能,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Number of tasks | 对应Tasks的数量 | |
Avg spilled records per task | 每个Task的平均磁盘溢出记录数 | sum(TaskCounter.SPILLED_RECORDS)/tasks.size |
Avg output records per task | 每个Task的平均Output记录数 | sum(TaskCounter.MAP_OUTPUT_RECORDS)/tasks.size |
Ratio of spilled records to output records | 溢出记录数与输出记录数比值 | sum(TaskCounter.SPILLED_RECORDS)/sum(TaskCounter.MAP_OUTPUT_RECORDS) |
指标建议:分析Mapper任务的IO性能,是否存在IO溢出到磁盘,降低IO性能:
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Task Num Severity | 50 | 100 | 500 | 1000 | tasks.size |
IO Spill Severity | 2.01 | 2.2 | 2.5 | 3.0 | sum(TaskCounter.SPILLED_RECORDS)/sum(TaskCounter.MAP_OUTPUT_RECORDS) |
8. Shuffle Sort
指标说明:对Reducer类型的Task任务进行分析,在Shuffle/Sort阶段的耗时统计,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Number of tasks | 对应Tasks的数量 | |
Average code runtime | 真正Code执行时间的平均值 | avg(TaskAttempt.finishTime -TaskAttempt.mergeFinishTime) |
Average shuffle time | Shuffle时间的平均值,并计算与code runtime的比例(avgShuffleTime/avgCodeTime) | avg(TaskAttempt.shuffleFinishTime - TaskAttempt.startTime) |
Average sort time | Sort时间的平均值,并计算与code runtime的比例(avgSortTime/avgCodeTime) | avg(TaskAttempt.mergeFinishTime - TaskAttempt.shuffleFinishTime) |
- Reduce Code执行时间:totalTimeMs - shuffleTimeMs - sortTimeMs
- totalTimeMs = TaskAttempt.finishTime - TaskAttempt.startTime
- shuffleTimeMs = TaskAttempt.shuffleFinishTime - TaskAttempt.startTime
- sortTimeMs = TaskAttempt.mergeFinishTime - TaskAttempt.shuffleFinishTime
指标建议:根据对应Shuffle、Sort时间进行优化判断,对应的指标如下
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Shuffle Runtime Severity | 1m | 5m | 10m | 30m | avg(TaskAttempt.shuffleFinishTime - TaskAttempt.startTime) |
Shuffle/Code Runtime Ratio Severity | 1 | 2 | 4 | 8 | avgShuffleTime * 2 / avgCodeTime |
Sort Runtime Severity | 1m | 5m | 10m | 30m | avg(TaskAttempt.mergeFinishTime - TaskAttempt.shuffleFinishTime) |
Sort/Code Runtime Ratio Severity | 1 | 2 | 4 | 8 | avgSortTime * 2 / avgCodeTime |
Spark
1. 采集作业详情
基于Spark HistoryServer rest api调用,前缀:http://{historyServer}:{port}
(1). 应用概要详情
代码语言:txt复制GET /api/v1/applications/{appId}
返回结果参数说明:
字段 | 说明 |
---|---|
attemptId | 应用执行ID,一个应用若失败,可重试进行多次执行 |
startTime | 应用执行的开始时间 |
endTime | 应用执行的结束时间 |
duration | 应用执行的耗时 |
sparkUser | 应用提交用户 |
completed | 应用是否执行完成 |
(2). 应用执行详情
基于应用执行信息获取最后一次执行(attempt),并调用接口获取该次执行的所有Job详情包括:Data、Stages、Executor、Conf信息。
(2.1). Job Datas
代码语言:txt复制GET /api/v1/applications/{appId}/{attemptId}/jobs
返回结果参数说明:
字段 | 说明 |
---|---|
jobId | 对应此次Attempt执行jobId |
stageIds | 对应的stageId信息 |
numTasks | 该job执行的Task任务数量 |
numCompletedTasks | 执行完成的Task数量 |
numCompletedStages | 执行完成的Stages数量 |
(2.2). Job Stages
代码语言:txt复制GET /api/v1/applications/{appId}/{attemptId}/stages
返回结果参数说明:
字段 | 说明 |
---|---|
attemptId | 执行的AttemptId |
stageId | 对应此次Attempt对应的StageId |
numTasks | 执行的Task数量 |
executorRunTime | 该Stage执行executor运行的时间(单位:ms) |
executorCpuTime | 该Stage执行executor时CPU耗时,包括数据拉取(单位:ms) |
submissionTime | 该Stage提交时间 |
firstTaskLaunchedTime | 该Stage第一个Task启动时间 |
inputBytes | 该Stage输入字节数 |
inputRecords | 该Stage输入记录数 |
outputBytes | 该Stage输出字节数 |
outputRecords | 该Stage输出记录数 |
shuffleReadBytes | Shuffle阶段读取字节数 |
shuffleReadRecords | Shuffle阶段读取记录数 |
shuffleWriteBytes | Shuffle节点写字节数 |
shuffleWriteRecords | Shuffle阶段写记录数 |
memoryBytesSpilled | 内存溢出字节数 |
diskBytesSpilled | 磁盘溢出字节数 |
details | 执行详情 |
schedulingPool | 调度资源池 |
rddIds | 该Stage对应RDD信息 |
(2.3). Job Executors
代码语言:txt复制GET /api/v1/applications/{appId}/{attemptId}/executors
结果返回参数说明:
字段 | 说明 |
---|---|
id | Executor执行ID |
hostPort | 指定执行Executor的Node节点信息 |
rddBlocks | Executor执行中持久化的RDD blocks数量 |
memoryUsed | Executor过程中RDD缓存内存的大小 |
diskUsed | Executor过程中RDD持久化到磁盘的空间大小 |
totalCores | Executor使用总核数 |
maxTasks | 最大的Task数量 |
totalDuration | Executor的总执行时间(单位:ms) |
maxMemory | 用于缓存RDD的最大内存(单位:bytes) |
executorLogs | Executor执行日志 |
memoryMetrics | 内存指标数据 |
totalGCTime | 总的GC耗时 |
(2.4). Job Conf
代码语言:txt复制GET /api/v1/applications/{appId}/{attemptId}/environment
2. Executor GC
指标说明:分析Spark任务中GC耗时情况,GC时间占比,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Total GC time | GC总耗时 | sum(Executor.totalGCTime) |
Total Executor Runtime | 总运行耗时 | sum(Executor.totalDuration) |
GC time to Executor Run time ratio | GC耗时与运行耗时占比 | sum(Executor.totalGCTime)/sum(Executor.totalDuration) |
指标建议:若Executor GC耗时占比过大,建议调大Excutor 内存值
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
GC Ratio High Severity | 0.08 | 0.1 | 0.15 | 0.2 | sum(Executor.totalGCTime)/sum(Executor.totalDuration) |
GC Ratio Low Severity | 0.05 | 0.04 | 0.03 | 0.01 | sum(Executor.totalGCTime)/sum(Executor.tota3Duration) |
3. Executor Metrics
指标说明:分析Spark任务Executor指标进行分析,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Total executor storage memory allocated | 所有Executors RDD缓存的内存总分配值 | sum(Executor.maxMemory) |
Total executor storage memory used | 所有Executors RDD缓存的内存总使用值 | sum(Executor.memoryUsed) |
Executor storage memory utilization rate | 所有Executors RDD缓存的内存比例 | sum(Executor.memoryUsed)/sum(Executor.maxMemory) |
Executor storage memory used distribution | 所有Executors RDD缓存的内存统计值 | Dist(Executor.memoryUsed) |
Executor task time distribution | 所有Executor 任务运行时间统计值 | Dist(Executor.totalDuration) |
Executor task time sum | 所有Executor 任务运行时间总值 | sum(Executor.totalDuration) |
Executor input bytes distribution | 所有Executor Input数据量统计值 | Dist(Executor.totalInputBytes) |
Executor shuffle read bytes distribution | 所有Executor Shuffle Read统计值 | Dist(Executor.totalShuffleRead) |
Executor shuffle write bytes distribution | 所有Executor Shuffle Write统计值 | Dist(Executor.totalShuffleWrite) |
其中Dist统计值计算包括:
min
:最小值max
:最大值p25
:百分位25%值median
:中位值(百分位50%)p75
:百分位75%值
指标建议:Executor 运行统计值判断任务是否运行合理
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
memoryUsed Median Ratio Severity | 0.125 | 0.25 | 0.5 | 1 | max(Executor.memoryUsed)/median(Executor.memoryUsed) |
Task Time Median Ratio Severity | 0.125 | 0.25 | 0.5 | 1 | max(Executor.totalDuration)/median(Executor.totalDuration) |
InputBytes Median Ratio Severity | 0.125 | 0.25 | 0.5 | 1 | max(Executor.totalInputBytes)/median(Executor.totalInputBytes) |
Shuffle Read Median Ratio Severity | 0.125 | 0.25 | 0.5 | 1 | max(Executor.totalShuffleRead)/median(Executor.totalShuffleRead) |
Shuffle Write Median Ratio Severity | 0.125 | 0.25 | 0.5 | 1 | max(Executor.totalShuffleWrite)/median(Executor.totalShuffleWrite) |
4. Stages Metrics
指标说明:分析Spark任务Stages指标进行分析,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Spark completed stages count | 已结束的Stages总计 | (Stage.status = COMPLETE).size |
Spark failed stages count | 失败的Stages总计 | (Stage.status = FAILED).size |
Spark stage failure rate | Stage执行失败率 | FailedStageSize/(FailedStageSize completedStageSize) |
Spark stages with high task failure rates | Stages中Tasks执行失败率 | (Stage.numFailedTasks/(Stage.numFailedTasks Stage.numCompletedTasks) |
Spark stages with long average executor runtime | Stages中Task超时运行的时间(30min) | 该Stage下,单个Executor的运行耗时:Stage.executorRunTime/Stage.Executors.size |
指标建议:Stages 运行统计值判断任务是否运行合理
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Max Task Failed Ratio Severity | 0.1 | 0.3 | 0.4 | 0.5 | max(Stage.numFailedTasks/(Stage.numFailedTasks Stage.numCompletedTasks) |
Max Task Runtime Severity | 15min | 30min | 45min | 60min | max(Stage.executorRunTime/Stage.Executors.size) |
5. Job Metrics
指标说明:分析Spark任务Jobs指标进行分析,分析的具体指标如下:
指标 | 说明 | 计算 |
---|---|---|
Spark completed jobs count | Spark任务执行成功的Job数量统计 | (Job.status = SUCCEEDED).size |
Spark failed jobs count | Spark任务执行失败的Job数量统计 | (Job.status = FAILED).size |
Spark failed jobs list | 执行失败的Job列表 | List(Job.status = FAILED) |
Spark jobs failure rates | Job执行失败率 | numFailedJobs/numFailedJobs numSucceededJobs |
Spark jobs with high task failure rates | Job Tasks执行失败率 | Job.numFailedTasks/Job.numTasks |
指标建议:若Executor GC耗时占比过大,建议调大Excutor 内存值
指标 | LOW | MODERATE | SEVERE | CRITICAL | 计算 |
---|---|---|---|---|---|
Max Task Failed Ratio Severity | 0.1 | 0.3 | 0.4 | 0.5 | max(Job.numFailedTasks/Job.numTasks) |
Max Job Failed Severity | 0.1 | 0.3 | 0.4 | 0.5 | max(numFailedJobs/numFailedJobs numSucceededJobs) |
6. Spark Conf
指标说明:对Spark配置参数进行分析,主要包括driver memory,driver cores,executor cores,executor instances,executor memory,serializer
指标 | 说明 | 计算 |
---|---|---|
Serializer Key | Spark序列化方式,是否支持KryoSerializer | 默认:org.apache.spark.serializer.KryoSerializer |
Shuffle Enable | 支持支持Shuffle | spark.shuffle.service.enabled |
Minimum Executors | 最小的Executor数量(1) | spark.dynamicAllocation.minExecutors |
Maximum Executors | 最大的Executor数量(900) | spark.dynamicAllocation.maxExecutors |
Jars notation | 指定对应jar,不使用*代指 | spark.yarn.secondary.jars |
Executor Overhead Memory | exector overhead memory配置 | spark.yarn.executor.memoryOverhead |
Driver Overhead Memory | driver overhead memory配置 | spark.yarn.driver.memoryOverhead |
通用聚合指标
计算运行App汇总指标,包括ResourceUsed 资源使用
,ResourceWasted 资源浪费
,TotalDelay 任务总延迟
。
ResourceUsed 资源使用
:与App.memorySeconds
保持一致,单位(MB-Seconds)ResourceWasted 资源浪费
:ResourceUsed - TaskMemoryUsed
,虽然配置了指定资源,但执行任务过程中会出现container 内存资源没有消耗完,即存在浪费内存;TotalDelay
:任务延迟分析,Task结束时间 - (Task提交时间 Task耗时)
内存引子=1.5,即实际的内存使用数据,可能超过内存已使用的指标数据,因此放大实际使用的倍数。
MapReduce
参数说明
- MapContainerMemorySize : 获取MapReduce中Map任务Container的Memory大小,通过配置参数mapreduce.map.memory.mb获取,若获取不到,默认2048MB;
- ReduceContainerMemorySize : 获取MapReduce中Reduce任务Container的Memory大小,通过配置参数mapreduce.reduce.memory.mb获取,若获取不到,默认2048MB;
指标 | 说明 | 计算 |
---|---|---|
ResourceUsed | 使用的总资源,单位MB-Seconds | App.memorySeconds |
ResourceWasted | 汇总所有任务Task的浪费内存资源,单位MB-Seconds | sum(((ContainerSize - max(TaskCounter.PHYSICAL_MEMORY_BYTES, TaskCounter.VIRTUAL_MEMORY_BYTES)*内存因子) * (Task.elapsedTime/Second) ) |
TotalDelay | 总任务延迟时间 | max(Task.finishTime) - (idealStartTime max(Task.elapsedTime)) |
Map任务中idealStartTime 为
Job.submitTime
,Reduce任务中 idealStartTime 为Map Task 中max(Task.finishTime);
Spark
参数说明:
- spark.executor.instances: Spark instances数量;
- spark.executor.memory:Spark任务中执行Executor的内存值;
指标 | 说明 | 计算 |
---|---|---|
ResourceUsed | 使用的总资源,单位MB-Seconds | App.memorySeconds |
ResourceWasted | 汇总所有任务Executor的浪费内存资源,单位MB-Seconds | sum((Excutor.maxMemory - Excutor.memoryUsed*内存因子)/MB * totalDuration/Second) |
TotalDelay | 总任务延迟时间 | sum(Stage.firstTaskLaunchedTime - Stage.submissionTime) |
总结
本文主要介绍资源管理组件中的计算资源优化,主要包括:配额管理、调度优先级设置、作业参数调优、监控与分析 四个方面。针对作业指标分析,基于开源项目 Dr. Elephant 进行介绍,分别详述了MapReduce任务和Spark任务的采集详情及作业的调优判断指标。基于 Dr. Elephant 项目的设计思路和启发式算法,我们可以进行二次开发或者重构一个作业智能诊断平台,从作业执行全链路进行参数调优。
我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!