Hadoop源代码分析【6-10】

2021-01-27 16:35:32 浏览数 (1)

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

Hadoop 源代码分析(六)

聊完了 Client 聊 Server ,按惯例,先把类图贴出来。

需要注意的是,这里的 Server 类是个抽象类,唯一抽象的地方,就是

代码语言:javascript复制
public abstract Writable call(Writable param, long receiveTime) throws IOException

这表明, Server 提供了一个架子, Server 的具体功能,需要具体类来完成。而具体类,当然就是实现 call 方法。

我们先来分析 Server.Call ,和 Client.Call 类似, Server.Call 包含了一次请求,其中, id 和 param 的含义和 Client.Call 是一致的。不同点在后面三个属性, connection 是该 Call 来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection ,发送给客户端。属性 timestamp 是请求到达的时间戳,如果请求很长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。 最后的 response 是请求处理的结果, 可能是一个 Writable 的串行化结果, 也可能一个异常的串行化结果。

Server.Connection 维护了一个来自客户端的 socket 连接。它处理版本校验,读取请求并把请求发送到请求处理线程,接收处理结果并把结果发送给客户端。

Hadoop的 Server 采用了 Java 的 NIO,这样的话就不需要为每一个 socket 连接建立一个线程, 读取 socket 上的数据。在 Server 中,只需要一个线程,就可以 accept 新的连接请求和读取 socket 上的数据,这个线程,就是上面图里的 Listener 。

请求处理线程一般有多个, 它们都是 Server.Handle 类的实例。它们的 run 方法循环地取出一个 Server.Call ,调用 Server.call方法,搜集结果并串行化,然后将结果放入 Responder 队列中。

对于处理完的请求,需要将结果写回去,同样,利用 NIO,只需要一个线程,相关的逻辑在 Responder 里。

Hadoop源代码分析(七)

(注:本节需要用到一些 Java 反射的背景)

有了 Client 和 Server ,很自然就能 RPC啦。下面轮到 RPC.java 啦。

一般来说,分布式对象一般都会要求根据接口生成存根和框架。如 CORBA,可以通过 IDL,生成存根和框架。但是,在 org.apache.hadoop.rpc ,我们就不需要这样的步骤了,上类图。

为了分析 Invoker ,我们需要介绍一些 Java 反射实现 DynamicProxy 的背景。

DynamicProxy 是由两个 class 实现的: java.lang.reflect.Proxy 和 java.lang.reflect.InvocationHandler ,后者是一个接 口。所谓 DynamicProxy 是这样一种 class :它是在运行时生成的 class ,在生成它时你必须提供一组 interface 给它,然后该 class 就宣称它实现了这些 interface 。

这个 DynamicProxy 其实就是一个典型的 Proxy 模式,它不会替你作实质性的工作, 在生成它的实例时你必须提供一个 handler ,由它接管实际的工作。这个 handler ,在 Hadoop的 RPC中,就是 Invoker 对象。

我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的 InvocationHandler 实现中。

在 Hadoop的 RPC中,Invoker 实现了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的唯一方法)。Invoker 会把所有跟这次调用相关的调用方法名, 参数类型列表,参数列表打包,然后利用前面我们分析过的 Client ,通过 socket 传递到服务器端。就是说,你在 proxy 类上的任何调用,都通过 Client 发送到远方的服务器上。

Invoker 使用 Invocation 。Invocation 封装了一个远程调用的所有相关信息,它的主要属性有 : methodName,调用方法名,parameterClasses ,调用方法参数的类型列表和 parameters ,调用方法参数。注意,它实现了 Writable 接口,可以串行化。

RPC.Server 实现了 org.apache.hadoop.ipc.Server,你可以把一个对象,通过 RPC,升级成为一个服务器。服务器接收到的请求(通过 Invocation ),解串行化以后,就变成了方法名,方法参数列表和参数列表。利用 Java 反射,我们就可以调用对应 的对象的方法。调用的结果再通过 socket ,返回给客户端,客户端把结果解包后,就可以返回给 DynamicProxy 的使用者了。

Hadoop源代码分析(八)

一个典型的 HDFS系统包括一个 NameNode和多个 DataNode。NameNode维护名字空间;而 DataNode存储数据块。

DataNode负责存储数据,一个数据块在多个 DataNode中有备份;而一个 DataNode对于一个块最多只包含一个备份。所以我们可以简单地认为 DataNode上存了数据块 ID 和数据块内容,以及他们的映射关系。

一个 HDFS集群可能包含上千 DataNode节点,这些 DataNode定时和 NameNode通信,接受 NameNode的指令。为了减轻 NameNode的负担,NameNode上并不永久保存那个 DataNode上有那些数据块的信息, 而是通过 DataNode启动时的上报, 来更新 NameNode上的映射表。

