大数据平台:计算资源优化技术&作业诊断

2024-05-03 18:37:38 浏览数 (1)

背景介绍

大数据平台的资源管理组件主要针对存储资源与计算资源进行分析优化。前文《大数据平台:资源管理及存储优化技术》主要介绍了存储资源优化,本文主要介绍大数据平台构建过程中,计算资源相关的优化技术。

优化技术

1. 配额管理

大数据平台作为SaaS(软件即服务)系统,会将应用开放给不同的使用方,属于多租户系统架构。多租户(Multi-tenancy)允许多个用户或用户组(称为“租户”)共享同一个系统或程序的实例,同时保持各自数据和配置的隔离性。具体的,大数据平台中,不同的租户是可以共享一套存储集群HDFS和计算资源YARN,但不同租户之间互不影响。

为保证多租户的资源隔离性,平台系统会提供多维度的配额管理,主要包括:

  • HDFS资源配额:管理每个租户(如项目)可使用的HDFS集群资源
  • YARN资源池分配:管理每个租户可使用的YARN计算资源
  • HBase配额:管理每个租户可使用HBase空间,例如Namespace数量,表数量,访问流量等

HDFS资源配额

基于CMD命令执行的常用HDFS配额操作如下:

(1). 设置HDFS指定目录配额Quota

代码语言:txt复制
cmd = "hdfs dfsadmin -setQuota {0} {1}".format(maxFileNum, directory)

(2). 清理分配Quota

代码语言:txt复制
cmd = "hdfs dfsadmin -clrQuota {0}".format(directory)

(3). 设置磁盘空间

代码语言:txt复制
cmd = "hdfs dfsadmin -setSpaceQuota {0} {1}".format(maxDiskSpace, directory)

(4). 清理磁盘空间

代码语言:txt复制
cmd = "hdfs dfsadmin -clrSpaceQuota {0}".format(directory)

YARN资源池分配

基于scheduler.xml加载更新YARN资源池

代码语言:txt复制
os.path.join(hadoop_conf_dir, 'fair-scheduler.xml') --更新配额文件
cmd = "yarn rmadmin -refreshQueues"

fair-scheduler.xml 公平调度示例XML文件:

代码语言:xml复制
<allocations>
  <queue name="root">
    <weight>0.0</weight>
    <queue name="default">
      <weight>0.56</weight>
    </queue>
    <queue name="project1">
      <maxResources>81920 mb, 40 vcores</maxResources>
      <weight>1.0</weight>
    </queue>
    <queue name="res_01">
      <maxResources>81920 mb, 40 vcores</maxResources>
      <weight>1.0</weight>
    </queue>
  </queue>
  <queueMaxAppsDefault>100</queueMaxAppsDefault>
  <defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
</allocations>

HBase配额

HBase可基于配额管理实现针对Namespace和 Table 级别的的rpc请求的限制,限制读写次数和流量。其中,Namespace支持三种容量的管理:(1).table的最大数目,(2). region的最大数目,(3).namespace占用的文件系统空间。HBase限流设置的原理是在:在hbase:quota 进行元数据管理

设置Namespace的配额,需要hbase-site添加配置文件:

代码语言:xml复制
hbase.quota.enabled=true

设置限流的API可参考示例:TestQuotaTableUtil

Java API设置Namespace配额:

代码语言:java复制
connection = ConnectionFactory.createConnection(conf);
Admin admin = this.connection.getAdmin();
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("myns").build();
namespaceDescriptor.setConfiguration(
        "hbase.namespace.quota.maxtables", "10");
namespaceDescriptor.setConfiguration(
        "hbase.namespace.quota.maxregions", "100");
admin.createNamespace(namespaceDescriptor);
admin.close();

开启基于Region Server的多租户,需修改hbase-site.xml文件

代码语言:xml复制
<property>
   <name>hbase.coprocessor.master.classes</name>
   <value>org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint</value>
 </property>
 <property>
   <name>hbase.master.loadbalancer.class</name>
   <value>org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer</value>
 </property>

HBase shell配置设置文档:commands

(1). 基于set_quota 设置流量

代码语言:shell复制
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => READ, USER => 'u1', LIMIT => '10req/sec'

hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'

hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE

hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, TABLE => 't1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, TABLE => 't1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => NONE
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => NONE

(2). Namesapce设置Table数量控制

代码语言:xml复制
create_namespace 'ns1', {'hbase.namespace.quota.maxtables'=>'5'}
alter_namespace 'ns1', {METHOD => 'set', 'hbase.namespace.quota.maxtables'=>'8'}

(3). Namesapce设置Region数量控制

