Spark 与 Hadoop 学习笔记 介绍及对比

2019-09-23 11:22:02 浏览数 (1)

这篇博客将会简单记录Hadoop与Spark对比,HDFS,MapReduce的基本概念,及Spark架构设计,RDD,运行模式。整理起来一起阅读方便我们理解整个大数据处理框架全局和发展。

1. Hadoop

1.1 背景

Hadoop就是解决了大数据(大到一台计算机无法进行存储,一台计算机无法在要求的时间内进行处理)的可靠存储和处理。

HDFS: 分布式文件存储

YARN: 分布式资源管理

MapReduce: 分布式计算

Others: 利用YARN的资源管理功能实现其他的数据处理方式

内部各个节点基本都是采用Master-Woker架构

适合

  • 大规模数据
  • 流式数据(写一次,读多次)
  • 商用硬件(一般硬件)

不适合

  • 低延时的数据访问
  • 大量的小文件
  • 频繁修改文件(基本就是写1次)1.2 HDFSHadoop Distributed File System,分布式文件系统 Block数据
  1. 基本存储单位,一般大小为64M(配置大的块主要是因为:1)减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间;2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录;3)对数据块进行读写,减少建立网络的连接成本)
  2. 一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小
  3. 基本的读写单位,类似于磁盘的页,每次都是读写一个块
  4. 每个块都会被复制到多台机器,默认复制3份

NameNode

  1. 存储文件的metadata,运行时所有数据都保存到内存,整个HDFS可存储的文件数受限于NameNode的内存大小
  2. 一个Block在NameNode中对应一条记录(一般一个block占用150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件
  3. 数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)
  4. NameNode失效则整个HDFS都失效了,所以要保证NameNode的可用性

Secondary NameNode

定时与NameNode进行同步(定期合并文件系统镜像和编辑日期,然后把合并后的传给NameNode,替换其镜像,并清空编辑日志,类似于CheckPoint机制),但NameNode失效后仍需要手工将其设置成主机

DataNode

  1. 保存具体的block数据
  2. 负责数据的读写操作和复制操作
  3. DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息
  4. DataNode之间会进行通信,复制数据块,保证数据的冗余性
  5. DataNode会定时发送心跳到NameNode。如果一段时间内NameNode没有收到DataNode的心跳消息,则认为其失效。此时NameNode就会将该节点的数据(从该节点的复制节点中获取)复制到另外的DataNode中1.3 MapReduceMapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,用于解决海量数据的计算问题。

MapReduce分成了两个部分:

  1. 映射(Mapping)对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping。
  2. 化简(Reducing)遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。

你向MapReduce框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map任务,然后分配到不同的节点上去执行,

每一个Map任务处理输入数据中的一部分,当Map任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce任务的输入数据。

Reduce任务的主要目标就是把前面若干个Map的输出汇总到一起并输出。

MapReduce的伟大之处就在于编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。图中执行的顺序都用数字标记了。

  1. MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。
  2. user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业3或者Reduce作业),worker的数量也是可以由用户指定的。
  3. 被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。
  4. 缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。
  5. master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的。
  6. reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。
  7. 当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码
  8. 所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意Map/Reduce作业和map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。

2. Spark

2.1 简介

Apache Spark是一种快速的集群计算技术,是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,专为快速计算而设计。它基于Hadoop MapReduce,扩展了MapReduce模型,以有效地将其用于更多类型的计算,包括交互式查询和流处理。 Spark的主要特性是它的内存中集群计算,从而不再需要读写HDFS,提高了应用程序的处理速度,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。

Spark旨在涵盖各种工作负载,如批处理应用程序,迭代算法,交互式查询和流式处理。除了在相应系统中支持所有这些工作负载之外,它还减少了维护单独工具的管理负担。

对于一个普遍的信念,Spark不是Hadoop的修改版本,并不是真的依赖于Hadoop,因为它有自己的集群管理。 Hadoop只是实现Spark的方法之一。

Spark以两种方式使用Hadoop - 一个是存储,另一个是处理。由于Spark具有自己的集群管理计算,因此它仅使用Hadoop进行存储。

Spark架构图Spark架构图

基本概念

  • Application: 用户自己写的 Spark 应用程序,批处理作业的集合。Application 的 main 方法为应用程序的入口,用户通过 Spark 的 API,定义了 RDD 和对 RDD 的操作
  • SparkContext: Spark 最重要的 API,用户逻辑与 Spark 集群主要的交互接口,它会和 Cluster Master 交互,包括向它申请计算资源等(在Spark2.0新标准中使用SparkSession,其实质为SQLContext与HiveContext,内部封装SparkContext)
  • Driver 和 Executor:Spark 在执行每个 Application 的过程中会启动 Driver 和 Executor 两种 JVM 进程。Driver 进程为主控进程,负责执行用户 Application 中的 main 方法,提交 Job,并将 Job 转化为 Task,在各个 Executor 进程间协调 Task 的调度。运行在Worker上 的 Executor 进程负责执行 Task,并将结果返回给 Driver,同时为需要缓存的 RDD 提供存储功能 ###2.2 弹性分布式数据集(RDD) 弹性分布式数据集(RDD)是 Spark 框架中的核心概念。可以将 RDD 视作数据库中的一张表。其中可以保存任何类型的数据。Spark 将数据存储在不同分区上的 RDD 之中。

RDD 可以帮助重新安排计算并优化数据处理过程。

此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。

RDD 是不可变的。你可以用变换(Transformation)修改 RDD,但是这个变换所返回的是一个全新的RDD,而原有的 RDD 仍然保持不变。

RDD 支持两种类型的操作:

  • 变换(Transformation)变换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。 变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。
  • 行动(Action)行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。 行动操作包括:reduce,collect,count,first,take,countByKey 以及 foreach。

DataFrame: 以RDD为基础的分布式数据集,与RDD相同,采用惰性机制,只记录各种转换的逻辑线路图(DAG),支持SQL查询

2.3 架构设计

Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。

与Hadoop MapReduce计算框架相比,Spark所采用的Executor有两个优点:

一是利用多线程来执行具体的任务(Hadoop MapReduce采用的是进程模型),减少任务的启动开销;

二是Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时,就可以直接读该存储模块里的数据,而不需要读写到HDFS等文件系统里,因而有效减少了IO开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写IO性能。

2.4 Spark运行基本流程

  1. 当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext会向资源管理器注册并申请运行Executor的资源;
  2. 资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上;
  3. SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor;
  4. 任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。 3. Hadoop Spark 对比
  5. Spark的中间数据放到内存中,对于迭代运算效率更高,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错
  6. Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念
  7. Spark比Hadoop更通用 - Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。 - 这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。 - 不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
  8. 容错性 - 在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。
  9. 可用性 - Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。
  • 两者都是用MapReduce模型来进行并行计算:
代码语言:txt复制
- hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束
- hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系
- spark用户提交的任务成为application,一个application对应一个SparkContext,app中存在多个job,每触发一次action操作就会产生一个job
- 这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算

4. Ref

  1. Google FS, MapReduce, BigTable
  2. https://www.usenix.org/legacy/event/hotcloud10/tech/full_papers/Zaharia.pdf (Spark Paper)
  3. https://blog.csdn.net/bitcarmanlee/article/details/53048289
  4. http://www.cnblogs.com/jeakeven/p/5354764.html
  5. https://www.w3cschool.cn/hadoop/fgr61jyf.html
  6. https://www.w3cschool.cn/spark_sql/spark_introduction.html
  7. https://blog.csdn.net/yuan_xw/article/details/50532368
  8. http://dblab.xmu.edu.cn/blog/1711-2/

0 人点赞