三大组件HDFS、MapReduce、Yarn框架结构的深入解析式地详细学习【建议收藏!】

2021-09-16 17:22:54 浏览数 (1)

前言

我们知道目前Hadoop主要包括有三大组件,分别是:分布存储框架(HDFS)、分布式计算框架(MapReduce)、以及负责计算资源调度管理的平台(Yarn),那么今天我们就来解析式的深入学习了解这三大组件。

Hadoop启动脚本详细介绍

代码语言:javascript复制
第一种:全部启动集群所有进程
启动:sbin/start-all.sh
停止:sbin/stop-all.sh

第二种:单独启动hdfs【web端口50070】和yarn【web端口8088】的相关进程
启动:sbin/start-dfs.sh  sbin/start-yarn.sh
停止:sbin/stop-dfs.sh  sbin/stop-yarn.sh
每次重新启动集群的时候使用

第三种:单独启动某一个进程
启动hdfs:sbin/hadoop-daemon.sh start (namenode | datanode)
停止hdfs:sbin/hadoop-daemon.sh stop (namenode | datanode)
启动yarn:sbin/yarn-daemon.sh start (resourcemanager | nodemanager)
停止yarn:sbin/yarn-daemon.sh stop(resourcemanager | nodemanager)
用于当某个进程启动失败或者异常down掉的时候,重启进程

一、HDFS框架

1、HDFS概述

HDFS(Hadooop Distributed File System)是Hadoop项目的核心子项目,是Hadoop主要应用的一个分布式文件管理系统;其实,在Hadoop中有一个综合性的文件系统抽象,而该抽象中提供了文件系统实现的各种接口,而,HDFS只是这个抽象文件系统的一个实例。HDFS是一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。具有很好的通透性和容错性,注意:分布式文件管理系统很多,HDFS只是其中一种,HDFS不合适小文件。

总结HDFS的特点如下

  • 可以处理超大文件
  • 可以流式地访问数据(一次写入,多次读取)
  • 可以运行于廉价的商品机器集群上
  • 不适合低延迟数据的访问
  • 无法高效存储大量小文件
  • 不支持多用户写入及任意修改文件

那么,我们可以先看一下hadoop的文件系统…

2、Hadoop文件系统

在Hadoop中整合了很多的文件系统,这些众多的文件系统中,Hadoop提供了一个高层的文件系统的抽象类org.apache.hadoop.fs.FileSystem,这个抽象类向我们展示了一个分布式文件系统,并且有一些具体的实现。在Hadoop提供的许多文件系统的接口中,用户可以使用URI方案来选取合适的文件系统来进行交互;而Hadoop中文件系统的接口是使用Java来编写的,不同文件系统之间的交互实际上是通过Java API来进行调节的。例如我们可以使用 hadoop fs -ls file:///shell命令来查看本地文件系统目录,该shell命令其实就是一个Java应用。

3、HDFS的体系结构

如下图,HDFS是采用主从式架构对文件系统进行管理,一个HDFS集群是由一个NameNode和若干个DataNode组成的。NameNode是一个中心服务器,负责管理文件系统的名字空间(Namespace)以及客户端对文件的访问。集群中的DataNode一般是一个节点运行一个DataNode进程,负责管理它所在节点上的存储。

(1)、RPC远程过程调用协议 RPC(远程过程调用协议),它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。RPC采用客户机(client)/服务器(server)模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。 Hadoop的整个体系结构就是构建在RPC之上的(见org.apache.hadoop.ipc)