代码语言:txt复制
create_namespace 'ns1', {'hbase.namespace.quota.maxregions'=>'5'}
alter_namespace 'ns1', {METHOD => 'set', 'hbase.namespace.quota.maxregions'=>'8'}

2. 调度优先级

YARN调度器采用主资源公平调度算法(Dominant Resource Fairness,DRF),该算法扩展最大最小公平算法(max-min fairness),使其可以支持多维资源调度。DRF中将所需份额(比例)最大的资源称为主资源,将最大最小公平算法应用在主资源上,将多维资源调度问题转化为单资源调度问题。

资源调度器中,每个队列可以设置一个最小和最大资源量,在极端情况下,最小资源量是每个队列需要保证的资源量,最大资源量是不能超过的资源量。资源抢占发生的原因在于最小资源量的设置,资源调度器(包括Capacity Scheduler和Fair Scheduler) 会将负载较轻队列的资源暂时分配给负载较重的队列。特别的,最小资源量并不是硬资源保证,当队列负载低,也会暂时将空闲资源分配给其他有需求的队列。对于暂时分配出去的资源,在需要使用时会"告知"资源回收并等待一段时间,若超时则强制回收进行资源抢占。

YARN任务的优先级有支持两个维度

  • 全局最大优先级yarn.cluster.max-application-priority,设置全局默认最大优先级,系统将根据优先级从高到低调度
  • 队列默认优先级yarn.scheduler.capacity.{leaf-queue-path}.default-application-priority,设置指定队列默认优先级

YARN任务的优先级规则:

  • 设置优先级的数值越大,则调度优先级越高
  • 任务提交时,如果没有指定优先级,使用提交队列的队列默认优先级
  • 指定的优先级超过全局配置的优先级,则使用全局配置的优先级作为任务的优先级

3. 作业参数调优

作业参数调优是指在大数据运行作业(如MapReduce作业、Spark作业等)中,调整各种配置参数以优化作业的执行效率、减少资源消耗和提高系统的整体性能。常用作业参数调优:内存设置、并行度设置、I/O设置。参数调优可以分为事前、事中、事后不同形式:

  • 事前:任务执行前,了解数据读写模式,理解作业特性,识别作业瓶颈,进行定向调整。如CPU密集型,则增加算力资源与并行度。
  • 事中:计算引擎内置能力,如Spark支持AQE(Adaptive Query Execution) 进行动态的查询优化调整
  • 事后:基于UI工具、监控指标、日志等工具,找出性能瓶颈并调整

4. 监控与分析

监控与分析是指使用各种工具和技术来跟踪和评估大数据系统的性能和资源使用情况。目的是为了发现性能瓶颈、资源瓶颈、异常行为或者效率低下的地方,并基于这些信息进行优化。

常用的实现方式:

  1. 监控工具
    1. 内置监控工具:基于大数据系统自身内置的指标数据和监控工具,例如Spark History UI
    2. 外置监控工具:采集大数据系统的指标数据,使用Ganglia、Prometheus等系统级监控工具来收集集群的性能指标
  2. 日志分析:可以基于ELK进行可视化日志数据管理
  3. 调优工具:可以基于Dr. Elephant工具,自动分析作业指标并提供调优建议,下面将对作业指标分析进行详细展开。

作业指标诊断

Dr. Elephant 由 LinkedIn 于 2016 年开源,是一个 Hadoop 和 Spark 的性能监控和调优工具。通过自动化收集所有作业运行指标,进行数据分析并基于UI界面化方式展示。

整体架构如图所示,包括三部分:

  • 数据采集(Fetcher):自动采集执行成功的计算任务
  • 内置诊断(Rule):基于内置规则,启发式进行作业诊断
  • 存储及展示(DB):将分析结果保存在DB持久化,根据作业诊断分级在UI界面展示

MapReduce

1. 采集作业详情

基于JobHistory rest api调用,前缀:http://{historyServer}:{port}

(1). Job概要详情

代码语言:txt复制
GET /ws/v1/history/mapreduce/jobs/{jobId}

返回结果参数说明:

字段

说明

id

作业的JobID

submitTime

作业提交时间

startTime

作业启动时间

finishTime

作业结束时间

name

作业名称

queue

作用所属资源队列

user

作业提交用户

state

作业状态

mapsTotal

MapReduce作业Mapper阶段总个数

mapsCompleted

Mapper阶段总完成数

reducesTotal

MapReduce作业Reducer阶段总个数

reducesCompleted

Reducer阶段总完成数

uberized

是否启动uber mode,若开启(true),则MapReduce任务在同一个JVM上运行

