前言
在大数据处理的时代,Apache Spark以其高效的数据处理能力和灵活的编程模型,成为了数据科学家和工程师的热门选择。PySpark作为Spark的Python接口,使得数据处理和分析更加直观和便捷。本文详细讲解了PySpark中的常用RDD算子,包括map、flatMap、reduceByKey、filter、distinct和sortBy。
在 PySpark 中,所有的数据计算都是基于 RDD(弹性分布式数据集)对象进行的。RDD 提供了丰富的成员方法(算子)来执行各种数据处理操作。
一、map算子
定义:
map
算子会对RDD中的每个元素应用一个用户定义的函数,并返回一个新的 RDD。
语法:
new_rdd = rdd.map(func)
参数func
为一个函数,该函数接受单个输入参数,并返回一个输出值,其函数表示法为f:(T) → U
- f:表示这是一个函数(方法)
- T:表示传入参数的类型,可以是任意类型
- U:表示返回值的类型,可以是任意类型
- (T)-U:表示该方法接受一个参数(类型为 T),返回值的类型为 U
import os
from pyspark import SparkConf, SparkContext
# os.environ['PYSPARK_PYTHON'] =“自己电脑Python.exe的安装路径”,用于指定Python解释器
os.environ['PYSPARK_PYTHON'] = "D:StudyPaythonpython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
def func(data):
return data * 10
print(rdd2.collect())
输出结果: 10,20,30,40,50
【分析】
rdd.map(func) 创建一个新的RDD对象rdd2,其中每个元素都会通过map
算子应用函数 func
。因此,原始 RDD 中的每个元素(1, 2, 3, 4, 5)都会依次被传入 func
函数并处理:
func(1) 产生 10
func(2) 产生 20
func(3) 产生 30
func(4) 产生 40
func(5) 产生 50
结果是新的RD 对象rdd2 ,包含的元素为 10, 20, 30, 40, 50。
【拓展】
链式调用:在编程中将多个方法或函数的调用串联在一起的方式。
在 PySpark 中,链式调用非常常见,通常用于对 RDD 进行一系列变换或操作。通过链式调用,开发者可以在一条语句中连续执行多个操作,不需要将每个操作的结果存储在一个中间变量中,从而提高代码的简洁性和可读性。
例如:
代码语言:python代码运行次数:0复制from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10,然后都加上5
# 链式调用
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x 5)
print(rdd2.collect())
输出结果: 15, 25, 35, 45, 55
【分析】
第一个map
算子接收一个 lambda 函数,这个函数将传入的每个元素乘以 10;第二个map
算子在第一个map
的结果上再次调用新的 lambda 函数,每个元素再加上 5。处理后的结果为:10 5, 20 5, 30 5, 40 5, 50 5,即 15, 25, 35, 45, 55。
二、flatMap算子
定义:
flatMap
算子将输入RDD中的每个元素映射到一个序列,然后将所有序列扁平化为一个单独的RDD。简单来说,就是对rdd执行map操作,然后进行解除嵌套操作。
语法:
new_rdd = rdd.flatMap(func)
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hi python","Hello world","Happy day"])
# 需求将RDD数据里面的单词一个个提取出来
rdd2=rdd.map(lambda x:x.split(" "))
print(rdd2.collect())
sc.stop()
输出结果:
["hi python","Hello world","Happy day"]
【分析】
map
算子执行过程如下:
对于第一个元素 "hi python",通过 split(" ")得到的结果是 "hi", "python";
对于第二个元素 "Hello world",通过 split(" ")得到的结果是 "Hello", "world";
对于第三个元素 "Happy day",通过 split(" ")得到的结果是 "Happy", "day";
显而易见,输出的结果不满足我们的需求,我们运用flatMap
算子,将rdd2=rdd.map(lambda x:x.split(" "))
改为如下代码后
rdd2=rdd.flatmap(lambda x:x.split(" "))
输出结果:
'hi', 'python', 'Hello', 'world', 'Happy', 'day'
flatMap
算子会将结果扁平化为单一列表,适合于需要展开嵌套结构的场景。
三、reduceByKey算子
定义:
reduceByKey
算子用于将具有相同键的值进行合并,并通过指定的聚合函数生成一个新的键值对 RDD。
语法:
new_rdd = rdd.reduceByKey(func)
参数func
是一个用于合并两个相同键的值的函数,其接收两个相同类型的参数并返回一个相同类型的值,其函数表示法为f:(V,V)→>V
- f: 函数的名称或标识符
- (V, V):表示函数接收两个相同类型的参数
- → V:表示函数的返回值类型
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# reduceByKey算子
rdd=sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
# 求男生和女生两个组的成绩之和
rdd2=rdd.reduceByKey(lambda a,b:a b)
print(rdd2.collect())
sc.stop()
输出结果:
('男',187), ('女',165)
【分析】
reduceByKey
算子根据每个不同的键调用匿名函数 lambda a, b: a b,将其接受两个参数相加。
对于键 '男':
首先处理到的值是 99,然后是 88;
使用 lambda a, b: a b,即 99 88 = 187。
对于键 '女':
首先处理到的值是 99,然后是 66;
使用 lambda a, b: a b,即 99 66 = 165。
四、filter算子
定义:
filter
算子根据给定的布尔函数过滤RDD中的元素,返回一个只包含满足条件的元素的新RDD。
语法:
new_rdd = rdd.filter(func)
参数func
是一个函数,用于接收 RDD 中的每个元素,并返回一个布尔值(True 或 False)。
- 如果返回 True,则该元素会被保留在新 RDD 中
- 如果返回 False,则该元素会被过滤掉
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# filter算子
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 过滤RDD数据中的奇数,仅保留偶数
rdd2=rdd.filter(lambda num:num%2==0)
print(rdd2.collect())
sc.stop()
输出结果:
2, 4
五、distinct算子
定义:
distinct
算子对RDD数据进行去重,返回一个新的RDD。
语法:
new_rdd = rdd.distinct()
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# distinct算子
rdd = sc.parallelize([1, 2, 2, 5, 5, 6])
# 对RDD数据进行去重
rdd2=rdd.distinct()
print(rdd2.collect())
sc.stop()
输出结果:
1, 2, 5, 6
六、sortBy算子
定义:
sortBy
算子根据指定的键对元素进行排序。
语法:
new_rdd = rdd.sortBy(func, ascending=True, numPartitions=None)
- 参数:func:用于指定排序依据的函数
- 参数ascending:指定排序的顺序,True 表示升序排序(默认值);False 表示降序排序
- 参数numPartitions:可选参数,指定分区数
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 创建了一个包含四个元组的 RDD
rdd=sc.parallelize([('小明',99),('小红',88),('小城',99),('小李',66)])
# 使用 sortBy 方法将 RDD 按照分数(元组中的第二个元素)进行降序排序
rdd2=rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(rdd2.collect())
sc.stop()
输出结果:
('小明', 99), ('小城', 99), ('小红', 88), ('小李', 66)
【注意】
如果多个元素具有相同的键(如这里的 99),sortBy
算子会保持这些元素在原始 RDD 中的相对顺序(稳定排序)。