(2)、HDFS的相关概念

  1. Block(块) 在HDFS中的块是一个抽象的概念,它比操作系统中的文件块要大得多。Hadoop 1.x版本默认的block块的大小为64MB,而在Hadoop 2.x版本中默认块大小为128MB, 在HDFS分布式文件系统中的文件也被分成块进行存储,它是文件存储处理的逻辑单元。在分布式文件系统中使用,抽象块有很多好处,例如,①:可以存储任意大的文件而又不会受到网络中任一单个节点磁盘大小的限制。②:使用抽象块作为操作的单元可以简化存储子系统。对于故障出现频繁,种类繁多的分布式系统来说,简化是非常重要的,如,HDFS分布式存储系统中的块的大小固定,这样就是简化了存储系统的管理,尤其是元数据信息可以和文件快内容分开存储。③:块更有利于分布式文件系统中复制容错的实现。HDFS中,为了处理节点故障,默认将文件块副本数设定为3份,分别存储在集群中的不同节点上。如果有一个块损坏时,系统就会通过NameNode获取元数据信息,并在另外一个节点上读取一个副本且进行存储,对于用户来说这个过程都是透明的。而这里的文件块的副本冗余量是可以通过文件进行配置的。我们可以通过shell命令hadoop fsck / -files -blocks来获取文件和块的信息。Block块的大小可以自己指定,但是,块太小:寻址时间占比过高。块太大:Map任务数太少,作业执行速度变慢。它是最大的一个单位
  2. Packet packet是第二大的单位,它是client端向DataNode,或DataNode的PipLine之间传数据的基本单位,默认大小为64KB。
  3. Chunk chunk是最小的单位,它是client向DataNode,或DataNode的PipLine之间进行数据校验的基本单位,默认大小为512Byte,因为用作校验,故每个chunk需要带有4Byte的校验位。所以实际每个chunk写入packet的大小为516Byte。由此可见真实数据与校验值数据的比值约为128 : 1。(即64*1024 / 512)

在client端向DataNode传数据的时候,HDFSOutputStream会有一个chunk buff,在写满一个chunk后,会计算校验和并写入当前的chunk。之后再把带有校验和的chunk写入packet,当一个packet写满后,packet会进入dataQueue队列,其他的DataNode就是从这个dataQueue获取client端上传的数据并存储的。同时一个DataNode成功存储一个packet后之后会返回一个ack packet,放入ack Queue中。

  1. NameNode在HDFS体系结构中主要有两类节点NameNode和DataNode,它们分别承担Master和Worker的任务。NameNode是整个文件系统的管理节点。NameNode管理文件系统的命名空间,它维护着整个文件系统的文件目录树以及这些文件的索引目录,文件/目录的元信息和每个文件对应的数据块列表。这些信息是以两种形式存储在本地文件系统中,一种是命名空间镜像(Namespace images),一种是编辑日志(Edit log)。NameNode用于接收用户的操作请求,可以获取每个文件的每个块所在的DataNode。注意:这些信息并不是永久保存的,NameNode会在每次系统启动时动态的重建这些信息。运行任务时,客户端会通过NameNode获取元数据信息与DataNode进行交互以访问整个文件系统。

NameNode文件包括(小北设置的文件是在/usr/local/soft/hadoop/hadoop-2.7.6/tmp/dfs/name/current 路径下的):(元数据就是指描述数据的数据)

  • fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
  • edits:操作日志文件,namenode启动后一些新增元信息日志。
  • fstime:保存最近一次checkpoint的时间

(上面文件的保存路径是在hadoop的配置文件中的hdfs-site.xml的dfs.namenode.name.dir属性设置的)

  1. DataNode DataNode是文件系统Worker中的节点,用来执行具体的任务,主要是存储文件块,被客户端和NameNode调用。同时它也会通过心跳(Heartbeat),定时的向NameNode发送所存储的文件块信息。文件块的大小是可以自指定的,在hdfs-site.xml配置文件中的dfs.blocksize属性指定block块的大小,Hadoop 1.x默认64MB,2.x默认128MB;

(3)、secondary namenode

checkpoint操作:该操作就相当于拍快照,其中的属性:

  • fs.checkpoint.period 用于指定两次checkpoint的最大时间间隔,默认3600秒(即一个小时)
  • fs.checkpoint.size 用于规定edits文件的最大值,一旦超过这个值就强制checkpoint,不管是否到达最大时间间隔,默认的大小为64MB。

