Hadoop

2023-10-17 15:43:06 浏览数 (1)

1 Hadoop常用端口号

hadoop2.x

Hadoop3.x

访问HDFS端口

50070

9870

访问MR执行情况端口

8088

8088

历史服务器

19888

19888

客户端访问集群端口

9000

8020

2 Hadoop配置文件

  hadoop2.x core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml slaves

  hadoop3.x core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml workers

3 HDFS读流程和写流程

3.1 读

  1)客户端向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址。

  2)挑选一台datanode(就近原则,然后随机)服务器,请求读取数据。

  3)datanode开始传输数据给客户端(从磁盘里面读取数据放入流,以packet为单位来做校验)。

  4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。(后面的block块Append到前面的block块合成最终文件

3.2 写

  1)客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。

  2)namenode返回是否可以上传。

  3)客户端请求第一个 block上传到哪几个datanode服务器上。(文件先经过切分处理

  4)namenode返回3个datanode节点,分别为dn1、dn2、dn3。(遵循机架感知原则把副本分别放在不同机架,甚至不同数据中心)

  5)客户端请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成

  6)dn1、dn2、dn3逐级应答客户端

  7)客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答,当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。(重复执行3-7步)。(数据传输完成后Datanode会向Client通信,同时向Namenode报告存储完成

4 secondary namenode工作机制

4.1 secondary namenode工作机制

1)第一阶段:namenode启动

  (1)第一次启动namenode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

  (2)客户端对元数据进行增删改的请求

  (3)namenode记录操作日志,更新滚动日志

  (4)namenode在内存中对数据进行增删改查

2)第二阶段:Secondary NameNode工作

  (1)Secondary NameNode询问namenode是否需要checkpoint。直接带回namenode是否检查结果。

  (2)Secondary NameNode请求执行checkpoint。

   (3)namenode滚动正在写的edits日志

   (4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode

   (5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

   (6)生成新的镜像文件fsimage.chkpoint

   (7)拷贝fsimage.chkpoint到namenode

   (8)namenode将fsimage.chkpoint重新命名成fsimage

4.2 NameNode 在启动的时候会做哪些操作

  NameNode 数据存储在内存和本地磁盘,本地磁盘数据存储在 fsimage 镜像文件和edits 编辑日志文件。

4.2.1 首次启动 NameNode:

  1. 格式化文件系统,为了生成 fsimage 镜像文件;

  2. 启动 NameNode:

    • 读取 fsimage 文件,将文件内容加载进内存

    • 等待 DataNade 注册与发送 block report

  3. 启动 DataNode:

    • 向 NameNode 注册

    • 发送 block report

    • 检查 fsimage 中记录的块的数量和 block report 中的块的总数是否相同

  4. 对文件系统进行操作(创建目录,上传文件,删除文件等):

  此时内存中已经有文件系统改变的信息,但是磁盘中没有文件系统改变的信息,此时会将这些改变信息写入 edits 文件中,edits 文件中存储的是文件系统元数据改变的信息。

4.2.2 第二次启动 NameNode:

  1. 读取 fsimage 和 edits 文件;

  2. 将 fsimage 和 edits 文件合并成新的 fsimage 文件;

  3. 创建新的 edits 文件,内容开始为空;

  4. 启动 DataNode。

5 NameNode与SecondaryNameNode 的区别与联系?

5.1 区别

  (1)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。

  (2)SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。

5.2 联系:

   (1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。

  (2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。

6 hadoop节点动态上线下线怎么操作?

6.1 节点上线操作

  当要新上线数据节点的时候,需要把数据节点的名字追加在 dfs.hosts 文件中

  (1)关闭新增节点的防火墙   (2)在 NameNode 节点的 hosts 文件中加入新增数据节点的 hostname   (3)在每个新增数据节点的 hosts 文件中加入 NameNode 的 hostname   (4)在 NameNode 节点上增加新增节点的 SSH 免密码登录的操作   (5)在 NameNode 节点上的 dfs.hosts 中追加上新增节点的 hostname,   (6)在其他节点上执行刷新操作:hdfs dfsadmin -refreshNodes   (7)在 NameNode 节点上,更改 slaves 文件,将要上线的数据节点 hostname 追加到 slaves 文件中   (8)启动 DataNode 节点   (9)查看 NameNode 的监控页面看是否有新增加的节点

6.2 节点下线操作

  (1)修改/conf/hdfs-site.xml 文件   (2)确定需要下线的机器,dfs.osts.exclude 文件中配置好需要下架的机器,这个是阻 止下架的机器去连接 NameNode。   (3)配置完成之后进行配置的刷新操作./bin/hadoop dfsadmin -refreshNodes,这个操作的作用是在后台进行 block 块的移动。   (4)当执行三的命令完成之后,需要下架的机器就可以关闭了,可以查看现在集群上连接的节点,正在执行 Decommission,会显示:Decommission Status : Decommission in progress 执行完毕后,会显示:Decommission Status : Decommissioned   (5)机器下线完毕,将他们从excludes 文件中移除。

7 HAnamenode 是如何工作的?

7.1 ZKFailoverController主要职责

  1)健康监测:周期性的向它监控的NN发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于一个不健康的状态。

  2)会话管理:如果NN是健康的,zkfc就会在zookeeper中保持一个打开的会话,如果NameNode同时还是Active状态的,那么zkfc还会在Zookeeper中占有一个类型为短暂类型的znode,当这个NN挂掉时,这个znode将会被删除,然后备用的NN,将会得到这把锁,升级为主NN,同时标记状态为Active。

  3)当宕机的NN新启动时,它会再次注册zookeper,发现已经有znode锁了,便会自动变为Standby状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置2个NN。

  4)master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断那个NameNode为Active状态

