用PySpark开发时的调优思路(上)

2021-06-25 16:39:09 浏览数 (1)

这一小节的内容算是对pyspark入门的一个ending了,全文主要是参考学习了美团Spark性能优化指南的基础篇和高级篇内容,主体脉络和这两篇文章是一样的,只不过是基于自己学习后的理解进行了一次总结复盘,而原文中主要是用Java来举例的,我这边主要用pyspark来举例。文章主要会从4个方面(或者说4个思路)来优化我们的Spark任务,主要就是下面的图片所示:(本小节只写了开发习惯调优哈)

1. 开发习惯调优
1)尽可能复用同一个RDD,避免重复创建,并且适当持久化数据

这种开发习惯是需要我们对于即将要开发的应用逻辑有比较深刻的思考,并且可以通过code review来发现的,讲白了就是要记得我们创建过啥数据集,可以复用的尽量广播(broadcast)下,能很好提升性能。

代码语言:javascript复制
# 最低级写法,相同数据集重复创建。
rdd1 = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
rdd2 = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
print(rdd1.take(10))
print(rdd2.map(lambda x:x[0:1]).take(10))

# 稍微进阶一些,复用相同数据集,但因中间结果没有缓存,数据会重复计算
rdd1 = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
print(rdd1.take(10))
print(rdd1.map(lambda x:x[0:1]).take(10))

# 相对比较高效,使用缓存来持久化数据
rdd = sc.parallelize(range(1, 11), 4).cache()  # 或者persist()
rdd_map = rdd.map(lambda x: x*2)
rdd_reduce = rdd.reduce(lambda x, y: x y)
print(rdd_map.take(10))
print(rdd_reduce)

下面我们就来对比一下使用缓存能给我们的Spark程序带来多大的效率提升吧,我们先构造一个程序运行时长测量器。

代码语言:javascript复制
import time
# 统计程序运行时间
def time_me(info="used"):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.time()
            fn(*args, **kwargs)
            print("%s %s %s" % (fn.__name__, info, time.time() - start), "second")
        return _wrapper
    return _time_me

下面我们运行下面的代码,看下使用了cache带来的效率提升:

代码语言:javascript复制
@time_me()
def test(types=0):
    if types == 1:
        print("使用持久化缓存")
        rdd = sc.parallelize(range(1, 10000000), 4)
        rdd1 = rdd.map(lambda x: x*x   2*x   1).cache()  # 或者 persist(StorageLevel.MEMORY_AND_DISK_SER)
        print(rdd1.take(10))
        rdd2 = rdd1.reduce(lambda x, y: x y)
        rdd3 = rdd1.reduce(lambda x, y: x   y)
        rdd4 = rdd1.reduce(lambda x, y: x   y)
        rdd5 = rdd1.reduce(lambda x, y: x   y)
        print(rdd5)
    else:
        print("不使用持久化缓存")
        rdd = sc.parallelize(range(1, 10000000), 4)
        rdd1 = rdd.map(lambda x: x * x   2 * x   1)
        print(rdd1.take(10))
        rdd2 = rdd1.reduce(lambda x, y: x   y)
        rdd3 = rdd1.reduce(lambda x, y: x   y)
        rdd4 = rdd1.reduce(lambda x, y: x   y)
        rdd5 = rdd1.reduce(lambda x, y: x   y)
        print(rdd5)

        
test()   # 不使用持久化缓存
time.sleep(10)
test(1)  # 使用持久化缓存
# output:
# 使用持久化缓存
# [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
# 333333383333334999999
# test used 26.36529278755188 second
# 使用持久化缓存
# [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
# 333333383333334999999
# test used 17.49532413482666 second

同时我们打开YARN日志来看看:http://localhost:4040/jobs/

因为我们的代码是需要重复调用RDD1的,当没有对RDD1进行持久化的时候,每次当它被action算子消费了之后,就释放了,等下一个算子计算的时候要用,就从头开始计算一下RDD1。代码中需要重复调用RDD1 五次,所以没有缓存的话,差不多每次都要6秒,总共需要耗时26秒左右,但是,做了缓存,每次就只需要3s不到,总共需要耗时17秒左右。

另外,这里需要提及一下一个知识点,那就是持久化的级别,一般cache的话就是放入内存中,就没有什么好说的,需要讲一下的就是另外一个 persist(),它的持久化级别是可以被我们所配置的:

持久化级别

含义解释

MEMORY_ONLY

将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。使用cache()方法时,实际就是使用的这种持久化策略,性能也是最高的。

MEMORY_AND_DISK

优先尝试将数据保存在内存中,如果内存不够存放所有的数据,会将数据写入磁盘文件中。

MEMORY_ONLY_SER

基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

MEMORY_AND_DISK_SER

基本含义同MEMORY_AND_DISK。唯一的区别是会先序列化,节约内存。

DISK_ONLY

使用未序列化的Java对象格式,将数据全部写入磁盘文件中。一般不推荐使用。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。一般也不推荐使用。

2)尽量避免使用低性能算子

shuffle类算子算是低性能算子的一种代表,所谓的shuffle类算子,指的是会产生shuffle过程的操作,就是需要把各个节点上的相同key写入到本地磁盘文件中,然后其他的节点通过网络传输拉取自己需要的key,把相同key拉到同一个节点上进行聚合计算,这种操作必然就是有大量的数据网络传输与磁盘读写操作,性能往往不是很好的。

