生态
Spark:通用大数据快速处理引擎。可以基于Hadoop上存储的大数据(HDFS、Hive、HBase等任何实现了Hadoop接口的存储系统)进行计算。
Spark按照功能侧重点划分为几个模块:Spark Core、Spark SQL、Spark Streaming、Spark MLlib、GraphX。这几个模块都是基于Spark Core的关键抽象RDD做了扩展。
Spark官方生态图:
对早前生态组件的翻译版:
个人总结的版本:
简单总结:
1. Spark Core:Spark的核心模块,主要就是对计算引擎本身的抽象和实现
2. Spark Streaming:以流就是无限个小批次,实现这样来定义的流式计算。(Flink正好相反,本质上就是对一个事物从两个对立面的不同解释,并按照自己的解释实现了各自的流批统一的计算引擎)
3. Spark MLlib:利用Spark自身作为计算引擎,提供的机器学习库
4. Spark SQL:提供SQL调用来简化Spark计算引擎的学习成本,方面做数据不同维度的分析和挖掘等
核心抽象
1. Application:Spark应用程序
指的是用户编写的Spark应用程序。包含,
(1) Driver功能代码
(2) 分布在集群中多个节点上运行的Executor代码。
由一个或多个作业Job组成。
Application由Cluster Manager(例如Hadoop Yarn的Resource Manager)进行调度。
2. Driver:驱动程序
Driver,即运行上述Application的Main()函数,并且创建了SparkContext(SparkSession)。
SparkContext负责准备Spark运行环境,和ClusterManager通信,申请资源、分配任务和监控任务等;
Main()负责说明用户定义的有向无环图的逻辑;并且在Executor运行完毕后,将SparkContext关闭。
3. Cluster Manager:资源管理器
指在集群上获取资源的外部服务,常用的有:
(1) Standalone:Spark原生的资源管理器,由Master负责资源管理;
(2) Mesos:由Mesos Master负责资源管理;
(3) HaddopYarn:由ResourceManager负责资源管理。
4. Worker:计算节点
集群中任何可以运行Application代码的节点,类似于Yarn中的NodeManager节点。
(1) 在Standalone模式中指的就是通过Slave文件配置的Worker节点
(2) 在Spark on Yarn模式中指的就是NodeManager节点
(3) 在Spark on Mesos模式中指的就是Mesos Slave节点
5. Executor:执行器
是Application运行在Worker节点上的一个进程,负责运行Task,负责将数据存在内存或者磁盘上。每个Application都有各自独立的一批Executor,如下图所示:
至此我们分析了计算引擎实现方面的5个核心抽象,分别是:
(1) Application:Spark应用程序
(2) Driver:驱动程序
(3) Cluster Manager:资源管理器
(4) Worker:计算节点
(5) Executor:执行器
接下来分析在计算逻辑方面的核心抽象:
1. RDD:弹性分布式数据集
Resillient Distributed Dataset,Spark的基本计算单元,可以通过一系列算子进行操作(主要有Transformation和Action操作),如下图所示:
2. Narrow Dependency:窄依赖
父RDD每一个分区最多被一个子RDD的分区所用:表现为一个父RDD的分区对应于一个子RDD的分区,或两个父RDD的分区对应于一个子RDD的分区。
即一对一或多对一。如图所示:
3. Shuffle Dependency:宽依赖
父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区。
即多对多。如图所示:
(1) 常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned:如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle),而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖)。
(2) 常见的宽依赖有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned:除此之外的,rdd 的join api是宽依赖)。
窄依赖会发生一种现象:Shuffle,所以就叫做Shuffle Dependency,由此我们可以得出Shuffle概念的内涵:不同分区中的数据发生混洗,一些不同分区中的数据互相会见面。
4. DAG:有向无环图
Directed Acycle Graph,反应RDD之间的依赖关系,如图所示:
至此我们分析了计算逻辑实现方面的5个核心抽象,分别是:
(1) RDD:弹性分布式数据集
(2) Narrow Dependency:窄依赖
(3) Shuffle Dependency:宽依赖
(4) DAG:有向无环图
接下来分析在计算调度方面的核心抽象:
1. DAGScheduler:有向无环图调度器
基于DAG划分Stage并以TaskSet的形势提交Stage给TaskScheduler;负责将作业拆分成不同阶段的具有依赖关系的多批任务;最重要的任务之一就是:计算作业和任务的依赖关系,制定调度逻辑。在SparkContext初始化的过程中被实例化,一个SparkContext对应创建一个DAGScheduler。
DAGScheduler调度Stage。
2. TaskScheduler:任务调度器
将TaskSet提交给worker运行并回报结果;负责每个具体任务的实际物理调度。如图所示:
(1) TaskScheduler调度TaskSet。
(2) TaskSetManager调度Task。
3. Job:作业
触发一个finalRDD的实际计算为一个Job。由一个或多个调度阶段所组成的一次计算作业;包含多个Task组成的并行计算,往往由Action Operation催生,一个Job包含多个RDD及作用于相应RDD上的各种Operation(算子/操作)。
4. Stage:调度阶段
Stage分成两种类型ShuffleMapStage、ResultStage。如图所示:
5. TaskSet:任务集
由一组关联的,但相互之间没有Shuffle依赖关系的任务所组成的任务集。如图所示:
提示:
(1) 一个Stage创建一个TaskSet;
(2) 为Stage的每个RDD分区创建一个Task,多个Task封装成TaskSet。
6. Task:任务
被送到某个Executor上的工作任务;单个分区数据集上的最小处理流程单元。
至此我们分析了计算调度实现方面的5个核心抽象,分别是:
(1) DAGScheduler:有向无环图调度器
(2) TaskScheduler:任务调度器
(3) Job:作业
(4) Stage:调度阶段
(5) TaskSet:任务集
(6) Task:任务
总体:
抽象关系
集群节点、RDD分区、CPU核、并行度之间数量关系?
例如,输入是HDFS上的若干File,每个File都包含了很多Block(块)。
当Spark读取输入File时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个InputSplit(输入分片),注意InputSplit不能跨越File。
随后,将为这些输入分片生成具体的Task。InputSplit与Task是一对一的关系。
然后,这些Task会被分别分配到集群中一些节点的某些Executor中去执行。
(1) 每个节点可以起若干Executor
(2) 每个Executor由若干Core组成,每个Core只能同时执行一个Task。(注意:这里的Core是Spark的逻辑概念,不是物理CPU,可理解为Executor的一个工作线程),即InputSplit(存储角度,还有Block、File):Task(任务角度,还有TaskSet、Satge、Job、DAG):Partition(RDD抽象角度):Core(执行角度,还有Executor、Worker)=1:1:1:(n/m)。
(3) 每个Task执行的结果对应目标RDD的一个Partiton。
Worker节点的并行度 = Executor数目 * 每个Executor的Core数
至于Partition的数目:
(1) 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
(2) 在Map阶段Partition数目保持不变。
(3) 在Reduce阶段,RDD的聚合会触发shuffle,聚合后的RDD的partition数目跟具体操作有关,例如,一些算子是可配置的,repartition操作会聚合成指定分区数。
抽象细节
RDD
1. 定义
官方定义:弹性分布式数据集(ResillientDistributed Dataset,RDD),不可变分布式对象集合。在任何时候都能重算,是描述为“弹性”的原因。
对RDD的操作不外乎:创建RDD;转换RDD;对RDD进行求值。
在Spark中,我们通过对RDD的操作表达我们的计算意图,这些计算会自动地在集群上并行执行。Spark最神奇的地方就在于自动将函数分发到各个执行器节点上。这样只需在单一驱动程序中编程,Spark让代码自动在多个节点上并发执行,即简化并行、移动计算。
(1) RDD是分布式的不可变的(只读的)、已分区的、对象(可序列化的)集合
(2) 通过并行的方式进行一系列:创建、转换、计算
(3) 可以控制存储级别(内存、磁盘等)来进行重用
(4) 失败自动重建
2. 分区
每个RDD都被分为多个分区。
3. 创建、转换、行动操作
注意:操作也被称为算子(operator)
(1) 创建:读取外部数据集(textFile);从驱动程序中对一个集合进行并行化(parallelize)。
(2) 转化(transformation):参数和返回都是RDD
(3) 行动(action):参数是RDD,将结果返回驱动程序或写入外部系统。
DAG
每个RDD维护了其指向一个或多个父节点的引用,以及表示其与父节点之间关系的信息。比如,当你在RDD上调用var b = a.map( )时,b这个RDD就存下了对其父节点a的一个引用。这些引用使得RDD可以追踪到其所有的祖先节点。
Spark调度器从最终被调用行动操作的RDD出发,向上回溯所有的必须计算的RDD。调度器会访问RDD的父节点、父节点的父节点、以此类推,递归向上生成计算所有必要的祖先RDD的物理计划。
RDD与Stage并不是一一对应的关系(Job 内部的I/O优化):
(1) 当RDD不需要混洗数据就可以从父节点计算出来时,调度器就会自动进行流水线执行。当调度器进行流水线执行(pipelining),或把多个RDD合并到一个步骤中时。
(2) 当一个RDD已经缓存在集群内存或磁盘上,Spark的内部调度器也会自动截断RDD谱系图。这种情况下,Spark会“短路”求值,直接基于缓存下来的RDD进行计算。
(3) 还有一种截断RDD谱系图的情况发生在当RDD已经在之前的混洗中作为副产品物化出来时,哪怕该RDD并没有被显示调用persist()方法。这种内部优化是基于Spark数据混洗操作的输出均被写入磁盘的特性。
架构
Spark三种提交模式:
(1) Spark Core架构其实就是standalone模式。它基于Spark自己的Master-Worker集群。
(2) 第二种是基于YARN的yarn-cluster模式。用于生产环境,本地看不到log,只能通过yarn logs -applicationId application_id这种命令来查看(先去yarn的仪表盘找到application_id)。
(3) 第三种是基于YARN的yarn-client模式。用于测试,Driver运行在本地客户端,负责调度Application,本地可以看到所有的log。
注意:切换到第二种和第三种模式,加上--master参数,设置为yarn-cluster或者yarn-client,再在spark-env.sh中添加export HADOOP_HOME={hadoop安装目录}。
Spark Cluster
Spark on Yarn
Spark on Yarn-Client mode
Spark on Yarn-Cluster mode
流程
基本流程
Spark一般流程:
(1) 从外部数据或驱动程序创建出输入RDD
(2) 使用转化操作对RDD进行转化,以定义新的RDD
(3) 对需要被重用的中间结果RDD执行内存或磁盘缓存操作
(4) 使用行动操作来触发一次并行计算,Spark会对计算进行优化后再执行(这也是懒加载的原因)
Spark on Yarn-Client mode
(1) Spark Yarn Client向Yarn的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TaskScheduler;
(2) ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的Application Master;
(3) Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,ApplicationMaster向ResourceManager注册,根据任务信息向ResourceManager申请资源;
(4) 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它在获得的Container中启动Executor,启动后会向Client中的SparkContext注册并申请Task;
(5) Client中的SparkContext分配Task给Executor执行,Executor运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
(6) 应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
Spark on Yarn-Cluster mode
(1) Spark Yarn Client向Yarn中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
(2) ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
(3) ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
(4) 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它在获得的Container中启动启动Executor,启动后会向ApplicationMaster中的SparkContext注册并申请Task;
(5) ApplicationMaster中的SparkContext分配Task给Executor执行,Executor运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
(6) 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。如下图所示。
DAG划分Stages
Application:Job = 1:N;Job:Stage = 1:N。Application中可以因为不同的Action触发众多的Job,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
划分依据:Stage划分的依据就是宽依赖,何时产生宽依赖。
核心算法:从后往前回溯,遇到窄依赖加入本Stage,遇见宽依赖进行Stage切分。Spark内核会从触发Action操作的那个RDD开始从后往前推,首先会为最后一个RDD创建一个Stage,然后继续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的Stage,那个RDD就是新的Stage的最后一个RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行Stage的划分,直到所有的RDD全部遍历完成为止。(本质类似于递归遍历树)
提交Stages
调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler通过TaskScheduler接口提交任务集,这个任务集最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个任务集的生命周期,对于DAGScheduler来说,提交调度阶段的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的任务到对应的Executor节点上进行运算。TaskSetManager负责管理TaskSchedulerImpl中一个单独TaskSet,跟踪每一个Task,如果Task失败,负责重试Task直到达到Task重试次数的最多次数。
监控Job、Task、Executor
DAGScheduler监控Job与Task:要保证相互依赖的作业调度阶段能够得到顺利的调度执行,DAGScheduler需要监控当前作业调度阶段乃至任务的完成情况。这通过对外暴露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数主要包括任务的开始结束失败、任务集的失败,DAGScheduler根据这些任务的生命周期信息进一步维护作业和调度阶段的状态信息。
DAGScheduler监控Executor的生命状态:TaskScheduler通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃了,则对应的调度阶段任务集的ShuffleMapTask的输出结果也将标志为不可用,这将导致对应任务集状态的变更,进而重新执行相关计算任务,以获取丢失的相关数据。
获取任务执行结果
DAGScheduler:一个具体的任务在Executor中执行完毕后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务结果的返回方式也不同。
两种结果,中间结果与最终结果:对于FinalStage所对应的任务,返回给DAGScheduler的是运算结果本身,而对于中间调度阶段对应的任务ShuffleMapTask,返回给DAGScheduler的是一个MapStatus里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个调度阶段的任务获取输入数据的依据。
两种类型,DirectTaskResult与IndirectTaskResult:根据任务结果大小的不同,ResultTask返回的结果又分为两类,如果结果足够小,则直接放在DirectTaskResult对象内中,如果超过特定尺寸则在Executor端会将DirectTaskResult先序列化,再把序列化的结果作为一个数据块存放在BlockManager中,然后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。
(1) DAGScheduler将Job分解成具有前后依赖关系的多个stage
(2) DAGScheduler是根据ShuffleDependency划分stage的
(3) stage分为ShuffleMapStage和ResultStage;一个Job中包含一个ResultStage及多个ShuffleMapStage
(4) 一个stage中的task完全相同,ShuffleMapStage包含的都是ShuffleMapTask;ResultStage包含的都是ResultTask
finalStage是Spark源码中的一个引用名称,类型为ResultStage
任务调度总体诠释
例1:
从HDFS中读入数据生成3个不同的RDD,通过一系列transformation操作后再将计算结果保存回HDFS。可以看到这个DAG中只有join操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage。同时我们可以注意到,在图中Stage2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,通过map操作生成的partition可以不用等待整个RDD计算结束,而是继续进行union操作,这样大大提高了计算的效率。
例2:
Spark的计算发生在RDD的Action操作,而对Action之前的所有Transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。
Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。
特点
三个优点
作为MapReduce的继承者,Spark主要有三个优点:
(1) Spark非常好用。由于高级API剥离了对集群本身的关注,你可以专注于你所要做的计算本身,只需要在自己的笔记本电脑上就可以开发Spark应用。支持交互式使用和复杂算法。
(2) Spark非常快。相对于MapReduce而言,基于内存的计算更多,网络磁盘I/O更少。
(3) Spark是一个通用的计算框架。
- 可以用他来完成更丰富的计算,包括SQL查询、文本处理、机器学习等,而不用学习各种各样的引擎来分别处理这些需求。
- 软件栈中所有程序和高级组件都可以从下层的改进中获益,
- 运行整个软件栈的代价变小。部署多个独立软件变成了只需部署一个软件。
运行架构特点
惰性求值
当我们创建RDD、对RDD调用转化操作时,程序操作不会立即执行,直到RDD在被调用行动操作时Spark才会开始计算。也就是执行了action之后,才会触发job,提交task到spark集群上,进行实际的执行。
一旦Spark了解了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。
Spark使用惰性求值,这样就可以对逻辑执行计划作一些优化,比如将连续的映射转为流水线执行,将多个操作合并到一个步骤中。例如,某RDD先执行map转换算子,再执行filter过滤算子,那么就可以在map的同时执行了filter算子,这样就使得更少的数据需要存储了。
我们不应该把RDD看作存放着特定数据的数据集,而最好把每个RDD当作我们通过转化操作构建出来的、记录着如何计算数据的指令列表。把数据读取到RDD的操作也同样是惰性的。
移动程序(分区)
移动程序而非移动数据。
Task采用了数据本地性和推测执行的优化机制。关键方法:TaskIdToLocations、getPreferedLocations。如图所示:
在分布式程序中通行的代价很大,因此控制数据分布以获得最少的网络传输可以极大的提升整体性能。
为分布式数据集选择正确的分区方式和为本地数据集选择合适的数据结构很相似——在这两种情况下,数据的分布都会极其明显地影响程序的性能表现。
分区并不是对所有应用都有好处,比如,如果给定RDD只需要被扫描一次,我们完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。
Spark会根据一个针对键的函数对元素进行分组。尽管Spark没有给出显示控制每个键具体落在哪一个工作节点上的方法,但是Spark可以确保同一组的键出现在同一个节点上。
支持多种资源管理器
Spark与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了
Executor进程专属
每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行Tasks。Spark Application不能跨应用程序共享数据,除非将数据写入到外部存储系统。如图所示:
Job提交就近原则
提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack(机架)里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息交换;如果想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。如图所示:
对比Mapreduce
1. 优势
(1) 依靠Scala强有力的函数式编程。
(2) Actor通信模式,akka做底层架构。
(3) Spark是基于内存的计算模型,很多计算可以直接在内存中进行,例如map、filter等转换算子,使得其灵活简单;而MapReduce模型死板,很小的一个操作都要经过完整的shuffle,shuffle包含了大量磁盘、网络等I/O。
(4) 数据共享快,省去了mapreduce的shuffle过程中至少三次存入磁盘所带来的额外开销。
(5) Spark的DAG做的好,越靠近编译器,就性能越好,优化也更好。
(6) 任务使用线程启动并执行,比mapreduce使用进程执行任务要有很大优势。
(7) delay scheduling -- 延迟执行。
2. 缺点
真正面对大数据时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如OOM内存溢出。而MapReduce虽然慢,但不至于OOM。
对比Storm
Spark | Storm | |
---|---|---|
节点 | worker | supervisor、nimbus |
进程 | executor | worker |
线程 | core | executor |
任务 | task(对应spout/bolt实例) | task(对应spout/bolt实例) |