7.2 NameNode HA

  1. 元数据信息同步在 HA 方案中采用的是“共享存储”。每次写文件时,需要将日志同步写入共享存储,这个步骤成功才能认定写文件成功。然后备份节点定期从共享存储同步日志,以便进行主备切换

  NameNode 共享存储方案有很多,比如 Linux HA, VMware FT, QJM等,目前社区已经把由 Clouderea 公司实现的基于 QJM(Quorum Journal Manager)的方案合并到 HDFS 的trunk 之中并且作为默认的共享存储实现

  基于QJM 的共享存储系统主要用于保存 EditLog,并不保存 FSImage 文件。FSImage文件还是在 NameNode 的本地磁盘上。

  QJM 共享存储的基本思想来自于 Paxos 算法,采用多个称为 JournalNode 的节点组成的JournalNode 集群来存储 EditLog。每个 JournalNode 保存同样的 EditLog 副本。每次 NameNode 写EditLog 的时候,除了向本地磁盘写入 EditLog 之外,也会并行地向 JournalNode 集群之中的每一个 JournalNode 发送写请求,只要大多数的JournalNode 节点返回成功就认为向 JournalNode 集群写入 EditLog 成功。如果有2N 1 台 JournalNode,那么根据大多数的原则,最多可以容忍有 N 台

  2. 监控 NameNode 状态采用 zookeeper,两个 NameNode 节点的状态存放在zookeeper 中,另外两个 NameNode 节点分别有一个进程监控程序,实施读取 zookeeper 中有 NameNode 的状态,来判断当前的 NameNode 是不是已经 down 机。如果 Standby 的 NameNode 节点的 ZKFC 发现主节点已经挂掉,那么就会强制给原本的 Active NameNode 节点发送强制关闭请求,之后将备用的 NameNode 设置为 Active

7.3 7. 在 NameNode HA 中,会出现脑裂问题吗?怎么解决脑裂

  脑裂对于 NameNode 这类对数据一致性要求非常高的系统来说是灾难性的,数据会发生错乱且无法恢复。zookeeper 社区对这种问题的解决方法叫做 fencing,中文翻译为隔离,也就是想办法把旧的 Active NameNode 隔离起来,使它不能正常对外提供服务。

  在进行 fencing 的时候,会执行以下的操作:

  (1) 首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的TransitionToStandby 方法,看能不能把它转换为 Standby 状态

  (2) 如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施,Hadoop 目前主要提供两种隔离措施,通常会选择 sshfence:

    ① sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死;

    ② shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离

8 hadoop2.x Federation

  多namespace的方式可以直接减轻单一NameNode的压力。

  NameNode共享集群中所有的DataNode,它们还是在同一个集群内.

HDFS Federation方案的优势:

  第一点,命名空间的扩展。因为随着集群使用时间的加长,HDFS上存放的数据也将会越来越多。这个时候如果还是将所有的数据都往一个NameNode上存放,这个文件系统会显得非常的庞大。这时候我们可以进行横向扩展,把一些大的目录分离出去.使得每个NameNode下的数据看起来更加的精简。

  第二点,性能的提升.这个也很好理解。当NameNode所持有的数据量达到了一个非常大规模的量级的时候(比如超过1亿个文件),这个时候NameNode的处理效率可能就会有影响,它可能比较容易的会陷入一个繁忙的状态。而整个集群将会受限于一个单点NameNode的处理效率,从而影响集群整体的吞吐量。这个时候多NameNode机制显然可以减轻很多这部分的压力。

  第三点,资源的隔离。这一点考虑的就比较深了。通过多个命名空间,我们可以将关键数据文件目录移到不同的NameNode上,以此不让这些关键数据的读写操作受到其他普通文件读写操作的影响。也就是说这些NameNode将会只处理特定的关键的任务所发来的请求,而屏蔽了其他普通任务的文件读写请求,以此做到了资源的隔离。千万不要小看这一点,当你发现NameNode正在处理某个不良任务的大规模的请求操作导致响应速度极慢时,你一定会非常的懊恼。

