PySpark基础

2024-08-13 00:25:40 浏览数 (3)

前言

PySpark,作为 Apache Spark 的 Python API,使得处理和分析大数据变得更加高效且易于访问。本章详细讲解了PySpark 的基本概念和架构以及据的输入与输出操作。

一、PySpark入门

①定义

Apache Spark 是一个用于大规模数据处理的统一分析引擎。简单来说,Spark 是一款分布式计算框架,能够调度成百上千的服务器集群,以处理 TB、PB 乃至 EB 级别的海量数据。

作为全球顶级的分布式计算框架,Spark 支持多种编程语言进行开发,其中 Python 语言是 Spark 特别支持的重点方向。

Spark 对 Python 的支持主要体现在第三方库 PySpark 上。PySpark 是由Spark 官方开发的一款 Python 库,允许开发者使用 Python 代码完成 Spark 任务。

PySpark 不仅可以作为独立的 Python 库使用,还能将程序提交到 Spark 集群进行大规模的数据处理。

Python 的应用场景和就业方向相当广泛,其中大数据开发和人工智能是最为突出的方向。

②安装PySpark库

电脑输入Win R打开运行窗口→在运行窗口输入“cmd”→点击“确定”→输入pip install pyspark

③编程模型

PySpark 的编程流程主要分为以下三个步骤:

准备数据到RDD → RDD迭代计算 → RDD导出为列表、元组、字典、文本文件或数据库等。

  • 数据输入:通过 SparkContext 对象读取数据
  • 数据计算:将读取的数据转换为 RDD 对象,并调用 RDD 的成员方法进行迭代计算
  • 数据输出:通过 RDD 对象的相关方法将结果输出到列表、元组、字典、文本文件或数据库等

④构建PySpark执行环境入口对象

SparkContext是PySpark的入口点,负责与 Spark 集群的连接,并提供了创建 RDD(弹性分布式数据集)的接口。

要使用 PySpark 库完成数据处理,首先需要构建一个执行环境的入口对象,该对象是 SparkContext 类的实例。创建 SparkContext 对象后,便可开始进行数据处理和分析。

代码语言:python代码运行次数:0复制
# 导包
# SparkConf:用于配置Spark应用的参数
# SparkContext:用于连接到Spark集群的入口点,负责协调整个Spark应用的运行
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象,用于设置 Spark 程序的配置
# local[*]表示在本地运行Spark
# [*]表示使用系统中的所有可用核心。这适合于开发和测试。
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

SparkConf 类的常用方法:

方法

描述

setMaster(master)

设置 Spark 的运行模式

setAppName(name)

设置 Spark 应用程序的名称,在 Spark UI 中显示

set(key, value)

设置任意的配置参数,通过键-值对的方式设置配置项

setAll(pairs)

批量设置多个配置项,接收包含键-值对的列表或元组

setExecutorEnv(key, value)

设置 executor 的环境变量

get(key, defaultValue=None)

获取指定键的配置值,若不存在,则返回默认值

contains(key)

检查配置中是否包含某个键

clear()

清空所有设置的配置项

getAll()

获取所有的配置项,以键-值对的形式返回

set("spark.some.config.option", "value")

可设置任何有效的 Spark 配置选项

二、数据输入

①RDD对象

如下图所示,PySpark 支持多种格式的数据输入,并在输入完成后生成一个 RDD 对象。

RDD 的全称是弹性分布式数据集(Resilient Distributed Datasets),它是 PySpark 中数据计算的载体,具备以下功能:

  • 提供数据存储
  • 提供数据计算的各类方法

RDD 具有迭代计算特性,RDD的数据计算方法,返回值依旧是RDD对象。

②Python数据容器转RDD对象

在 PySpark 中,可以通过 SparkContext 对象的 parallelize 方法将 list、tuple、set、dict 和 str 转换为 RDD 对象。

parallelize() :用于将本地集合(即 Python 的原生数据结构)转换为 RDD 对象。

方法签名:

SparkContext.parallelize(collection, numSlices=None)

  • 参数collection: 可以是任何可迭代的数据结构(例如list、tuple、set、dict 或 str 的列表)
  • 参数numSlices: 可选参数,用于指定将数据划分为多少个分片