那么,Spark中有哪些算子会产生shuffle过程呢?

操作类别

shuffle类算子

备注

分区操作

repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true)

重分区操作一般都会shuffle,因为需要对所有的分区数据进行打乱。

聚合操作

reduceByKey、groupByKey、sortByKey

需要对相同key进行操作,所以需要拉到同一个节点上。

关联操作

join类操作

需要把相同key的数据shuffle到同一个节点然后进行笛卡尔积

去重操作

distinct等

需要对相同key进行操作,所以需要shuffle到同一个节点上。

排序操作

sortByKey等

需要对相同key进行操作,所以需要shuffle到同一个节点上。

这里进一步介绍一个替代join的方案,因为join其实在业务中还是蛮常见的。

代码语言:javascript复制
# 原则2:尽量避免使用低性能算子
rdd1 = sc.parallelize([('A1', 211), ('A1', 212), ('A2', 22), ('A4', 24), ('A5', 25)])
rdd2 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
# 低效的写法,也是传统的写法,直接join
rdd_join = rdd1.join(rdd2)
print(rdd_join.collect())
# [('A4', (24, 14)), ('A2', (22, 12)), ('A1', (211, 11)), ('A1', (212, 11))]
rdd_left_join = rdd1.leftOuterJoin(rdd2)
print(rdd_left_join.collect())
# [('A4', (24, 14)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]
rdd_full_join = rdd1.fullOuterJoin(rdd2)
print(rdd_full_join.collect())
# [('A4', (24, 14)), ('A3', (None, 13)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]

# 高效的写法,使用广播 map来实现相同效果
# tips1: 这里需要注意的是,用来broadcast的RDD不可以太大,最好不要超过1G
# tips2: 这里需要注意的是,用来broadcast的RDD不可以有重复的key的
rdd1 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
rdd2 = sc.parallelize([('A1', 211), ('A1', 212), ('A2', 22), ('A4', 24), ('A5', 25)])

# step1: 先将小表进行广播,也就是collect到driver端,然后广播到每个Executor中去。
rdd_small_bc = sc.broadcast(rdd1.collect())

# step2:从Executor中获取存入字典便于后续map操作
rdd_small_dict = dict(rdd_small_bc.value)

# step3:定义join方法
def broadcast_join(line, rdd_small_dict, join_type):
    k = line[0]
    v = line[1]
    small_table_v = rdd_small_dict[k] if k in rdd_small_dict else None
    if join_type == 'join':
        return (k, (v, small_table_v)) if k in rdd_small_dict else None
    elif join_type == 'left_join':
        return (k, (v, small_table_v if small_table_v is not None else None))
    else:
        print("not support join type!")

# step4:使用 map 实现 两个表join的功能
rdd_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "join")).filter(lambda line: line is not None)
rdd_left_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "left_join")).filter(lambda line: line is not None)
print(rdd_join.collect())
print(rdd_left_join.collect())
# [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14))]
# [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14)), ('A5', (25, None))]

上面的RDD join被改写为 broadcast map的PySpark版本实现,不过里面有两个点需要注意:

  • tips1: 用来broadcast的RDD不可以太大,最好不要超过1G
  • tips2: 用来broadcast的RDD不可以有重复的key
3)尽量使用高性能算子

上一节讲到了低效算法,自然地就会有一些高效的算子。

原算子

高效算子(替换算子)

说明

map

mapPartitions

直接map的话,每次只会处理一条数据,而mapPartitions则是每次处理一个分区的数据,在某些场景下相对比较高效。(分区数据量不大的情况下使用,如果有数据倾斜的话容易发生OOM)

groupByKey

reduceByKey/aggregateByKey

这类算子会在原节点先map-side预聚合,相对高效些。

foreach

foreachPartitions

同第一条记录一样。

filter

filter coalesce

当我们对数据进行filter之后,有很多partition的数据会剧减,然后直接进行下一步操作的话,可能就partition数量很多但处理的数据又很少,task数量没有减少,反而整体速度很慢;但如果执行了coalesce算子,就会减少一些partition数量,把数据都相对压缩到一起,用更少的task处理完全部数据,一定场景下还是可以达到整体性能的提升。

repartition sort

repartitionAndSortWithinPartitions

直接用就是了。

4)广播大变量

如果我们有一个数据集很大,并且在后续的算子执行中会被反复调用,那么就建议直接把它广播(broadcast)一下。当变量被广播后,会保证每个executor的内存中只会保留一份副本,同个executor内的task都可以共享这个副本数据。如果没有广播,常规过程就是把大变量进行网络传输到每一个相关task中去,这样子做,一来频繁的网络数据传输,效率极其低下;二来executor下的task不断存储同一份大数据,很有可能就造成了内存溢出或者频繁GC,效率也是极其低下的。

代码语言:javascript复制
# 原则4:广播大变量
rdd1 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
rdd1_broadcast = sc.broadcast(rdd1.collect())
print(rdd1.collect())
print(rdd1_broadcast.value)
# [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]
# [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]

?学习资源推荐:

1)《Spark性能优化指南——基础篇》

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

2)《Spark性能优化指南——高级篇》

https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

0 人点赞