在之前文章中我们介绍了大数据的基础概念,和pyspark的安装。
本文我们主要介绍pyspark的核心概念和原理,后续有时间会持续介绍pyspark的使用。
mapreduce和rdd
先回顾之前介绍的mapreduce和RDD的区别。
MapReduce的思想就是“分而治之”。Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理;Reducer负责对map阶段的结果进行汇总。但是mapreduce有个缺点就是每次计算都要从硬盘读写数据。
spark就是为了解决MapReduce计算框架慢而产生的大数据计算引擎。它使用的RDD设计就尽可能去避免硬盘读写,而是将数据优先存储在内存,为了优化RDD尽量在内存中的计算流程,还引入了lazy特性。
RDD(Resilient Distributed Dataset),弹性分布数据集。它提供了丰富的操作算子,不是只有map和reduce两个操作;支持懒操作,在RDDs之间构建一个DAG,中间结果不用执行,而且支持缓存,可以在内存中快速完成计算。
RDD有以下属性:
(1)partition分片:数据集的基本组成单位。计算的时候会通过compute函数得到每个分片的数据,每个分片被一个计算任务处理,分片决定了计算任务的粒度
(2)只读:RDD是只读的,想要改变RDD的数据,只能基于现有的RDD通过操作算子转换到一个新的RDD。
RDD有两类算子:transformation和action。transformation只建立逻辑转换流程,spark内部调用RDD的计算流程,构建一个有向无环图(DAG);action才真正的落地执行。
(3)依赖:上面提到RDD通过操作算字进行转换,所以RDDs之间是有依赖关系的
窄依赖:子RDD和父RDD中的各个partition是一一对应的关系,只单个依赖,不需要等待其他partition。比如:map,filter,union等操作产生窄依赖。
宽依赖:子RDD和父RDD中的partition存在一对多的关系,子RDD中的某个partition还要等待其他或者父RDD的partition。比如groupby,sortby产生宽依赖。
(4)缓存:如果一个RDD被多次使用,不需要每次都去转换,我们可以将RDD缓存,这样在计算时我们只需要计算一次,下次使用从缓存取就好。
再顺便说几个概念,Job,shuffle,stage和task。
Job:一个action触发一个Job
Shuffle:如果transformation或者action让RDD产生了宽依赖,也就是partition不能并行了,所有分片要打散重组(比如groupby,join操作),这就是产生了shuffle。
Stage:一个任务是RDD构成的DAG,如果有shuffle过程,那这个shuffle就将任务流分成不同阶段,也就是Stage。由于shuffle操作,让不同的Stage不能并行,后面的stage必须等前面的stage完成才能开始。
Task:具体任务,一个Job根据RDD的partition数量,创建多个task并发执行,每个task的逻辑是完全相同的,只是分片内数据不同。
总的来说,任务根据action分为多个Job,一个Job 根据宽依赖(Shuffle)分为多个stage;一个stage根据分片数分多个task。
hadoop和spark
Hadoop是对NSDF和MapReduce进行升级改造出的大数据框架系统 。
Hadoop架构中最重要的几个模块:HBase(实时分布式数据库),MapReduce(分布式计算框架),HDFS(分布式文件系统)。
spark 是对hadoop计算慢的改进,spark架构中最重要的几个模块:Spark SQL、Spark Streaming、GraphX、MLlib,这些模块都是建立在RDD上的。
Hadoop和mapreduce的关系,就类似spark和rdd的关系。
spark工作原理
Spark主要是用Scala语言开发,部分使用Java语言开发,运行在JVM中。同时在外层封装,实现对python,R等语言的开发接口。
Spark 集群由集群管理器 Cluster Manager、工作节点 Worker、执行器 Executor、驱动器 Driver、应用程序 Application 等部分组成。具体关系如下图:
Cluter Manager
spark 的集群管理器,主要负责整个集群资源的分配和管理。根据部署模式不同分为YARN,Mesos,Standalone。spark部署在yarn上有两种运行模式,client和cluster模式,区别就是Driver运行在client端还是ApplicationMaster端。
Worker
spark的工作节点,用于执行提交的作业。woker主要负责,向Cluter Manager
汇报自身的cpu和memory资源,创建分配资源给Executor,以及同步资源和Executor状态给Cluter Manager。
Executor
Application运行在Worker节点上的一个进程,负责计算task并将数据存储在内存或磁盘。
Driver
Application的驱动程序,程序运行中的main函数,创建SparkContext,划分RDD以及形成任务的DAG。Application通过Driver与Cluter Manager和Executor通信。
Application
用户使用spark实现的程序,包括driver的代码和分布在集群中运行在多节点的Executer代码。
整体流程大致:client端向yarn请求Application,yarn看资源够就会启动ApplicationMaster,然后ApplicationMater就去启动Driver,创建上下文,形成任务流,启动和分发任务给Executor并监控任务运行,Executor就去启动task执行具体的任务。
pyspark工作原理
上面也提到了spark在外层封装了python接口,主要是借助py4j实现python和java的交互。这样python使用者就不用多学一门java,轻松使用python进行大数据开发。
py4j
py4j是用python和java实现的库。通过PY4J,python可以动态访问Java虚拟机中的Java对象,Java程序也可以回调Python对象。
pyspark实现机制如下图:
在driver端,spark执行在JVM,python通过py4j调用Java的方法,SparkContext利用Py4J启动一个JVM并产生一个JavaSparkContext,将pyspark程序映射到JVM中;
在Executor端,spark也执行在JVA,task任务已经是序列后的字节码,不需要用py4j了,但是如果里面包含一些python库函数,JVM无法处理这些python函数,所以会需要为每个task启动一个python进程,通过socket通信将python函数在python进程中执行后返回结果。
以上就是pyspark的工作原理。pyspark对于python使用者比较好上手,但是它也有个致命缺点就是慢,毕竟他是做过一层包装的,对于离线任务可以选择pyspark,但是对于实时任务还是最好使用scala。
参考:
https://www.jianshu.com/p/bd53509dc237
https://blog.csdn.net/oTengYue/article/details/88417186