代码语言:python代码运行次数:0复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize("abcdefg")
rdd4=sc.parallelize({1,2,3,4,5})
rdd5=sc.parallelize({"key1":"value1","key2":"value=2"})

# 使用collect()方法查看RDD里面有什么内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出结果: 1, 2, 3, 4, 5 1, 2, 3, 4, 5 'a', 'b', 'c', 'd', 'e', 'f', 'g' 1, 2, 3, 4, 5 'key1', 'key2'

【注意】

  • 对于字符串,parallelize 方法会将其拆分为单个字符并存入 RDD。
  • 对于字典,只有键会被存入 RDD 对象,值会被忽略。

③读取文件转RDD对象

在 PySpark 中,可通过 SparkContext 的 textFile 成员方法读取文本文件并生成RDD对象。

textFile():用于读取文本文件并将其内容作为 RDD(弹性分布式数据集)加载。

方法签名:textFile(path, minPartitions=None)

  • 参数path:要读取的文件的路径
  • 参数minPartitions:可选参数,用于指定数据划分的最小分片数

例如:电脑D盘中有一个test.txt文本文件,内容如下:

代码语言:python代码运行次数:0复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 使用textFile方法,读取文件数据加载到Spark内,使其成为RDD对象
rdd=sc.textFile("D:/test.txt")
print(rdd.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出结果: 'Hello python!', '你好 Python!!!', '123456'

三、数据输出

①collect算子

功能:

将分布在集群上的所有 RDD 元素收集到驱动程序(Driver)节点,从而形成一个普通的 Python 列表

用法:

rdd.collect()

代码语言:python代码运行次数:0复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,6])
# collect 算子,输出RDD为List对象
# print(rdd)  输出的是类名,输出结果:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
rdd_list=rdd.collect()
print(rdd_list)
print(type(rdd_list))
sc.stop()

输出结果:

1, 2, 3, 4, 5, 6

<class 'list'>

②reduce算子

功能:

将 RDD 中的元素两两应用指定的聚合函数,最终合并为一个值,适用于需要归约操作的场景。

用法:

rdd.reduce(lambda a, b: a b)

代码语言:python代码运行次数:0复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])

# reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda a,b:a b)
print(num)
sc.stop()

输出结果:

15

【分析】

③take算子

功能:

从 RDD 中获取指定数量的元素,以列表形式返回,同时不会将所有数据传回驱动。如果指定的元素数量超出 RDD 元素数量,则返回所有元素。

用法:

rdd.take(n)

代码语言:python代码运行次数:0复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# take算子,取出RDD前N个元素并组成list返回
take_list=rdd.take(3)
print(take_list)
sc.stop()

输出结果:

1, 2, 3

④count算子

功能:

返回 RDD 中元素的总个数。

用法:

rdd.count()

代码语言:python代码运行次数:0复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# count算子,统计rdd内有多少条数据,返回值为数字
num_count=rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出结果:

rdd内有5个元素

⑤saveAsTextFile算子

功能:

将 RDD 中的数据写入文本文件中。

用法:

rdd.saveAsTextFile(path)

调用保存文件的算子,需配置Hadoop依赖,配置方法如下:

  1. 下载Hadoop安装包: 下载网址:http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
  2. 将Hadoop安装包解压到电脑任意位置
  3. 在Python代码中配置os模块: os.environ‘HADOOP_HOME’ = ‘HADOOP解压文件夹路径’
  4. 下载winutils.exe: 下载网址:https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
  5. 将winutils.exe放入Hadoop解压文件夹的bin目录内
  6. 下载hadoop.dll: 下载网址:https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
  7. 将hadoop.dll放入:C:/Windows/System32 文件夹内
代码语言:python代码运行次数:0复制
from pyspark import SparkConf, SparkContext
# os用于操作系统级功能,这里用来设置环境变量
import os
# 指定 PySpark 使用的 Python 解释器路径
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
# 指定 Hadoop 的安装目录
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备RDD1,传入numSlices参数为1,数据集划分为一个切片
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)

# 准备RDD2,传入numSlices参数为1,数据集划分为一个切片
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)

# 准备RDD3,传入numSlices参数为1,数据集划分为一个切片
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)

# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

打开output2文本文件,输出结果如下:

0 人点赞