diagnostics

诊断信息

avgMapTime

Mapper阶段平均耗时,单位毫秒(ms)

avgReduceTime

Reducer阶段平均耗时,单位毫秒(ms)

avgShuffleTime

数据Shuffle平均耗时,单位毫秒(ms)

avgMergeTime

数据Merge平均耗时,单位毫秒(ms)

failedMapAttempts

Mapper阶段失败尝试次数

killedMapAttempts

Mapper阶段被kill次数

successfulMapAttempts

Mapper阶段成功执行次数

failedReduceAttempts

Reducer阶段失败尝试次数

killedReduceAttempts

Reducer阶段被kill次数

successfulReduceAttempts

Reducer阶段成功执行次数

(2).Job CounterGroup

汇总作业运行的内置Counters信息(执行计数器),对MapReduce进行作业详情统计,counter主要包括:counter的Group类型,counter名称,counter总值,counter mapper数值,counter reducer数值。

代码语言:txt复制
GET /ws/v1/history/mapreduce/jobs/{jobId}/counters

返回结果参数说明:

FileSystemCounter级别

指标

描述

FILE_BYTES_READ

从本地文件系统读取的总字节数

FILE_BYTES_WRITTEN

向本地文件系统写入的总字节数

FILE_READ_OPS

本地文件系统的读操作次数

FILE_WRITE_OPS

本地文件系统的写操作次数

FILE_LARGE_READ_OPS

读取大文件的操作次数

HDFS_BYTES_READ

从HDFS读取的总字节数

HDFS_BYTES_WRITTEN

向HDFS写入的总字节数

HDFS_READ_OPS

HDFS的读操作次数

HDFS_WRITE_OPS

HDFS的写操作次数

HDFS_LARGE_READ_OPS

读取HDFS上大文件的操作次数

JobCounter级别

指标

描述

TOTAL_LAUNCHED_MAPS

启动的Map任务总数

TOTAL_LAUNCHED_REDUCES

启动的Reduce任务总数

DATA_LOCAL_MAPS

数据本地化的Map作业数

SLOTS_MILLIS_MAPS

所有Map任务在Slots的总耗时(单位:ms)

SLOTS_MILLIS_REDUCES

所有Reduce任务在Slots的总耗时(单位:ms)

MILLIS_MAPS

所有Map任务的总耗时(单位:ms)

MILLIS_REDUCES

所有Reduce任务的总耗时(单位:ms)

VCORES_MILLIS_MAPS

所有Map任务的总核数消耗(单位:vcore-ms)

VCORES_MILLIS_REDUCES

所有Reduce任务的总核数消耗(单位:vcore-ms)

MB_MILLIS_MAPS

所有Map任务的总内存消耗(单位:mb-ms)

MB_MILLIS_REDUCES

所有Reduce任务的总内存消耗(单位:mb-ms)

TaskCounter级别

指标

描述

MAP_INPUT_RECORDS

所有Map输入记录数

MAP_OUTPUT_RECORDS

Map任务产生的输出记录数

MAP_OUTPUT_BYTES

Map任务产生的输出字节数

MAP_OUTPUT_MATERIALIZED_BYTES

Map输出后写入到磁盘的字节数

SPLIT_RAW_BYTES

Mao读取的输入-分片对象的字节数

COMBINE_INPUT_RECORDS

Combiner处理的输入记录数

COMBINE_OUTPUT_RECORDS

Combiner产生的输出记录数

REDUCE_INPUT_GROUPS

Reduce处理的不同分组的个数

REDUCE_SHUFFLE_BYTES

Reduce任务通过Shuffle接收的字节数

REDUCE_INPUT_RECORDS

Reduce任务处理的输入记录数

REDUCE_OUTPUT_RECORDS

Reduce任务产生的输出记录数

SPILLED_RECORDS

作业中所有任务溢出到磁盘的记录数

SHUFFLED_MAPS

通过Shuffle从Map传输到Reduce的记录数

FAILED_SHUFFLE

失败的Shuffle操作数

MERGED_MAP_OUTPUTS

合并的Map输出数

GC_TIME_MILLIS

垃圾回收消耗的时间(单位:ms)

CPU_MILLISECONDS

总计的CPU时间(单位:ms)

PHYSICAL_MEMORY_BYTES

物理内存字节数

VIRTUAL_MEMORY_BYTES

虚拟内存字节数

COMMITTED_HEAP_BYTES

JVM中的总有效的堆内存量

(3).Job Tasks信息

获取作业下的所有task信息及对应的各个task的统计汇总信息和任务执行信息。

(3.1). Task列表