secondary namenode的工作流程:

  1. secondary通知namenode切换edits文件
  2. secondary通过http从namenode获得fsimage文件和edits文件
  3. secondary将fsimage文件载入内存,然后开始合并edits文件
  4. secondary将新的fsimage文件发回给namenode
  5. namenode用新的fsimage文件替换旧的fsimage文件

4、HDFS 的Trash回收站

HDFS 的Trash回收站和Linux系统(桌面环境)的回收站设计一样的,HDFS会为每一个用户创建一个回收站目录:/user/用户名/.Trash/,每一个被用户通过Shell删除的文件或者目录,(fs.trash.interval是在指在这个回收周期之内),文件实际上是被移动到trash的这个目录下面,而不是马上把数据删除掉。等到回收周期真正到了以后,HDFS才会将数据真正的删除。默认的单位是分钟,1440分钟=60*24,正好是一天。详细配置:在每个节点(不仅仅是主节点)上添加配置 core-site.xml,增加如下内容

代码语言:javascript复制
<property>
   <name>fs.trash.interval</name>
   <value>1440</value>
</property>

注意:如果删除的文件过大,超过了回收站的大小的话,会提示删除失败,此时,需要指定参数 -skipTrash才能继续删除

5、HDFS的shell操作命令

调用文件系统(FS)Shell命令应使用 bin/hdfs dfs -xxx 的形式, hadoop fs 与 hdfs dfs 是等价使用的

代码语言:javascript复制
   
常用的命令如下: 
-ls   
查看hdfs上目录,如 hdfs dfs -ls /
 
-put       
将本地文件上传到hdfs,如hdfs dfs -put  本地文件路径  hdfs路径

-get 
将hdfs文件下载到本地,如 hdfs dfs -get hdfs文件路径  本地文件路径
 
-mkdir 
在hdfs 上创建文件夹,如hdfs dfs -mkdir /test

-cp 
将hdfs文件或目录复制 如 hdfs dfs -cp /test.txt /a/

-cat     
查看hdfs上文件内容  如hdfs dfs -cat /test.txt

详细的shell命令介绍使用

代码语言:javascript复制
 
1. cat  : 将路径指定文件的内容输出到stdout 。
 使用方法格式:hadoop fs -cat URI [URI …] 
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -cat hdfs://host1:port1/file1 hdfs://host2:port2/file2
hadoop fs -cat file:///file3 /user/hadoop/file4
 
2. chgrp :改变文件所属的组。
 使用方法:hadoop fs -chgrp [-R] GROUP URI [URI …] 
 使用-R 将使改变在目录结构下递归进行。
 命令的使用者必须是,文件的所有者或者超级用户。

3.chmod :改变文件的权限。
 使用方法:hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI …]
 使用-R 将使改变在目录结构下递归进行。
 命令的使用者必须是文件的所有者或者超级用户。
 
4.chown :改变文件的拥有者。
 使用方法:hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]
 使用-R 将使改变在目录结构下递归进行。
 命令的使用者必须是 超级用户。
 
5.copyFromLocal
 使用方法:hadoop fs -copyFromLocal <localsrc> URI
 除了限定源路径是一个本地文件外,其它和put 命令相似。
 
6.copyToLocal
 使用方法:hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst>
 除了限定目标路径是一个本地文件外,其它和get 命令类似。
 
7.cp:将文件从源路径复制到目标路径
 使用方法:hadoop fs -cp URI [URI …] <dest>
 这个命令允许有多个源路径,此时目标路径必须是一个目录。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2
hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

8. du
 使用方法:hadoop fs -du URI [URI …]
 返回值:成功返回0,失败返回-1。
 显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小。
使用示例:
hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://host:port/user/hadoop/dir1

9.dus
 使用方法:hadoop fs -dus <args>
 显示文件的大小。
 
10.expunge :清空回收站。
 使用方法:hadoop fs -expunge
 
