pyspark(一)--核心概念和工作原理

2023-05-02 22:20:35 浏览数 (2)

在之前文章中我们介绍了大数据的基础概念,和pyspark的安装。

本文我们主要介绍pyspark的核心概念和原理,后续有时间会持续介绍pyspark的使用。

mapreduce和rdd

先回顾之前介绍的mapreduce和RDD的区别。

MapReduce的思想就是“分而治之”。Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理;Reducer负责对map阶段的结果进行汇总。但是mapreduce有个缺点就是每次计算都要从硬盘读写数据。

spark就是为了解决MapReduce计算框架慢而产生的大数据计算引擎。它使用的RDD设计就尽可能去避免硬盘读写,而是将数据优先存储在内存,为了优化RDD尽量在内存中的计算流程,还引入了lazy特性。

RDD(Resilient Distributed Dataset),弹性分布数据集。它提供了丰富的操作算子,不是只有map和reduce两个操作;支持懒操作,在RDDs之间构建一个DAG,中间结果不用执行,而且支持缓存,可以在内存中快速完成计算。

RDD有以下属性:

(1)partition分片:数据集的基本组成单位。计算的时候会通过compute函数得到每个分片的数据,每个分片被一个计算任务处理,分片决定了计算任务的粒度

(2)只读:RDD是只读的,想要改变RDD的数据,只能基于现有的RDD通过操作算子转换到一个新的RDD。

RDD有两类算子:transformation和action。transformation只建立逻辑转换流程,spark内部调用RDD的计算流程,构建一个有向无环图(DAG);action才真正的落地执行。

(3)依赖:上面提到RDD通过操作算字进行转换,所以RDDs之间是有依赖关系的

窄依赖:子RDD和父RDD中的各个partition是一一对应的关系,只单个依赖,不需要等待其他partition。比如:map,filter,union等操作产生窄依赖。

宽依赖:子RDD和父RDD中的partition存在一对多的关系,子RDD中的某个partition还要等待其他或者父RDD的partition。比如groupby,sortby产生宽依赖。

(4)缓存:如果一个RDD被多次使用,不需要每次都去转换,我们可以将RDD缓存,这样在计算时我们只需要计算一次,下次使用从缓存取就好。

再顺便说几个概念,Job,shuffle,stage和task。

Job:一个action触发一个Job

Shuffle:如果transformation或者action让RDD产生了宽依赖,也就是partition不能并行了,所有分片要打散重组(比如groupby,join操作),这就是产生了shuffle。

Stage:一个任务是RDD构成的DAG,如果有shuffle过程,那这个shuffle就将任务流分成不同阶段,也就是Stage。由于shuffle操作,让不同的Stage不能并行,后面的stage必须等前面的stage完成才能开始。

Task:具体任务,一个Job根据RDD的partition数量,创建多个task并发执行,每个task的逻辑是完全相同的,只是分片内数据不同。

总的来说,任务根据action分为多个Job,一个Job 根据宽依赖(Shuffle)分为多个stage;一个stage根据分片数分多个task。

hadoop和spark

Hadoop是对NSDF和MapReduce进行升级改造出的大数据框架系统 。

Hadoop架构中最重要的几个模块:HBase(实时分布式数据库),MapReduce(分布式计算框架),HDFS(分布式文件系统)。

spark 是对hadoop计算慢的改进,spark架构中最重要的几个模块:Spark SQL、Spark Streaming、GraphX、MLlib,这些模块都是建立在RDD上的。

Hadoop和mapreduce的关系,就类似spark和rdd的关系。

spark工作原理

Spark主要是用Scala语言开发,部分使用Java语言开发,运行在JVM中。同时在外层封装,实现对python,R等语言的开发接口。

Spark 集群由集群管理器 Cluster Manager、工作节点 Worker、执行器 Executor、驱动器 Driver、应用程序 Application 等部分组成。具体关系如下图:

Cluter Manager

spark 的集群管理器,主要负责整个集群资源的分配和管理。根据部署模式不同分为YARN,Mesos,Standalone。spark部署在yarn上有两种运行模式,client和cluster模式,区别就是Driver运行在client端还是ApplicationMaster端。

Worker

spark的工作节点,用于执行提交的作业。woker主要负责,向Cluter Manager

汇报自身的cpu和memory资源,创建分配资源给Executor,以及同步资源和Executor状态给Cluter Manager。

Executor

Application运行在Worker节点上的一个进程,负责计算task并将数据存储在内存或磁盘。

Driver

Application的驱动程序,程序运行中的main函数,创建SparkContext,划分RDD以及形成任务的DAG。Application通过Driver与Cluter Manager和Executor通信。

Application

用户使用spark实现的程序,包括driver的代码和分布在集群中运行在多节点的Executer代码。

整体流程大致:client端向yarn请求Application,yarn看资源够就会启动ApplicationMaster,然后ApplicationMater就去启动Driver,创建上下文,形成任务流,启动和分发任务给Executor并监控任务运行,Executor就去启动task执行具体的任务。

pyspark工作原理

上面也提到了spark在外层封装了python接口,主要是借助py4j实现python和java的交互。这样python使用者就不用多学一门java,轻松使用python进行大数据开发。

py4j

py4j是用python和java实现的库。通过PY4J,python可以动态访问Java虚拟机中的Java对象,Java程序也可以回调Python对象。

pyspark实现机制如下图:

在driver端,spark执行在JVM,python通过py4j调用Java的方法,SparkContext利用Py4J启动一个JVM并产生一个JavaSparkContext,将pyspark程序映射到JVM中;

在Executor端,spark也执行在JVA,task任务已经是序列后的字节码,不需要用py4j了,但是如果里面包含一些python库函数,JVM无法处理这些python函数,所以会需要为每个task启动一个python进程,通过socket通信将python函数在python进程中执行后返回结果。

以上就是pyspark的工作原理。pyspark对于python使用者比较好上手,但是它也有个致命缺点就是慢,毕竟他是做过一层包装的,对于离线任务可以选择pyspark,但是对于实时任务还是最好使用scala。

参考:

https://www.jianshu.com/p/bd53509dc237

https://blog.csdn.net/oTengYue/article/details/88417186

0 人点赞