DataNode和 NameNode建立连接以后,就会不断地和 NameNode保持心跳。心跳的返回其还也包含了 NameNode对 DataNode的一些命令,如删除数据库或者是把数据块复制到另一个 DataNode。应该注意的是: NameNode不会发起到 DataNode的请求,在这个通信过程中,它们是严格的客户端 / 服务器架构。

DataNode当然也作为服务器接受来自客户端的访问,处理数据块读 / 写请求。 DataNode之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作的时候, DataNode需要相互配合,保证写操作的一致性。

下面我们就来具体分析一下 DataNode的实现。 DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。我们先来看本地数据块管理部分。

安装 Hadoop的时候,我们会指定对应的数据块存放目录, 当我们检查数据块存放目录时, 我们会发现下面有个叫 dfs 的目录,所有的数据就存放在 dfs/data 里面。

其中有两个文件, storage 里存的东西是一些出错信息。 in_use.lock 是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它上面的一个锁。

接下来是 3 个目录, current 存的是当前有效的数据块, detach 存的是快照( snapshot ,目前没有实现), tmp 保存的是一些操作需要的临时数据块。

但我们进入 current 目录以后,就会发现有一系列的数据块文件和数据块元数据文件。同时还有一些子目录,它们的名字是subdir0 到 subdir63 ,子目录下也有数据块文件和数据块元数据。 这是因为 HDFS限定了每个目录存放数据块文件的数量,多了以后会创建子目录来保存。

数据块文件显然保存了 HDFS中的数据,数据块最大可以到 64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:

blk_3148782637964391313 blk_3148782637964391313_242812.meta

上面的例子中, 3148782637964391313是数据块的 ID 号,242812是数据块的版本号,用于一致性检查。

在 current 目录下还有下面几个文件:

VERSION,保存了一些文件系统的元信息

dncp_block_verification.log.currdncp_block_verification.log.prev ,它记录了一些 DataNode对文件系定时统做一致性检查需要的信息。

Hadoop源代码分析(九)

在继续分析 DataNode之前,我们有必要看一下系统的工作状态。启动 HDFS的时候,我们可以选择以下启动参数:

  • FORMAT("-format") :格式化系统
  • REGULAR("-regular"):正常启动
  • UPGRADE("-upgrade"):升级
  • ROLLBACK("-rollback"):回滚
  • FINALIZE("-finalize"):提交
  • IMPORT("-importCheckpoint") :从 Checkpoint 恢复

作为一个大型的分布式系统, Hadoop内部实现了一套升级机制。upgrade 参数就是为了这个目的而存在的,当然,升级可能成功,也可能失败。如果失败了,那就用 rollback 进行回滚;如果过了一段时间,系统运行正常,那就可以通过 finalize ,正式提交这次升级

importCheckpoint 选项用于 NameNode发生故障后,从某个检查点恢复。

有了上面的描述,我们得到下面左边的状态图:

大家应该注意到,上面的升级 / 回滚/ 提交都不可能一下就搞定,就是说,系统故障时,它可能处于上面右边状态中的某一个。特别是分布式的各个节点上, 甚至可能出现某些节点已经升级成功, 但有些节点可能处于中间状态的情况, 所以 Hadoop采用类似于数据库事务的升级机制也就不是很奇怪。

大家先理解一下上面的状态图,它是下面我们要介绍 DataNode存储的基础。

Hadoop源代码分析(十)

我们来看一下升级 / 回滚/ 提交时的 DataNode上会发生什么(在类 DataStorage 中实现)。

前面我们提到过 VERSION文件,它保存了一些文件系统的元信息,这个文件在系统升级时,会发生对应的变化。

升级时, NameNode会将新的版本号,通过 DataNode的登录应答返回。 DataNode收到以后,会将当前的数据块文件目录改名,从 current 改名为 previous.tmp ,建立一个 snapshot ,然后重建 current 目录。重建包括重建 VERSION文件,重建对应的子目录,然后建立数据块文件和数据块元数据文件到 previous.tmp 的硬连接。建立硬连接意味着在系统中只保留一份数据块文件和数据块元数据文件, current 和 previous.tmp 中的相应文件,在存储中,只保留一份。当所有的这些工作完成以后, 会在 current里写入新的 VERSION文件,并将 previous.tmp 目录改名为 previous ,完成升级。

了解了升级的过程以后, 回滚就相对简单。 因为说有的旧版本信息都保存在 previous 目录里。回滚首先将 current 目录改名为removed.tmp,然后将 previous 目录改名为 current ,最后删除 removed.tmp 目录。

提交的过程,就是将上面的 previous 目录改名为 finalized.tmp ,然后启动一个线程,将该目录删除。

下图给出了上面的过程:

需要注意的是, HDFS的升级,往往只是支持从某一个特点的老版本升级到当前版本。回滚时能够恢复到的版本,也是 previous中记录的版本。

下面我们继续分析 DataNode。