代码语言:txt复制
GET  /ws/v1/history/mapreduce/jobs/{jobID}/tasks

参数说明:

字段

说明

startTime

Task任务的启动时间

finishTime

Task任务的结束时间

elapsedTime

Task任务的耗时

id

Task任务ID

state

Task状态

type

Task类型:MAP、REDUCE

successfulAttempt

任务成功执行的Attempt信息

(3.2). Task Counters,获取指定task的counter统计信息

代码语言:txt复制
GET  /ws/v1/history/mapreduce/jobs/{jobID}/tasks/{taskId}/counters

(3.3). Task Attempts,获取指定task的attempts重试执行信息

代码语言:txt复制
GET  /ws/v1/history/mapreduce/jobs/{jobID}/tasks/{taskId}/attempts

参数说明:

字段

说明

startTime

Task任务的启动时间

finishTime

Task任务的结束时间

rack

Task任务执行机架信息

nodeHttpAddress

Task任务执行的node节点信息

assignedContainerId

Task任务被分配的containerId

2. Mapper/Reducer GC

指标说明:分别对Mapper、Reducer类型的Task任务进行分析,分析Task的GC效率,GC/CPU的使用比例、任务运行时间分布,分析指标:

指标

说明

计算

Number of tasks

对应Tasks的数量

Avg task runtime (ms)

对应Tasks的运行时间平均值

avg(TaskAttempt.finishTime - TaskAttempt.startTime)

Avg task CPU time (ms)

对应Tasks CPU耗时平均值, TaskCounter.CPU_MILLISECONDS 获取对应CPU耗时

avg(TaskCounter.CPU_MILLISECONDS)

Avg task GC time (ms)

对应Tasks GC耗时平均值, TaskCounter.GC_TIME_MILLIS 获取对应GC耗时

avg(TaskCounter.GC_TIME_MILLIS)

Task GC/CPU ratio

GC/CPU耗时比例,计算GC效率

avg(TaskCounter.GC_TIME_MILLIS)/avg(TaskCounter.CPU_MILLISECONDS)

指标建议:如果Task GC/CPU ratio 过高,则说明对应GC耗时比例过多,应该检查代码进行优化,减少GC耗时,Task运行时间过长,则说明该阶段Task任务过多,需重点关注,返回最严重的指标建议。

指标

LOW

MODERATE

SEVERE

CRITICAL

GC Ratio

0.01

0.02

0.03

0.04

Task Runtime(单位:分钟)

5

10

12

15

3. Mapper/Reducer Memory

指标说明:分别对Mapper、Reducer类型的Task任务进行分析,分析Task的内存使用率,分析指标如下:

指标

说明

计算

Number of tasks

对应Tasks的数量

Avg Physical Memory (MB)

对应Tasks消耗内存平均值,单位MB

avg(TaskCounter.PHYSICAL_MEMORY_BYTES)/MB

Max Physical Memory (MB)

Tasks中最大的内存消耗,单位MB

max(TaskCounter.PHYSICAL_MEMORY_BYTES)/MB

Min Physical Memory (MB)

Tasks中最小的内存消耗,单位MB

min(TaskCounter.PHYSICAL_MEMORY_BYTES)/MB

Avg Virtual Memory (MB)

对应Tasks消耗虚拟内存平均值,单位MB

avg(TaskCounter.VIRTUAL_MEMORY_BYTES)/MB

Requested Container Memory

配置参数中执行Map/Reduce任务container的配置额

mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,如果没有配置默认是2G

指标建议:根据分析Task内存使用情况,判断Container分配的内存是否合理,如果使用的内存远远小于container配额内存,则说明container内存配额过大;根据内存的统计值信息(平均值、最大值、最小值)判断是否有数据倾斜问题。

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Physical Memory Ratio

0.6

0.5

0.4

0.3

Physical Memor/Container Memory

Container配额

1.1

1.5

2.0

2.5

默认2G,对应倍数

4. Mapper/Reducer Skew

指标说明:分别对Mapper、Reducer类型的Task任务进行分析,统计分析任务的数据倾斜指标,将Tasks任务数据分为两组,其中一组(Group A)是小于平均值Tasks分析,另一组(Group B)是大于平均值Tasks分析,分析的具体指标如下:

指标

说明

计算

Data skew (Number of tasks)

对应Tasks的数量

Data skew (Group A)

获取Group A的Tasks数量和平均输入值,计算所有Tasks的数据量平均值,过滤Task 数据量小于平均值的为Group A

avg(GroupA(FileSystemCounter.DATA_BYTES))

Data skew (Group B)

