第3天:核心概念之RDD

2021-03-16 10:23:51 浏览数 (1)

现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。
今天我们将要学习的一个核心概念就是RDD。

RDD概念基础

RDD代表Resilient Distributed Dataset(弹性分不输计算数据集),它们是可以在多个节点上运行和操作的数据,从而能够实现高效并行计算的效果。RDD是不可变数据,这意味着一旦创建了RDD,就无法直接对其进行修改。此外,RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。

为了完成各种计算任务,RDD支持了多种的操作。这些对RDD的操作大致可以分为两种方式:

  1. 转换:将这种类型的操作应用于一个RDD后可以得到一个新的RDD,例如:Filter, groupBy, map等。
  2. 计算:将这种类型的操作应用于一个RDD后,它可以指示Spark执行计算并将计算结果返回。

为了在PySpark中执行相关操作,我们需要首先创建一个RDD对象。一个RDD对象的类定义如下:

代码语言:javascript复制
class pyspark.RDD (
 
   jrdd, 
 
   ctx, 
 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
 
)
 

RDD实战

下面,我们以如下RDD对象为例,演示一些基础的PySpark操作。

代码语言:javascript复制
words = sc.parallelize (
 
 ["scala", 
 
 "java", 
 
 "hadoop", 
 
 "spark", 
 
 "akka",
 
 "spark vs hadoop", 
 
 "pyspark",
 
 "pyspark and spark"]
 
)
 

count()函数

count()函数返回RDD中元素的数量。

代码语言:javascript复制
counts = words.count()
 
print "Number of elements in RDD -> %i" % (counts)
 
# Number of elements in RDD -> 8
 

collect()函数

collect()函数将RDD中所有元素存入列表中并返回该列表。

代码语言:javascript复制
coll = words.collect()
 
print "Elements in RDD -> %s" % (coll)
 
# Elements in RDD -> [ 'scala',  'java',  'hadoop',  'spark',  'akka',  'spark vs hadoop',  'pyspark',  'pyspark and spark' ]
 

foreach(function)函数

foreach函数接收一个函数作为参数,将RDD中所有的元素作为参数调用传入的函数。 在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素。

代码语言:javascript复制
def function1(x):
 
 """
 
    # 针对RDD中每个元素的函数
 
    """
 
 print(x)
 
fore = words.foreach(function1) 
 

filter(function)函数

filter函数传入一个过滤器函数,并将过滤器函数应用于原有RDD中的所有元素,并将满足过滤器条件的RDD元素存放至一个新的RDD对象中并返回。

代码语言:javascript复制
words_filter = words.filter(lambda x: 'spark' in x)
 
filtered = words_filter.collect()
 
print "Fitered RDD -> %s" % (filtered)
 

map(function)函数

map函数传入一个函数作为参数,并将该函数应用于原有RDD中的所有元素,将所有元素针对该函数的输出存放至一个新的RDD对象中并返回。

代码语言:javascript复制
words_map = words.map(lambda x: (x, ))
 
mapping = words_map.collect()
 
print "Key value pair -> %s" % (mapping)
 

reduce(function)函数

reduce函数接收一些特殊的运算符,通过将原有RDD中的所有元素按照指定运算符进行计算,并返回计算结果。在下面的例子中,我们引入了一个加法运算符并将RDD中所有元素进行加法计算。

代码语言:javascript复制
from operator import add
 
nums = sc.parallelize([1, 2, 3, 4, 5])
 
adding = nums.reduce(add)
 
print "Adding all the elements -> %i" % (adding)
 

join(other, numPartitions=None)函数

join函数()对RDD对象中的Key进行匹配,将相同key中的元素合并在一起,并返回新的RDD对象。在下面的例子中,在两个RDD对象分别有两组元素,通过join函数,可以将这两个RDD对象进行合并,最终我们得到了一个合并对应key的value后的新的RDD对象。

代码语言:javascript复制
x = sc.parallelize([("spark", ), ("hadoop", )])
 
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
 
joined = x.join(y)
 
final = joined.collect()
 
print "Join RDD -> %s" % (final)
 
# Join RDD -> [
 
#   ('spark', (1, 2)),
 
#   ('hadoop', (4, 5))
 
# ]
 

cache()函数

cache()函数可以对RDD对象进行默认方式(memory)进行持久化。我们可以通过如下方式查询RDD对象是否被持久化了。

代码语言:javascript复制
words.cache() 
 
caching = words.persist().is_cached
 
print "Words got chached > %s" % (caching)
 
# Words got cached -> True
 

0 人点赞