11.get
 使用方法:hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>
 复制文件到本地文件系统。
 可用-ignorecrc 选项复制CRC校验失败的文件。
 使用-crc 选项复制文件以及CRC信息。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://host:port/user/hadoop/file localfile

12.getmerge
 使用方法:hadoop fs -getmerge <src> <localdst> [addnl]
 接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。
 addnl 是可选的,用于指定在每个文件结尾添加一个换行符。

13. ls
 使用方法:hadoop fs -ls <args>
 返回值:成功返回0,失败返回-1。
 如果是文件,则按照如下格式返回文件信息:
 文件名 <副本数> 文件大小 修改日期 修改时间 权限 用户ID 组ID

 如果是目录,则返回它直接子文件的一个列表,就像在Unix中一样。
 目录返回列表的信息如下:
  目录名 <dir> 修改日期 修改时间 权限 用户ID 组ID
使用示例:
hadoop fs -ls /user/hadoop/file1 /user/hadoop/file2 hdfs://host:port/user/hadoop/dir1 /nonexistentfile

14. lsr
 使用方法:hadoop fs -lsr <args>
 ls 命令的递归版本。类似于Unix中的ls -R 。

15. mkdir
 使用方法:hadoop fs -mkdir <paths>
 返回值: 成功返回0,失败返回-1。
 接受路径指定的uri作为参数,创建这些目录。
 其行为类似于Unix的mkdir -p,它会创建路径中的各级父目录。
使用示例:
hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
hadoop fs -mkdir hdfs://host1:port1/user/hadoop/dir hdfs://host2:port2/user/hadoop/dir

16. movefromLocal
 使用方法:dfs -moveFromLocal <src> <dst>
 输出一个”not implemented“信息。

17. mv :将文件从源路径移动到目标路径。
 使用方法:hadoop fs -mv URI [URI …] <dest>
 这个命令允许有多个源路径,此时目标路径必须是一个目录。
 不允许在不同的文件系统间移动文件。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
hadoop fs -mv hdfs://host:port/file1 hdfs://host:port/file2 hdfs://host:port/file3 hdfs://host:port/dir1

18. put
 使用方法:hadoop fs -put <localsrc> ... <dst>
 从本地文件系统中复制单个或多个源路径到目标文件系统。
 也支持从标准输入中读取输入写入目标文件系统。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile
从标准输入中读取输入。

19. rm
 使用方法:hadoop fs -rm URI [URI …]
 删除指定的文件。只删除非空目录和文件。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -rm hdfs://host:port/file /user/hadoop/emptydir

20.rmr
 使用方法:hadoop fs -rmr URI [URI …]
 delete的递归版本。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -rmr /user/hadoop/dir
hadoop fs -rmr hdfs://host:port/user/hadoop/dir

21.setrep
 使用方法:hadoop fs -setrep [-R] <path>
 返回值:成功返回0,失败返回-1。
 改变一个文件的副本系数。
 -R选项用于递归改变目录下所有文件的副本系数。
使用示例:
hadoop fs -setrep -w 3 -R /user/hadoop/dir1

22.stat
 使用方法:hadoop fs -stat URI [URI …]
 返回指定路径的统计信息。
 返回值:成功返回0,失败返回-1。
使用示例:
hadoop fs -stat path

23. tail
 使用方法:hadoop fs -tail [-f] URI
 返回值:成功返回0,失败返回-1。
 将文件尾部1K字节的内容输出到stdout。
  支持-f选项,行为和Unix中一致。
使用示例:
hadoop fs -tail pathname

24.test
 使用方法:hadoop fs -test -[ezd] URI
 选项:
 -e 检查文件是否存在。如果存在则返回0。
 -z 检查文件是否是0字节。如果是则返回0。
 -d 如果路径是个目录,则返回1,否则返回0。
使用示例:
hadoop fs -test -e filename

25.text
 使用方法:hadoop fs -text <src>
 将源文件输出为文本格式。
 允许的格式是zip和TextRecordInputStream。