获取Group B的Tasks数量和平均输入值,计算所有Tasks的数据量平均值,过滤Task 数据量大于平均值的为Group B

avg(GroupB(FileSystemCounter.DATA_BYTES))

Time skew (Number of tasks)

对应Tasks的数量

Time skew (Group A)

获取Group A的Tasks数量和平均运行耗时,计算所有Tasks的运行耗时平均值,过滤Task 运行耗时小于平均值的为Group A

avg(GroupA(TaskAttempt.finishTime - TaskAttempt.startTime))

Time skew (Group B)

获取Group B的Tasks数量和平均运行耗时,计算所有Tasks的运行耗时平均值,过滤Task 运行耗时大于平均值的为Group B

avg(GroupB(TaskAttempt.finishTime - TaskAttempt.startTime))

指标建议:若Group A和 Group B统计数值相差较大,则说明存在数据倾斜,对于Map任务应尽量减少过多小文件输入,对于Reduce任务尽量减少根据Key的聚合操作(如 GroupByKey、JoinKey)。

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Data Severity

2

4

8

16

(avgDataGroupB/avgDataGroupA) - 1

Time Severity

2

4

8

16

(avgTimeGroupB/avgTimeGroupA) - 1

Data GroupA TasksNum Severity

10

50

100

200

DataGroupA.size

Time GroupA TasksNum Severity

10

50

100

200

TimeGroupA.size

5. Mapper/Reducer Time

指标说明:分别对Mapper、Reducer类型的Task任务进行分析,对Task任务运行耗时进行统计分析,分析的具体指标如下:

指标

说明

计算

Number of tasks

对应Tasks的数量

Average task runtime

对应Tasks的运行耗时平均值

avg(TaskAttempt.finishTime - TaskAttempt.startTime)

Max task runtime

对应Tasks的运行耗时最大值

max(TaskAttempt.finishTime - TaskAttempt.startTime)

Min task runtime

对应Tasks的运行耗时最小值

min(TaskAttempt.finishTime - TaskAttempt.startTime)

Average task input size

Map Task中平均数据值(仅针对Map任务)

avg(FileSystemCounter.HDFS_BYTES_READ)

指标建议:根据Task数量与Task的运行耗时统计数据,判断Tasks数量是否合理,输入文件大小是否合理。

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Short Runtime Severity

10min

4min

2min

1min

avg(TaskAttempt.finishTime - TaskAttempt.startTime)

Long Runtime Severity

15min

30min

60min

120min

avg(TaskAttempt.finishTime - TaskAttempt.startTime)

Task Num Severity

50

100

500

1000

tasks.size

6. Mapper Speed

指标说明:对Mapper类型的Task任务进行分析,以指标反映Map人物的运行速率,分析的具体指标如下:

指标

说明

计算

Number of tasks

对应Tasks的数量

Median task input size

Map Task中Input数据量中位值

median(FileSystemCounter.HDFS_BYTES_READ)

Median task runtime

Tasks的运行耗时中位值

median(TaskAttempt.finishTime - TaskAttempt.startTime)

Median task speed

Tasks运行速率中位值

median((1000 * inputBytes) / (runtimeMs))

Total input size in MB

Map Task中Input总数据量

sum(FileSystemCounter.HDFS_BYTES_READ)

指标建议:分析Map任务的每秒运行效率,判断Map任务是否是CPU密集型,并进行代码优化

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Speed Severity

1/2 * 100M

1/4 * 100M

1/8 * 100M

1/32 * 100M

median((1000 * inputBytes) / (runtimeMs))

Runtime Severity

5min

10min

15min

30min

median(TaskAttempt.finishTime - TaskAttempt.startTime)

7. Mapper Spill

指标说明:对Mapper类型的Task任务进行分析,磁盘IO持久化性能,分析的具体指标如下:

指标

说明

计算

Number of tasks

对应Tasks的数量

Avg spilled records per task

每个Task的平均磁盘溢出记录数

sum(TaskCounter.SPILLED_RECORDS)/tasks.size

Avg output records per task

每个Task的平均Output记录数

sum(TaskCounter.MAP_OUTPUT_RECORDS)/tasks.size

Ratio of spilled records to output records

溢出记录数与输出记录数比值

sum(TaskCounter.SPILLED_RECORDS)/sum(TaskCounter.MAP_OUTPUT_RECORDS)

指标建议:分析Mapper任务的IO性能,是否存在IO溢出到磁盘,降低IO性能:

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Task Num Severity

50

100

500

1000

tasks.size

IO Spill Severity

2.01

2.2

2.5

3.0

