PySpark数据计算

2024-08-14 00:50:28 浏览数 (3)

前言

在大数据处理的时代,Apache Spark以其高效的数据处理能力和灵活的编程模型,成为了数据科学家和工程师的热门选择。PySpark作为Spark的Python接口,使得数据处理和分析更加直观和便捷。本文详细讲解了PySpark中的常用RDD算子,包括map、flatMap、reduceByKey、filter、distinct和sortBy。

在 PySpark 中,所有的数据计算都是基于 RDD(弹性分布式数据集)对象进行的。RDD 提供了丰富的成员方法(算子)来执行各种数据处理操作。

一、map算子

定义:

map算子会对RDD中的每个元素应用一个用户定义的函数,并返回一个新的 RDD。

语法:

new_rdd = rdd.map(func)

参数func为一个函数,该函数接受单个输入参数,并返回一个输出值,其函数表示法为f:(T) → U

  • f:表示这是一个函数(方法)
  • T:表示传入参数的类型,可以是任意类型
  • U:表示返回值的类型,可以是任意类型
  • (T)-U:表示该方法接受一个参数(类型为 T),返回值的类型为 U
代码语言:python代码运行次数:0复制
import os
from pyspark import SparkConf, SparkContext
# os.environ['PYSPARK_PYTHON'] =“自己电脑Python.exe的安装路径”,用于指定Python解释器
os.environ['PYSPARK_PYTHON'] = "D:StudyPaythonpython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
def func(data):
    return data * 10
    
print(rdd2.collect())

输出结果: 10,20,30,40,50

【分析】

rdd.map(func) 创建一个新的RDD对象rdd2,其中每个元素都会通过map算子应用函数 func。因此,原始 RDD 中的每个元素(1, 2, 3, 4, 5)都会依次被传入 func 函数并处理:

func(1) 产生 10

func(2) 产生 20

func(3) 产生 30

func(4) 产生 40

func(5) 产生 50

结果是新的RD 对象rdd2 ,包含的元素为 10, 20, 30, 40, 50。

【拓展】

链式调用:在编程中将多个方法或函数的调用串联在一起的方式。

在 PySpark 中,链式调用非常常见,通常用于对 RDD 进行一系列变换或操作。通过链式调用,开发者可以在一条语句中连续执行多个操作,不需要将每个操作的结果存储在一个中间变量中,从而提高代码的简洁性和可读性。

例如:

代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10,然后都加上5
# 链式调用
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x   5)
print(rdd2.collect())

输出结果: 15, 25, 35, 45, 55

【分析】

第一个map算子接收一个 lambda 函数,这个函数将传入的每个元素乘以 10;第二个map算子在第一个map的结果上再次调用新的 lambda 函数,每个元素再加上 5。处理后的结果为:10 5, 20 5, 30 5, 40 5, 50 5,即 15, 25, 35, 45, 55。

二、flatMap算子

定义:

flatMap算子将输入RDD中的每个元素映射到一个序列,然后将所有序列扁平化为一个单独的RDD。简单来说,就是对rdd执行map操作,然后进行解除嵌套操作。

语法:

new_rdd = rdd.flatMap(func)

代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hi python","Hello world","Happy day"])
# 需求将RDD数据里面的单词一个个提取出来
rdd2=rdd.map(lambda  x:x.split(" "))
print(rdd2.collect())
sc.stop()

输出结果:

["hi python","Hello world","Happy day"]

【分析】

map算子执行过程如下:

对于第一个元素 "hi python",通过 split(" ")得到的结果是 "hi", "python";

对于第二个元素 "Hello world",通过 split(" ")得到的结果是 "Hello", "world";

对于第三个元素 "Happy day",通过 split(" ")得到的结果是 "Happy", "day";

显而易见,输出的结果不满足我们的需求,我们运用flatMap算子,将rdd2=rdd.map(lambda x:x.split(" "))改为如下代码后

代码语言:python代码运行次数:0复制
rdd2=rdd.flatmap(lambda  x:x.split(" "))

输出结果:

'hi', 'python', 'Hello', 'world', 'Happy', 'day'

flatMap算子会将结果扁平化为单一列表,适合于需要展开嵌套结构的场景。

三、reduceByKey算子

定义:

reduceByKey算子用于将具有相同键的值进行合并,并通过指定的聚合函数生成一个新的键值对 RDD。

语法:

new_rdd = rdd.reduceByKey(func)

参数func是一个用于合并两个相同键的值的函数,其接收两个相同类型的参数并返回一个相同类型的值,其函数表示法为f:(V,V)→>V

  • f: 函数的名称或标识符
  • (V, V):表示函数接收两个相同类型的参数
  • → V:表示函数的返回值类型
代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# reduceByKey算子
rdd=sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
# 求男生和女生两个组的成绩之和
rdd2=rdd.reduceByKey(lambda a,b:a b)
print(rdd2.collect())
sc.stop()

输出结果:

('男',187), ('女',165)

【分析】

reduceByKey算子根据每个不同的键调用匿名函数 lambda a, b: a b,将其接受两个参数相加。

对于键 '男':

首先处理到的值是 99,然后是 88;

使用 lambda a, b: a b,即 99 88 = 187。

对于键 '女':

首先处理到的值是 99,然后是 66;

使用 lambda a, b: a b,即 99 66 = 165。

四、filter算子

定义:

filter算子根据给定的布尔函数过滤RDD中的元素,返回一个只包含满足条件的元素的新RDD。

语法:

new_rdd = rdd.filter(func)

参数func是一个函数,用于接收 RDD 中的每个元素,并返回一个布尔值(True 或 False)。

  • 如果返回 True,则该元素会被保留在新 RDD 中
  • 如果返回 False,则该元素会被过滤掉
代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# filter算子
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 过滤RDD数据中的奇数,仅保留偶数
rdd2=rdd.filter(lambda num:num%2==0)
print(rdd2.collect())
sc.stop()

输出结果:

2, 4

五、distinct算子

定义:

distinct算子对RDD数据进行去重,返回一个新的RDD。

语法:

new_rdd = rdd.distinct()

代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# distinct算子
rdd = sc.parallelize([1, 2, 2, 5, 5, 6])
# 对RDD数据进行去重
rdd2=rdd.distinct()
print(rdd2.collect())
sc.stop()

输出结果:

1, 2, 5, 6

六、sortBy算子

定义:

sortBy算子根据指定的键对元素进行排序。

语法:

new_rdd = rdd.sortBy(func, ascending=True, numPartitions=None)

  • 参数:func:用于指定排序依据的函数
  • 参数ascending:指定排序的顺序,True 表示升序排序(默认值);False 表示降序排序
  • 参数numPartitions:可选参数,指定分区数
代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:桌面StudyPaythonlearningpythonProject.venvScriptspython.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 创建了一个包含四个元组的 RDD
rdd=sc.parallelize([('小明',99),('小红',88),('小城',99),('小李',66)])
# 使用 sortBy 方法将 RDD 按照分数(元组中的第二个元素)进行降序排序
rdd2=rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(rdd2.collect())
sc.stop()

输出结果:

('小明', 99), ('小城', 99), ('小红', 88), ('小李', 66)

【注意】

如果多个元素具有相同的键(如这里的 99),sortBy算子会保持这些元素在原始 RDD 中的相对顺序(稳定排序)。

0 人点赞