26. touchz
 使用方法:hadoop fs -touchz URI [URI …]
 创建一个0字节的空文件。
使用示例:
hadoop -touchz pathname

6、HDFS的读写流程

(1)读文件流程

首先,client访问NameNode,查询元数据信息,获得这个文件的数据

  • 1.首先调用FileSystem对象的open方法,其实是一个DistributedFileSystem的实例
  • 2.DistributedFileSystem通过rpc获得文件的第一个block的locations,同一block按照副本数会返回多个locations,这些locations按照hadoop拓扑结构排序,距离客户端近的排在前面.
  • 3.前两步会返回一个FSDataInputStream对象,该对象会被封装成DFSInputStream对象,DFSInputStream可以方便的管理datanode和namenode数据流。客户端调用read方法,DFSInputStream最会找出离客户端最近的datanode并连接。
  • 4.数据从datanode源源不断的流向客户端。(通过packet形式)
  • 5.如果第一块的数据读完了,就会关闭指向第一块的datanode连接,接着读取下一块。这些操作对客户端来说是透明的,客户端的角度看来只是读一个持续不断的流。
  • 6.如果第一批block都读完了,DFSInputStream就会去namenode拿下一批blocks的location,然后继续读,如果所有的块都读完,这时就会关闭掉所有的流。
  • 7.如果在读数据的时候,DFSInputStream和datanode的通讯发生异常,就会尝试正在读的block的排第二近的datanode,并且会记录哪个datanode发生错误,剩余的blocks读的时候就会直接跳过该datanode。DFSInputStream也会检查block数据校验和,如果发现一个坏的block,就会先报告到namenode节点,然后DFSInputStream在其他的datanode上读该block的镜像
  • 8.该设计的方向就是客户端直接连接datanode来检索数据并且namenode来负责为每一个block提供最优的datanode,namenode仅仅处理block location的请求,这些信息都加载在namenode的内存中,hdfs通过datanode集群可以承受大量客户端的并发访问。
(2)写文件流程

客户端向NameNode发出写文件请求。

检查是否已存在文件、检查权限。若通过检查,直接先将操作写入EditLog,并返回输出流对象。

client端按128MB的块切分文件。

client将NameNode返回的分配的可写的DataNode列表和Data数据一同发送给最近的第一个DataNode节点,此后client端和NameNode分配的多个DataNode构成pipeline管道,client端向输出流对象中写数据。client每向第一个DataNode写入一个packet,这个packet便会直接在pipeline里传给第二个、第三个…DataNode。

每个DataNode写完一个块后,会返回确认信息。

写完数据,关闭输输出流。

发送完成信号给NameNode。注意:发送完成信号的时机取决于集群是强一致性还是最终一致性,强一致性则需要所有DataNode写完后才向NameNode汇报。最终一致性则其中任意一个DataNode写完后就能单独向NameNode汇报,HDFS一般情况下都是强调强一致性

代码语言:javascript复制
 - 1.客户端通过调用DistributedFileSystem的create方法创建新文件
  • 2.DistributedFileSystem通过RPC调用namenode去创建一个没有blocks关联的新文件,创建前,namenode会做各种校验,比如文件是否存在,客户端有无权限去创建等。如果校验通过,namenode就会记录下新文件,否则就会抛出IO异常.
  • 3.前两步结束后会返回FSDataOutputStream的对象,与读文件的时候相似,FSDataOutputStream被封装成DFSOutputStream.DFSOutputStream可以协调namenode和datanode。客户端开始写数据到DFSOutputStream,DFSOutputStream会把数据切成一个个小packet,然后排成队列data quene。
  • 4.DataStreamer会去处理接受data queue,他先问询namenode这个新的block最适合存储的在哪几个datanode里,比如副本数是3,那么就找到3个最适合的datanode,把他们排成一个pipeline.DataStreamer把packet按队列输出到管道的第一个datanode中,第一个datanode又把packet输出到第二个datanode中,以此类推。
  • 5.DFSOutputStream还有一个对列叫ack queue,也是由packet组成,等待datanode的收到响应,当pipeline中的所有datanode都表示已经收到的时候,这时akc queue才会把对应的packet包移除掉。如果在写的过程中某个datanode发生错误,会采取以下几步:1) pipeline被关闭掉;2)为了防止丢包 ack queue里的packet会同步到data queue里;3)把产生错误的datanode上当前在写但未完成的block删掉;4)block剩下的部分被写到剩下的两个正常的datanode中;5)namenode找到另外的datanode去创建这个块的复制。当然,这些操作对客户端来说是无感知的。
  • 6.客户端完成写数据后调用close方法关闭写入流
  • 7.DataStreamer把剩余得包都刷到pipeline里然后等待ack信息,收到最后一个ack后,通知datanode把文件标示为已完成。