sum(TaskCounter.SPILLED_RECORDS)/sum(TaskCounter.MAP_OUTPUT_RECORDS)

8. Shuffle Sort

指标说明:对Reducer类型的Task任务进行分析,在Shuffle/Sort阶段的耗时统计,分析的具体指标如下:

指标

说明

计算

Number of tasks

对应Tasks的数量

Average code runtime

真正Code执行时间的平均值

avg(TaskAttempt.finishTime -TaskAttempt.mergeFinishTime)

Average shuffle time

Shuffle时间的平均值,并计算与code runtime的比例(avgShuffleTime/avgCodeTime)

avg(TaskAttempt.shuffleFinishTime - TaskAttempt.startTime)

Average sort time

Sort时间的平均值,并计算与code runtime的比例(avgSortTime/avgCodeTime)

avg(TaskAttempt.mergeFinishTime - TaskAttempt.shuffleFinishTime)

  • Reduce Code执行时间:totalTimeMs - shuffleTimeMs - sortTimeMs
  • totalTimeMs = TaskAttempt.finishTime - TaskAttempt.startTime
  • shuffleTimeMs = TaskAttempt.shuffleFinishTime - TaskAttempt.startTime
  • sortTimeMs = TaskAttempt.mergeFinishTime - TaskAttempt.shuffleFinishTime

指标建议:根据对应Shuffle、Sort时间进行优化判断,对应的指标如下

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Shuffle Runtime Severity

1m

5m

10m

30m

avg(TaskAttempt.shuffleFinishTime - TaskAttempt.startTime)

Shuffle/Code Runtime Ratio Severity

1

2

4

8

avgShuffleTime * 2 / avgCodeTime

Sort Runtime Severity

1m

5m

10m

30m

avg(TaskAttempt.mergeFinishTime - TaskAttempt.shuffleFinishTime)

Sort/Code Runtime Ratio Severity

1

2

4

8

avgSortTime * 2 / avgCodeTime

Spark

1. 采集作业详情

基于Spark HistoryServer rest api调用,前缀:http://{historyServer}:{port}

(1). 应用概要详情

代码语言:txt复制
GET /api/v1/applications/{appId}

返回结果参数说明:

字段

说明

attemptId

应用执行ID,一个应用若失败,可重试进行多次执行

startTime

应用执行的开始时间

endTime

应用执行的结束时间

duration

应用执行的耗时

sparkUser

应用提交用户

completed

应用是否执行完成

(2). 应用执行详情

基于应用执行信息获取最后一次执行(attempt),并调用接口获取该次执行的所有Job详情包括:Data、Stages、Executor、Conf信息。

(2.1). Job Datas

代码语言:txt复制
GET /api/v1/applications/{appId}/{attemptId}/jobs

返回结果参数说明:

字段

说明

jobId

对应此次Attempt执行jobId

stageIds

对应的stageId信息

numTasks

该job执行的Task任务数量

numCompletedTasks

执行完成的Task数量

numCompletedStages

执行完成的Stages数量

(2.2). Job Stages

代码语言:txt复制
GET /api/v1/applications/{appId}/{attemptId}/stages

返回结果参数说明:

字段

说明

attemptId

执行的AttemptId

stageId

对应此次Attempt对应的StageId

numTasks

执行的Task数量

executorRunTime

该Stage执行executor运行的时间(单位:ms)

executorCpuTime

该Stage执行executor时CPU耗时,包括数据拉取(单位:ms)

submissionTime

该Stage提交时间

firstTaskLaunchedTime

该Stage第一个Task启动时间

inputBytes

该Stage输入字节数

inputRecords

该Stage输入记录数

outputBytes

该Stage输出字节数

outputRecords

该Stage输出记录数

shuffleReadBytes

Shuffle阶段读取字节数

shuffleReadRecords

Shuffle阶段读取记录数

shuffleWriteBytes

Shuffle节点写字节数

shuffleWriteRecords

Shuffle阶段写记录数

memoryBytesSpilled

内存溢出字节数

diskBytesSpilled

磁盘溢出字节数

details

执行详情

schedulingPool

调度资源池

rddIds

该Stage对应RDD信息

(2.3). Job Executors

代码语言:txt复制
GET /api/v1/applications/{appId}/{attemptId}/executors

结果返回参数说明:

字段

说明

id

Executor执行ID

hostPort

指定执行Executor的Node节点信息

rddBlocks

Executor执行中持久化的RDD blocks数量

memoryUsed

Executor过程中RDD缓存内存的大小

diskUsed

Executor过程中RDD持久化到磁盘的空间大小

totalCores

Executor使用总核数

maxTasks

最大的Task数量

totalDuration