文字分析完 DataNode存储在文件上的数据以后,我们来看一下运行时对应的数据结构。从大到小, Hadoop中最大的结构是Storage ,最小的结构,在 DataNode上是 block 。

类 Storage 保存了和存储相关的信息,它继承了 StorageInfo ,应用于 DataNode的 DataStorage ,则继承了 Storage ,总体类图如下:

StorageInfo 包含了 3 个字段,分别是 layoutVersion :版本号,如果 Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致。 namespaceID是 Storage 的 ID,cTime,creationtime 。

和 StorageInfo 相比,Storage 就是个大家伙了。

Storage 可以包含多个根(参考配置项 dfs.data.dir 的说明),这些根通过 Storage 的内部类 StorageDirectory 来表示。StorageDirectory 中最重要的方法是 analyzeStorage ,它将根据系统启动时的参数和我们上面提到的一些判断条件, 返回系统现在的状态。 StorageDirectory 可能处于以下的某一个状态(与系统的工作状态一定的对应):

NON_EXISTENT :指定的目录不存在; NOT_FORMA TTED :指定的目录存在但未被格式化; COMPLETE_UPGRADE :previous.tmp 存在, current 也存在 RECOVER_UPGRADE :previous.tmp 存在, current 不存在 COMPLETE_FINALIZE :finalized.tmp 存在, current 也存在 COMPLETE_ROLLBACK :removed.tmp 存在, current 也存在, previous 不存在 RECOVER_ROLLBACK : removed.tmp 存在, current 不存在, previous 存在 COMPLETE_CHECKPOINT : lastcheckpoint.tmp 存在, current 也存在 RECOVER_CHECKPOINT :lastcheckpoint.tmp 存在, current 不存在 NORMAL :普通工作模式。

StorageDirectory 处于某些状态是通过发生对应状态改变需要的工作文件夹和正常工作的 current 夹来进行判断。状态改变需要的工作文件夹包括:

previous:用于升级 后 保存以前版本的文件 previous.tmp :用于升级 过程中 保存以前版本的文件 removed.tmp:用于回滚 过程中 保存文件 finalized.tmp :用于提交 过程中 保存文件 lastcheckpoint.tmp :应用于从 NameNode 中,导入 一个检查点 previous.checkpoint :应用于从 NameNode 中, 结束导入 一个检查点

有了这些状态,就可以对系统进行恢复(通过方法 doRecover)。恢复的动作如下(结合上面的状态转移图):

COMPLETE_UPGRADE :mvprevious.tmp->previous RECOVER_UPGRADE :mvprevious.tmp->current COMPLETE_FINALIZE:rmfinalized.tmp COMPLETE_ROLLBACK :rmremoved.tmp RECOVER_ROLLBACK :mvremoved.tmp->current COMPLETE_CHECKPOINT :mvlastcheckpoint.tmp->previous.checkpoint RECOVER_CHECKPOINT :mvlastcheckpoint.tmp->current

我们以 RECOVER_UPGRADE 为例,分析一下。根据升级的过程,

1.current->previous.tmp 2.重建 current 3.previous.tmp->previous

当我们发现 previous.tmp 存在,current 不存在,我们知道只需要将 previous.tmp 改为 current ,就能恢复到未升级时的状态。

StorageDirectory 还管理着文件系统的元信息, 就是我们上面提过 StorageInfo 信息,当然,StorageDirectory 还保存每个具体用途自己的信息。这些信息,其实都存储在 VERSION文件中, StorageDirectory 中的 read/write 方法,就是用于对这个文件进行读 / 写。下面是某一个 DataNode的 VERSION文件的例子:

配置文件代码

代码语言:javascript复制
1. #FriNov1410:27:35CST2008
2. namespaceID=1950997968
3. storageID=DS-697414267-127.0.0.1-50010-1226629655026
4. cTime=0
5. storageType=DATA_NODE 
6. layoutVersion=-16

对 StorageDirectory 的排他操作需要锁, 还记得我们在分析系统目录时提到的 in_use.lock 文件吗?它就是用来给整个系统加/解 锁用的。 StorageDirectory 提供了对应的 lock 和 unlock 方法。

分析完 StorageDirectory 以后, Storage 类就很简单了。基本上都是对一系列 StorageDirectory 的操作,同时 Storage 提供一些辅助方法。

DataStorage 是 Storage 的子类,专门应用于 DataNode。上面我们对 DataNode的升级 / 回滚/ 提交过程, 就是对 DataStorage 的 doUpgrade/doRollback/doFinalize 分析得到的。

DataStorage 提供了 format 方法,用于创建 DataNode上的 Storage ,同时,利用 StorageDirectory ,DataStorage 管理存储系统的状态。

小结

Hadoop源代码分析【6-10】主要为大家科普了RPC实现通信的流程,以及 DataNode在升级 / 回滚/ 提交时底层的变化。

0 人点赞