7、HDFS常见问题

(1)hdfs文件无法操作:一般是因为处于安全模式下
代码语言:javascript复制
离开安全模式:hdfs dfsadmin -safemode leave
进入安全模式:hdfs dfsadmin -safemode enter
查看安全模式:hdfs dfsadmin -safemode get

二、MapReduce原理深入理解

1、MapReduce概述及原理

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,Map阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据。Reduce阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据(在这先把reduce理解为一个单独的聚合程序即可)。MapReduce框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参和返回值都是<key、value>,使用的时候一定要注意构造<k,v>。

2、MR的执行过程

MapReduce过程分为map阶段和reduce阶段(主要是对map任务发送来的数据进行处理),其实,还有一个shuffle阶段,maprudece过程中流通的数据格式都是key-value键值对格式的。

接下来,我们详细的了解下MapReduce的过程,在map阶段,主要任务是处理从HDFS中输入的文件,在输入时会使用InputFormat类的子类(TextInputFormat)把输入的文件(夹)划分为很多切片(InputSplit),默认HDFS的每一个block块对应着一个切片,每一个切片默认大小为128MB,每一个切片会产生一个map任务。然后当map任务处理完后(就会进入到shuffle(中文译为“洗牌”)阶段,从map端的输出到reduce端的输入这一过程称为shuffle过程,该过程主要是负责map端的数据能够完整的发送到reduce端进行处理),就会将这些数据刷写到环形缓冲区(Buffer in mermory)中,环形缓冲区的默认大小位100MB,而当这些数据刷写到80%的时候,就会开始溢写到磁盘中去,那么溢写的过程中,会对数据进行分区(partition)和排序(sort)处理,然后每个map任务可能会产生多个(小)文件(这些小文件都采用了快速排序处理过),我们知道最后reduce任务会从map任务中拉取数据,对这些(小)文件进行拉取时会需要很多的网络IO以及磁盘IO的资源,那么,为了减少网络IO和磁盘IO的次数,我们会将多个(小)文件合并为一个(大)文件,该过程就是merge操作,使用hash partitional分区原理来分区,采用归并排序对分区中的数据排序,并且会根据map端中输入的key的不同去计算hash值,再去对指定的reduce的个数进行取余,reduce的个数是指提前设定好的,几个reduce(个数)就会产生几个分区,然而,相同的key的数据会进入到同一个reduce任务中,一个reduce任务中可以处理不同key的数据,因为不同数据key的hash值对reduce个数进行取余,最后得到的结果可能一样的,所以一个reduce任务中也有可能会有不同的key,最后每一个map任务会生成一个(大)文件,而这个(大)文件是经过分区和排序得到的,之后,会通过HTTP将每一个输出文件的特定分区的数据拉取到reduce任务中,等reduce任务阶段处理完后会将最终的结果写入到磁盘(也就是HDFS)中。

3、InputSplit