Executor的总执行时间(单位:ms)

maxMemory

用于缓存RDD的最大内存(单位:bytes)

executorLogs

Executor执行日志

memoryMetrics

内存指标数据

totalGCTime

总的GC耗时

(2.4). Job Conf

代码语言:txt复制
GET /api/v1/applications/{appId}/{attemptId}/environment

2. Executor GC

指标说明:分析Spark任务中GC耗时情况,GC时间占比,分析的具体指标如下:

指标

说明

计算

Total GC time

GC总耗时

sum(Executor.totalGCTime)

Total Executor Runtime

总运行耗时

sum(Executor.totalDuration)

GC time to Executor Run time ratio

GC耗时与运行耗时占比

sum(Executor.totalGCTime)/sum(Executor.totalDuration)

指标建议:若Executor GC耗时占比过大,建议调大Excutor 内存值

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

GC Ratio High Severity

0.08

0.1

0.15

0.2

sum(Executor.totalGCTime)/sum(Executor.totalDuration)

GC Ratio Low Severity

0.05

0.04

0.03

0.01

sum(Executor.totalGCTime)/sum(Executor.tota3Duration)

3. Executor Metrics

指标说明:分析Spark任务Executor指标进行分析,分析的具体指标如下:

指标

说明

计算

Total executor storage memory allocated

所有Executors RDD缓存的内存总分配值

sum(Executor.maxMemory)

Total executor storage memory used

所有Executors RDD缓存的内存总使用值

sum(Executor.memoryUsed)

Executor storage memory utilization rate

所有Executors RDD缓存的内存比例

sum(Executor.memoryUsed)/sum(Executor.maxMemory)

Executor storage memory used distribution

所有Executors RDD缓存的内存统计值

Dist(Executor.memoryUsed)

Executor task time distribution

所有Executor 任务运行时间统计值

Dist(Executor.totalDuration)

Executor task time sum

所有Executor 任务运行时间总值

sum(Executor.totalDuration)

Executor input bytes distribution

所有Executor Input数据量统计值

Dist(Executor.totalInputBytes)

Executor shuffle read bytes distribution

所有Executor Shuffle Read统计值

Dist(Executor.totalShuffleRead)

Executor shuffle write bytes distribution

所有Executor Shuffle Write统计值

Dist(Executor.totalShuffleWrite)

其中Dist统计值计算包括:

  • min:最小值
  • max:最大值
  • p25:百分位25%值
  • median:中位值(百分位50%)
  • p75:百分位75%值

指标建议:Executor 运行统计值判断任务是否运行合理

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

memoryUsed Median Ratio Severity

0.125

0.25

0.5

1

max(Executor.memoryUsed)/median(Executor.memoryUsed)

Task Time Median Ratio Severity

0.125

0.25

0.5

1

max(Executor.totalDuration)/median(Executor.totalDuration)

InputBytes Median Ratio Severity

0.125

0.25

0.5

1

max(Executor.totalInputBytes)/median(Executor.totalInputBytes)

Shuffle Read Median Ratio Severity

0.125

0.25

0.5

1

max(Executor.totalShuffleRead)/median(Executor.totalShuffleRead)

Shuffle Write Median Ratio Severity

0.125

0.25

0.5

1

max(Executor.totalShuffleWrite)/median(Executor.totalShuffleWrite)

4. Stages Metrics

指标说明:分析Spark任务Stages指标进行分析,分析的具体指标如下:

指标

说明

计算

Spark completed stages count

已结束的Stages总计

(Stage.status = COMPLETE).size

Spark failed stages count

失败的Stages总计

(Stage.status = FAILED).size

Spark stage failure rate

Stage执行失败率

FailedStageSize/(FailedStageSize completedStageSize)

Spark stages with high task failure rates

Stages中Tasks执行失败率

