spark在处理的数据在内部是分partition的。
除非是在本地新建的list数组才需要使用parallelize。保存在hdfs中的文件,在使用spark处理的时候是默认分partition的。
我们可以使用getNumPartitions()获取当前rdd的partition的信息。
通过glom()函数能够获取到分partition的rdd信息
我们在处理数据的一般使用的map函数,同样也可以根据partition进行mapPartition处理,但是需要注意的是map处理的是每一行的数据,是item。而mapPartition是处理的是一个partition上的数据,所以它处理的是iterator。
coalesce我们可以用这个函数进行reduce操作,缩减分区数,注意是缩减分区数,不能增加分区数。
repartition 我们可以使用这个函数进行重新分区,指定我们想要的分区数,设置的分区数可以大于当前rdd的分区数,也可以小于当前rdd的分区数。
在coalesce和repartition的选择上遵循这样的原则,如果是减少分区数,优先使用coalesce,如果是增加分区数,使用repartition。
下面第一个例子:
代码语言:javascript复制#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, SparkConf
def main():
logFile = "/user/root/spark/sparkstudy03.txt"
master = 'yarn-client'
appName = 'Simple App spark study03'
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile)
logData.cache()
print("partitions is :%s" % logData.getNumPartitions())
logList = logData.glom().collect()
print("glom result is :%s" % logList)
logDataPartition4 = logData.repartition(4)
print("repartition num :%s", logDataPartition4.getNumPartitions())
print("repartition result :%s", logDataPartition4.glom().collect())
logListCoalesce = logDataPartition4.coalesce(3, True).glom().collect()
print("coalesce shuffle true result is :%s" % logListCoalesce)
logListCoalesce2 = logDataPartition4.coalesce(3, False).glom().collect()
print("coalesce shuffle false result is :%s" % logListCoalesce2)
logData.unpersist()
if __name__ == '__main__':
main()
原始数据
代码语言:javascript复制1
2
3
4
5
6
7
运行结果:
代码语言:javascript复制partitions is :2
glom result is :[[u'1', u'2', u'3', u'4'], [u'5', u'6', u'7']]
('repartition num :%s', 4)
('repartition result :%s', [[u'2', u'6'], [u'3', u'7'], [u'4'], [u'1', u'5']])
coalesce shuffle true result is :[[u'1'], [u'2', u'3', u'5'], [u'6', u'7', u'4']]
coalesce shuffle false result is :[[u'2', u'6'], [u'3', u'7'], [u'4', u'1', u'5']]