在执行mapreduce之前,原始数据被分割成若干split(切片),每个split作为一个map任务的输入。但是,当Hadoop处理很多小文件时(文件大小小于hdfs block大小),此时FileInputFormat不会对小文件进行划分,那么每一个小文件都会被当做一个split并分配一个map任务,这样就会有大量的map task运行,会导致效率非常底下。例如:一个1G的文件,会被划分成8个128MB的split,并分配8个map任务处理,而10000个100kb的文件会被10000个map任务处理,效率就会很低下。

Map任务的数量:

  • 一个InputSplit对应一个Map task
  • InputSplit的大小是由Math.max(minSize, Math.min(maxSize,blockSize))决定
  • 单节点建议运行10—100个map task
  • map task执行时长不建议低于1分钟,否则导致效率低

4、RecordReader类

每一个InputSplit都有一个RecordReader类,其作用是把InputSplit中的数据解析成Record,即<k1,v1>。在TextInputFormat中的RecordReader是LineRecordReader,它会将每一行解析成一个<k1,v1>。其中,k1表示偏移量,而v1则表示行文本内容

5、MapReduce默认输入处理类

InputFormat抽象类,只是定义了两个方法。

FileInputFormat

FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat处理作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。

TextInputFormat extInputFormat是默认的处理类,主要用来处理普通文本文件,文件中每一行作为一个记录,它将每一行在文件中的起始偏移量作为key,每一行的内容作为value默认以n或回车键作为一行记录

6、序列化

序列化 (Serialization)指将对象的状态信息转换为可以存储或传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。

序列化的目的就是为了跨进程传递格式化数据: 当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为对象。把对象转换为字节序列的过程称为对象的序列化。把字节序列恢复为对象的过程称为对象的反序列化

7、MapReduce优化总结

  • 1,通过修改map的切片大小控制map数据量(尽量和block大小保持一致)注意:并不是map越多越好,根据集群资源,set mapred.max.split.size=256000000
  • 2,合并小文件。因为一个文件会至少生成一个map
  • 3,避免数据倾斜
  • 4,combine操作
  • 5,mapjoin操作
  • 6,适当备份,因为备份多可以本地化生成map任务

三、Yarn核心组件功能

YARN Client

YARN Client提交Application到RM,它会首先创建一个Application上下文对象,并设置AM必需的资源请求信息,然后提交到RM。YARN Client也可以与RM通信,获取到一个已经提交并运行的Application的状态信息等。

ResourceManager(RM)

RM是YARN集群的Master,负责管理整个集群的资源和资源分配。RM作为集群资源的管理和调度的角色,如果存在单点故障,则整个集群的资源都无法使用。在2.4.0版本才新增了RM HA的特性,这样就增加了RM的可用性。

NodeManager(NM)

NM是YARN集群的Slave,是集群中实际拥有实际资源的工作节点。我们提交Job以后,会将组成Job的多个Task调度到对应的NM上进行执行。Hadoop集群中,为了获得分布式计算中的Locality特性,会将DN和NM在同一个节点上运行,这样对应的HDFS上的Block可能就在本地,而无需在网络间进行数据的传输。

Container

Container是YARN集群中资源的抽象,将NM上的资源进行量化,根据需要组装成一个个Container,然后服务于已授权资源的计算任务。计算任务在完成计算后,系统会回收资源,以供后续计算任务申请使用。Container包含两种资源:内存和CPU,后续Hadoop版本可能会增加硬盘、网络等资源。

ApplicationMaster(AM)

AM主要管理和监控部署在YARN集群上的Application,以MapReduce为例,MapReduce Application是一个用来处理MapReduce计算的服务框架程序,为用户编写的MapReduce程序提供运行时支持。通常我们在编写的一个MapReduce程序可能包含多个Map Task或Reduce Task,而各个Task的运行管理与监控都是由这个MapReduceApplication来负责,比如运行Task的资源申请,由AM向RM申请;启动/停止NM上某Task的对应的Container,由AM向NM请求来完成。

资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板等资源请去 GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigData Gitee 自行下载 https://gitee.com/li_hey_hey/dashboard/projects

0 人点赞