(Stage.numFailedTasks/(Stage.numFailedTasks Stage.numCompletedTasks)

Spark stages with long average executor runtime

Stages中Task超时运行的时间(30min)

该Stage下,单个Executor的运行耗时:Stage.executorRunTime/Stage.Executors.size

指标建议:Stages 运行统计值判断任务是否运行合理

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Max Task Failed Ratio Severity

0.1

0.3

0.4

0.5

max(Stage.numFailedTasks/(Stage.numFailedTasks Stage.numCompletedTasks)

Max Task Runtime Severity

15min

30min

45min

60min

max(Stage.executorRunTime/Stage.Executors.size)

5. Job Metrics

指标说明:分析Spark任务Jobs指标进行分析,分析的具体指标如下:

指标

说明

计算

Spark completed jobs count

Spark任务执行成功的Job数量统计

(Job.status = SUCCEEDED).size

Spark failed jobs count

Spark任务执行失败的Job数量统计

(Job.status = FAILED).size

Spark failed jobs list

执行失败的Job列表

List(Job.status = FAILED)

Spark jobs failure rates

Job执行失败率

numFailedJobs/numFailedJobs numSucceededJobs

Spark jobs with high task failure rates

Job Tasks执行失败率

Job.numFailedTasks/Job.numTasks

指标建议:若Executor GC耗时占比过大,建议调大Excutor 内存值

指标

LOW

MODERATE

SEVERE

CRITICAL

计算

Max Task Failed Ratio Severity

0.1

0.3

0.4

0.5

max(Job.numFailedTasks/Job.numTasks)

Max Job Failed Severity

0.1

0.3

0.4

0.5

max(numFailedJobs/numFailedJobs numSucceededJobs)

6. Spark Conf

指标说明:对Spark配置参数进行分析,主要包括driver memory,driver cores,executor cores,executor instances,executor memory,serializer

指标

说明

计算

Serializer Key

Spark序列化方式,是否支持KryoSerializer

默认:org.apache.spark.serializer.KryoSerializer

Shuffle Enable

支持支持Shuffle

spark.shuffle.service.enabled

Minimum Executors

最小的Executor数量(1)

spark.dynamicAllocation.minExecutors

Maximum Executors

最大的Executor数量(900)

spark.dynamicAllocation.maxExecutors

Jars notation

指定对应jar,不使用*代指

spark.yarn.secondary.jars

Executor Overhead Memory

exector overhead memory配置

spark.yarn.executor.memoryOverhead

Driver Overhead Memory

driver overhead memory配置

spark.yarn.driver.memoryOverhead

通用聚合指标

计算运行App汇总指标,包括ResourceUsed 资源使用ResourceWasted 资源浪费TotalDelay 任务总延迟

  • ResourceUsed 资源使用:与App.memorySeconds保持一致,单位(MB-Seconds)
  • ResourceWasted 资源浪费ResourceUsed - TaskMemoryUsed,虽然配置了指定资源,但执行任务过程中会出现container 内存资源没有消耗完,即存在浪费内存;
  • TotalDelay:任务延迟分析,Task结束时间 - (Task提交时间 Task耗时)

内存引子=1.5,即实际的内存使用数据,可能超过内存已使用的指标数据,因此放大实际使用的倍数。

MapReduce

参数说明

  • MapContainerMemorySize : 获取MapReduce中Map任务Container的Memory大小,通过配置参数mapreduce.map.memory.mb获取,若获取不到,默认2048MB;
  • ReduceContainerMemorySize : 获取MapReduce中Reduce任务Container的Memory大小,通过配置参数mapreduce.reduce.memory.mb获取,若获取不到,默认2048MB;

指标

说明

计算

ResourceUsed

使用的总资源,单位MB-Seconds

App.memorySeconds

ResourceWasted

汇总所有任务Task的浪费内存资源,单位MB-Seconds

sum(((ContainerSize - max(TaskCounter.PHYSICAL_MEMORY_BYTES, TaskCounter.VIRTUAL_MEMORY_BYTES)*内存因子) * (Task.elapsedTime/Second) )

TotalDelay

总任务延迟时间

max(Task.finishTime) - (idealStartTime max(Task.elapsedTime))

Map任务中idealStartTime 为 Job.submitTime,Reduce任务中 idealStartTime 为Map Task 中max(Task.finishTime);

Spark

参数说明:

  • spark.executor.instances: Spark instances数量;
  • spark.executor.memory:Spark任务中执行Executor的内存值;

指标

说明

计算

ResourceUsed

使用的总资源,单位MB-Seconds

App.memorySeconds

ResourceWasted

汇总所有任务Executor的浪费内存资源,单位MB-Seconds

sum((Excutor.maxMemory - Excutor.memoryUsed*内存因子)/MB * totalDuration/Second)

TotalDelay

总任务延迟时间

sum(Stage.firstTaskLaunchedTime - Stage.submissionTime)

总结

本文主要介绍资源管理组件中的计算资源优化,主要包括:配额管理、调度优先级设置、作业参数调优、监控与分析 四个方面。针对作业指标分析,基于开源项目 Dr. Elephant 进行介绍,分别详述了MapReduce任务和Spark任务的采集详情及作业的调优判断指标。基于 Dr. Elephant 项目的设计思路和启发式算法,我们可以进行二次开发或者重构一个作业智能诊断平台,从作业执行全链路进行参数调优。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

0 人点赞