报错如下
原因分析
shuffle分为shuffle write和shuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。
shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。
shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。
解决思路
1、减少shuffle数据
主要从代码层面着手,可以将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。
2、修改分区
通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度适当提高这个值,例如500。
3、增加失败的重试次数和重试的时间间隔
通过spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如10。
通过spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如10s。
4、提高executor的内存
在spark-submit提交任务时,适当提高executor的memory值,例如15G或者20G。
5、考虑是否存在数据倾斜的问题
总结
1、org.apache.spark.shuffle.FetchFailedException
1、问题描述
这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,非常的耗时。
2、报错提示
(1) missing output location
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
(2) shuffle fetch faild
org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268
当前的配置为每个executor使用1cpu,5GRAM,启动了20个executor
3、解决方案
一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。
spark.executor.memory 15G
spark.executor.cores 3
spark.cores.max 21
启动的execuote数量为:7个
execuoteNum = spark.cores.max/spark.executor.cores
每个executor的配置:
3core,15G RAM
消耗的内存资源为:105G RAM
15G*7=105G
可以发现使用的资源并没有提升,但是同样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就结束了。
2、Executor&Task Lost
1、问题描述
因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈
2、报错提示
executor lost
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
task lost
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed
各种timeout
java.util.concurrent.TimeoutException: Futures timed out after [120 second
ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong
3、解决方案
提高 spark.network.timeout 的值,根据情况改成300(5min)或更高。
默认为 120(120s),配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属
spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout or spark.rpc.lookupTimeout
3、倾斜
1、问题描述
大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢。
分为数据倾斜和task倾斜两种。
2、错误提示
数据倾斜
任务倾斜
差距不大的几个task,有的运行速度特别慢。
3、解决方案
数据倾斜:数据倾斜大多数情况是由于大量null值或者""引起,在计算前过滤掉这些数据既可。
例如:sqlContext.sql("...where col is not null and col != ''")
任务倾斜:task倾斜原因比较多,网络io,cpu,mem都有可能造成这个节点上的任务执行缓慢,可以去看该节点的性能监控来分析原因。以前遇到过同事在spark的一台worker上跑R的任务导致该节点spark task运行缓慢。
或者可以开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。
spark.speculation true
spark.speculation.interval 100 - 检测周期,单位毫秒;
spark.speculation.quantile 0.75 - 完成task的百分比时启动推测
spark.speculation.multiplier 1.5 - 比其他的慢多少倍时启动推测。