现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。
今天我们将要学习的一个核心概念就是RDD。
RDD概念基础
RDD代表Resilient Distributed Dataset(弹性分不输计算数据集),它们是可以在多个节点上运行和操作的数据,从而能够实现高效并行计算的效果。RDD是不可变数据,这意味着一旦创建了RDD,就无法直接对其进行修改。此外,RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。
为了完成各种计算任务,RDD支持了多种的操作。这些对RDD的操作大致可以分为两种方式:
- 转换:将这种类型的操作应用于一个RDD后可以得到一个新的RDD,例如:Filter, groupBy, map等。
- 计算:将这种类型的操作应用于一个RDD后,它可以指示Spark执行计算并将计算结果返回。
为了在PySpark中执行相关操作,我们需要首先创建一个RDD对象。一个RDD对象的类定义如下:
代码语言:javascript复制class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
RDD实战
下面,我们以如下RDD对象为例,演示一些基础的PySpark操作。
代码语言:javascript复制words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
count()函数
count()函数返回RDD中元素的数量。
代码语言:javascript复制counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
# Number of elements in RDD -> 8
collect()函数
collect()函数将RDD中所有元素存入列表中并返回该列表。
代码语言:javascript复制coll = words.collect()
print "Elements in RDD -> %s" % (coll)
# Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
foreach(function)函数
foreach函数接收一个函数作为参数,将RDD中所有的元素作为参数调用传入的函数。 在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素。
代码语言:javascript复制def function1(x):
"""
# 针对RDD中每个元素的函数
"""
print(x)
fore = words.foreach(function1)
filter(function)函数
filter函数传入一个过滤器函数,并将过滤器函数应用于原有RDD中的所有元素,并将满足过滤器条件的RDD元素存放至一个新的RDD对象中并返回。
代码语言:javascript复制words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
map(function)函数
map函数传入一个函数作为参数,并将该函数应用于原有RDD中的所有元素,将所有元素针对该函数的输出存放至一个新的RDD对象中并返回。
代码语言:javascript复制words_map = words.map(lambda x: (x, ))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
reduce(function)函数
reduce函数接收一些特殊的运算符,通过将原有RDD中的所有元素按照指定运算符进行计算,并返回计算结果。在下面的例子中,我们引入了一个加法运算符并将RDD中所有元素进行加法计算。
代码语言:javascript复制from operator import add
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
join(other, numPartitions=None)函数
join函数()对RDD对象中的Key进行匹配,将相同key中的元素合并在一起,并返回新的RDD对象。在下面的例子中,在两个RDD对象分别有两组元素,通过join函数,可以将这两个RDD对象进行合并,最终我们得到了一个合并对应key的value后的新的RDD对象。
代码语言:javascript复制x = sc.parallelize([("spark", ), ("hadoop", )])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
# Join RDD -> [
# ('spark', (1, 2)),
# ('hadoop', (4, 5))
# ]
cache()函数
cache()函数可以对RDD对象进行默认方式(memory)进行持久化。我们可以通过如下方式查询RDD对象是否被持久化了。
代码语言:javascript复制words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
# Words got cached -> True