9 TextInputFormat和KeyValueInputFormat的区别是什么?

  1)相同点:

  TextInputformat和KeyValueTextInputFormat都继承了FileInputFormat类,都是每一行作为一个记录;

  2)区别:

  TextInputformat将每一行在文件中的起始偏移量作为 key,每一行的内容作为value。默认以n或回车键作为一行记录。

  KeyValueTextInputFormat 适合处理输入数据的每一行是两列,并用 tab 分离的形式。

10 FileInputFormat源码解析(input.getSplits(job))

  (1)找到你数据存储的目录。

  (2)开始遍历处理(规划切片)目录下的每一个文件

  (3)遍历第一个文件ss.txt

     a)获取文件大小fs.sizeOf(ss.txt);

     b)计算切片大小       computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

    c)默认情况下,切片大小=blocksize

     d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片

     e)将切片信息写到一个切片规划文件中

     f)整个切片的核心过程在getSplit()方法中完成。

    g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。

    h)注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。

   (4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数

12 job的map和reduce的数量?

12.1 map数量

   splitSize=max{minSize,min{maxSize,blockSize}}

map数量由处理的数据分成的block数量决定default_num = total_size / split_size;

12.2 reduce数量

reduce的数量job.setNumReduceTasks(x);x 为reduce的数量,不设置的话默认为 1。

13 MapTask工作机制

  (1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

  (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

  (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

  (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作

  溢写阶段详情:

  步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

  步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

  步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

  (5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并(多路归并算法),以确保最终只会生成一个数据文件。

  当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

  在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

  让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

14 ReduceTask工作机制

  (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

   (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并(合并同一个分区号的数据),以防止内存使用过多或磁盘上文件过多。

   (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

   (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

15 mapReduce有几种排序及排序发生的阶段

15.1 排序的分类:

   (1)部分排序:

    MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

   (2)全排序:

    如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

    替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

  (3)辅助排序:(GroupingComparator分组)

  Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

(4)二次排序:

  在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

15.2 自定义排序WritableComparable

  bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序

代码语言:javascript复制
@Override
public int compareTo(FlowBean o) {
	// 倒序排列,从大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

15.3 排序发生的阶段:

(1)一个是在map side发生在spill后partition前。

(2)一个是在reduce side发生在copy后 reduce前。

16 mapReduce中combiner的作用是什么,一般使用情景,哪些情况不需要,及和reduce的区别?

  1)Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

  2)Combiner能够应用的前提不能够影响任务的运行结果的局部汇总,适用于求和类,不适用于求平均值,而且Combiner的输出kv应该跟reducer的输入kv类型要对应起来。

  3)Combiner和reducer的区别在于运行的位置:

Combiner是在每一个maptask所在的节点运行

Reducer是接收全局所有Mapper的输出结果

17 Yarn提交任务流程

18 HDFS的数据压缩算法

  Hadoop中常用的压缩算法有bzip2、gzip、lzo、snappy,其中lzo、snappy需要操作系统安装native库才可以支持。企业开发用的比较多的是snappy

19 Hadoop的调度器总结

Apache默认的资源调度器是容量调度器;CDH默认的资源调度器是公平调度器。

(1)FIFO

  先按照作业的优先级高低,再按照到达时间的先后选择被执行的作业。

(2)计算能力调度器Capacity Scheduler

  支持多个队列,每个队列可配置一定的资源量,每个队列采用FIFO调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,同时考虑用户资源量限制和内存限制

(3)公平调度器Fair Scheduler

  同计算能力调度器类似,支持多队列多用户,每个队列中的资源量可以配置,同一队列中的作业公平共享队列中所有资源,在 Fair 调度器中,我们不需要预先占用一定的系统资源,Fair 调度器会为所有运行的 job 动态的调整系统资源

  在 Fair 调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的 Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是 Fair调度器即得到了高的资源利用率又能保证小任务及时完成。

调度时计算每个任务应该分得的计算资源与实际获得的资源差值,差值越大作业优先级越高

  实际上,Hadoop的调度器远不止以上三种,最近,出现了很多针对新型应用的Hadoop调度器。

(4)适用于异构集群的调度器LATE

  现有的Hadoop调度器都是建立在同构集群的假设前提下,具体假设如下:

    1)集群中各个节点的性能完全一样     2)对于reduce task,它的三个阶段:copy、sort和reduce,用时各占1/3     3)同一job的同类型的task是一批一批完成的,他们用时基本一样。

  现有的Hadoop调度器存在较大缺陷,主要体现在探测落后任务的算法上:如果一个task的进度落后于同类型task进度的20%,则把该task当做落后任务(这种任务决定了job的完成时间,需尽量缩短它的执行时间),从而为它启动一个备份任务(speculative task)。如果集群异构的,对于同一个task,即使是在相同节点上的执行时间也会有较大差别,因而在异构集群中很容易产生大量的备份任务。

  LATE(Longest Approximate Time to End,参考资料[4])调度器从某种程度上解决了现有调度器的问题,它定义三个阈值:SpeculativeCap,系统中最大同时执行的speculative task数目(作者推荐值为总slot数的10%);

  SlowNodeThreshold(作者推荐值为25%):得分(分数计算方法见论文)低于该阈值的node(快节点)上不会启动speculative task;SlowTaskThreshold(作者推荐值为25%):当task进度低于同批同类task的平均进度的SlowTaskThreshold时,会为该task启动speculative task。它的调度策略是:当一个节点出现空闲资源且系统中总的备份任务数小于SpeculativeCap时,(1)如果该节点是慢节点(节点得分高于SlowNodeThreshold),则忽略这个请求。 (2)对当前正在运行的task按估算的剩余完成时间排序 (3)选择剩余完成时间最大且进度低于SlowTaskThreshold的task,为该task启动备份任务。

(5)适用于实时作业的调度器Deadline Scheduler和Constraint-based Scheduler

  这种调度器主要用于有时间限制的作业(Deadline Job),即给作业一个deadline时间,让它在该时间内完成。实际上,这类调度器分为两种,软实时(允许作业有一定的超时)作业调度器和硬实时(作业必须严格按时完成)作业调度器。

  Deadline Scheduler(参考资料[5])主要针对的是软实时作业,该调度器根据作业的运行进度和剩余时间动态调整作业获得的资源量,以便作业尽可能的在deadline时间内完成。

  Constraint-based Scheduler(参考资料[6])主要针对的是硬实时作业,该调度器根据作业的deadline和当前系统中的实时作业运行情况,预测新提交的实时作业能不能在deadline时间内完成,如果不能,则将作业反馈给用户,让他重调整作业的deadline。

20 mapreduce 优化方法

20.1 数据输入

(1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致 mr 运行较慢。

(2)采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景

20.2 map阶段

(1)减少spill次数:通过调整io.sort.mb(默认100MB)及sort.spill.percent(默认80%)参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘 IO。

(2)减少merge次数:通过调整io.sort.factor(默认10)参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。

(3)不影响业务前提下提前在 map 进行Combiner处理,减少 I/O。

20.3 reduce阶段

(1)合理设置map和reduce数:两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。

(2)设置map、reduce共存:调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。

(3)规避使用reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)合理设置reduc端的buffer,默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。

20.4 IO传输

(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。

(2)使用SequenceFile二进制文件

20.5 数据倾斜问题

(1)数据倾斜现象

  数据频率倾斜——某一个区域的数据量要远远大于其他区域。

  数据大小倾斜——部分记录的大小远远大于平均值。

(2)如何收集倾斜数据

  在reduce方法中加入记录map输出键的详细情况的功能。

代码语言:javascript复制
public static final String MAX_VALUES = "skew.maxvalues"; 
private int maxValueThreshold; 
 
@Override
public void configure(JobConf job) { 
     maxValueThreshold = job.getInt(MAX_VALUES, 100); 
} 
@Override
public void reduce(Text key, Iterator<Text> values,
                     OutputCollector<Text, Text> output, 
                     Reporter reporter) throws IOException {
     int i = 0;
     while (values.hasNext()) {
         values.next();
         i  ;
     }

     if (  i > maxValueThreshold) {
         log.info("Received "   i   " values for key "   key);
     }
}

(3)减少数据倾斜的方法

方法1:抽样和范围分区

  可以通过对原始数据进行抽样得到的结果集来预设分区边界值。

方法2:自定义分区

根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer。例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。

方法3:Combine

  使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。

方法4:局部聚合加全局聚合

  第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。

  第二次mapreduce,去掉key的随机前缀,进行全局聚合。

  思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。 这个方法进行两次mapreduce,性能稍差。

方法5:增加Reducer,提升并行度

  JobConf.setNumReduceTasks(int)

20.6 常用的调优参数

(1)资源相关参数

(a)以下参数是在用户自己的mr应用程序中配置就可以生效(mapred-default.xml)

配置参数

参数说明

mapreduce.map.memory.mb

一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。

mapreduce.reduce.memory.mb

一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。

mapreduce.map.cpu.vcores

每个Map task可使用的最多cpu core数目,默认值: 1

mapreduce.reduce.cpu.vcores

每个Reduce task可使用的最多cpu core数目,默认值: 1

mapreduce.reduce.shuffle.parallelcopies

每个reduce去map中拿数据的并行数。默认值是5

mapreduce.reduce.shuffle.merge.percent

buffer中的数据达到多少比例开始写入磁盘。默认值0.66

mapreduce.reduce.shuffle.input.buffer.percent

buffer大小占reduce可用内存的比例。默认值0.7

mapreduce.reduce.input.buffer.percent

指定多少比例的内存用来存放buffer中的数据,默认值是0.0

(b)应该在yarn启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

配置参数

参数说明

yarn.scheduler.minimum-allocation-mb 1024

给应用程序container分配的最小内存

yarn.scheduler.maximum-allocation-mb 8192

给应用程序container分配的最大内存

yarn.scheduler.minimum-allocation-vcores 1

每个container申请的最小CPU核数

yarn.scheduler.maximum-allocation-vcores 32

每个container申请的最大CPU核数

yarn.nodemanager.resource.memory-mb 8192

给containers分配的最大物理内存

(c)shuffle性能优化的关键参数,应在yarn启动之前就配置好(mapred-default.xml)

配置参数

参数说明

mapreduce.task.io.sort.mb 100

shuffle的环形缓冲区大小,默认100m

mapreduce.map.sort.spill.percent 0.8

环形缓冲区溢出的阈值,默认80%

(2)容错相关参数(mapreduce性能优化)

配置参数

参数说明

mapreduce.map.maxattempts

每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.reduce.maxattempts

每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.task.timeout

Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

21 HDFS小文件优化方法

1)HDFS小文件弊端:

  HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大导致索引速度变慢。计算方面,每个小文件都会起到一个MapTask,1个MapTask默认内存1G。浪费资源。

2)解决的方式:

(1)Hadoop Archive:

  是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,以减少namenode的内存使用。

(2)CombineFileInputFormat

  CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。

(3)有小文件场景开启JVM重用;如果没有小文件,不要开启JVM重用,因为会一直占用使用到的task卡槽,直到任务完成才释放。

  JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间

代码语言:javascript复制
<property>
    <name>mapreduce.job.jvm.numtasks</name>
    <value>10</value>
    <description>How many tasks to run per jvm,if set to -1 ,there is  no limit</description>
</property>   

22 HDFS的NameNode内存

  1)Hadoop2.x系列,配置NameNode默认2000m

  2)Hadoop3.x系列,配置NameNode内存是动态分配的

  NameNode内存最小值1G,每增加100万个block,增加1G内存。

23 NameNode心跳并发配置

  企业经验:dfs.namenode.handler.count=20×〖log〗_e^(Cluster Size),比如集群规模(DataNode台数)为3台时,此参数设置为21。

24 纠删码原理

  CPU资源换取存储空间

25 异构存储(冷热数据分离)

  期望经常使用的数据存储在固态硬盘或者内存镜像硬盘;不经常使用的历史数据存储在老旧的破旧硬盘。

26 Hadoop宕机

  1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)

  2)如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize。

27 HDFS 在读取文件的时候, 如果其中一个块突然损坏了怎么办?

  客户端读取完 DataNode 上的块之后会进行 checksum 验证, 也就是把客户端读取到本地的块与 HDFS 上的原始块进行校验, 如果发现校验结果不一致, 客户端会通知 NameNode, 然后再从下一个拥有该 block 副本的 DataNode 继续读

28 HDFS 在上传文件的时候,如果其中一个 DataNode突然挂掉了怎么办?

  客户端上传文件时与 DataNode 建立 pipeline 管道,管道的正方向是客户端向DataNode 发送的数据包,管道反向是 DataNode 向客户端发送 ack 确认,也就是正确接收到数据包之后发送一个已确认接收到的应答。

  当 DataNode 突然挂掉了,客户端接收不到这个 DataNode 发送的 ack 确认,客户端会通知NameNode,NameNode 检查该块的副本与规定的不符,NameNode会通知DataNode去复制副本,并将挂掉的 DataNode 作下线处理,不再让它参与文件上传与下载

0 人点赞