写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
Hadoop源代码分析(一)
Google 的核心竞争技术是它的计算平台。Google 的大牛们用了下面 5 篇文章,介绍了它们的计算设施。
GoogleCluster: http://research.google.com/archive/googlecluster.html
Chubby:http://labs.google.com/papers/chubby.html
GFS:http://labs.google.com/papers/gfs.html
BigTable:http://labs.google.com/papers/bigtable.html
MapReduce:http://labs.google.com/papers/mapreduce.html
很快,Apache 上就出现了一个类似的解决方案,目前它们都属于 Apache 的 Hadoop 项目,对应的分别是:
代码语言:javascript复制Chubby-->ZooKeeper
GFS-->HDFS
BigTable-->HBase
MapReduce-->Hadoop
目前,基于类似思想的 Open Source 项目还很多,如 Facebook 用于用户分析的 Hive。
HDFS 作为一个分布式文件系统,是所有这些项目的基础。分析好 HDFS,有利于了解其他系统。由于 Hadoop 的 HDFS 和 MapReduce 是同一个项目,我们就把他们放在一块,进行分析。
下图是 MapReduce 整个项目的顶局包图和他们的依赖关系。Hadoop 包之间的依赖关系比较复杂,原因是 HDFS 提供了一个分布式文件系统,该系统提供 API,可以屏蔽本地文件系统和分布式文件系统,甚至象 Amazon S3 返样的在线存储系统。这就造成了分布式文件系统的实现,或者是分布式文件系统的底层的实现,依赖于某些貌似高层的功能。功能的相互引用,造成了蜘蛛网型的依赖关系。一个典型的例子就是包 conf,conf 用于读取系统配置,它依赖于 fs,主要是读取配置文件的时候,需要使用文件系统,而部分的文件系统的功能,在包 fs 中被抽象了。
Hadoop 的关键部分集中于图中蓝色部分,这也是我们考察的重点。
Hadoop源代码分析(二)
下面给出了 Hadoop 的包的功能分析。
Package | Dependences |
---|---|
tool | 提供一些命令行工具,如 DistCp,archive |
mapreduce | Hadoop 的 Map/Reduce 实现 |
filecache | 提供HDFS文件的本地缓存, 用于加快Map/Reduce 的数据访问速度 |
fs | 文件系统的抽象,可以理解为支持多种文件系统实现的统一文件访问接口 |
hdfs | HDFS,Hadoop 的分布式文件系统实现 |
ipc | 一个简单的 IPC 的实现,依赖于 io 提供的编解码功能 |
io | 表示层,将各种数据编码/解码,方便于在网络上传输 |
net | 封装部分网络功能,如 DNS,socket |
security | 用户和用户组信息 |
conf | 系统的配置参数 |
metrics | 系统统计数据的收集,属于网管范畴 |
util | 工具类 |
record | 根据 DDL(数据描述语言)自动生成他们的编解码函数,目前可以提供 C 和 Java |
http | 基于 Jetty 的 HTTP Servlet,用户通过浏览器可以观察文件系统的一些状态信息和日志 |
log | 提供 HTTP 访问日志的 HTTP Servlet |
Hadoop源代码分析(三)
由于 Hadoop 的 MapReduce 和 HDFS 都有通信的需求,需要对通信的对象进行序列化。Hadoop 并没有采用 Java 的序列化,而是引入了它自己的系统。
org.apache.hadoop.io
中定义了大量的可序列化对象,他们都实现了 Writable 接口。实现了 Writable 接口的一个典型例子如下:
public class MyWritable implements Writable {
// Some data
private int counter;
private long timestamp;
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}
public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
w.readFields(in);
return w;
}
}
其中的 write
和 readFields
分别实现了把对象序列化和反序列化的功能,是 Writable 接口定义的两个方法。下图给出了庞大的 org.apache.hadoop.io
中对象的关系。
这里,把 ObjectWritable 标为红色,是因为相对于其他对象,它有不同的地位。当我们讨论 Hadoop的 RPC时,我们会提到RPC上交换的信息, 必须是 Java 的基本类型, String 和 Writable 接口的实现类, 以及元素为以上类型的数组。
ObjectWritable对象保存了一个可以在 RPC上传输的对象和对象的类型信息。 这样,我们就有了一个万能的, 可以用于客户端 / 服务器间传输的 Writable 对象。例如,我们要把上面例子中的对象作为 RPC请求,需要根据 MyWritable 创建一个 ObjectWritable ,ObjectWritable 往流里会写如下信息。
代码语言:javascript复制对象类名长度,对象类名,对象自己的串行化结果
这样,到了对端, ObjectWritable 可以根据对象类名创建对应的对象,并解串行。应该注意到, ObjectWritable 依赖于 WritableFactories ,那里存储了 Writable 子类对应的工厂。我们需要把 MyWritable 的工厂,保存在 WritableFactories 中(通过 WritableFactories. setFactory )。
Hadoop源代码分析(四)
介绍完 org.apache.hadoop.io 以后,我们开始来分析 org.apache.hadoop.rpc
。RPC采用客户机 / 服务器模式。 请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论 HDFS的,通信可能发生在:
- Client-NameNode 之间,其中 NameNode 是服务器
- Client-DataNode 之间,其中 DataNode 是服务器
- DataNode-NameNode 之间,其中 NameNode 是服务器
- DataNode-DateNode 之间,其中某一个 DateNode 是服务器,另一个是客户端
如果我们考虑 Hadoop的 Map/Reduce以后,这些系统间的通信就更复杂了。为了解决这些客户机 / 服务器之间的通信, Hadoop引入了一个 RPC框架。该 RPC框架利用的 Java 的反射能力,避免了某些 RPC解决方案中需要根据某种接口语言(如 CORBA的 IDL)生成存根和框架的问题。但是,该 RPC框架要求调用的参数和返回结果必须是 Java 的基本类型, String 和 Writable 接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出 IOException 异常。
既然是 RPC,当然就有客户端和服务器,当然, org.apache.hadoop.rpc
也就有了类 Client 和类 Server 。但是类 Server 是一个抽象类,类 RPC封装了 Server ,利用反射,把某个对象的方法开放出来,变成 RPC中的服务器。
下图是 org.apache.hadoop.rpc
的类图。
Hadoop源代码分析(五)
既然是 RPC,自然就有客户端和服务器,当然, org.apache.hadoop.rpc
也就有了类 Client 和类 Server 。在这里我们来仔细考察 org.apache.hadoop.rpc.Client
。下面的图包含了 org.apache.hadoop.rpc.Client 中的关键类和关键方法。
由于 Client 可能和多个 Server 通信,典型的一次 HDFS读,需要和 NameNode打交道,也需要和某个 / 某些 DataNode通信。这就意味着某一个 Client 需要维护多个连接。同时,为了减少不必要的连接,现在 Client 的做法是拿 ConnectionId (图中最右侧)来做为 Connection 的 ID。ConnectionId 包括一个 InetSocketAddress (IP 地址 端口号或主机名 端口号)对象和一个用户信息对象。这就是说,同一个用户到同一个 InetSocketAddress 的通信将共享同一个连接。
连接被封装在类 Client.Connection 中,所有的 RPC调用,都是通过 Connection ,进行通信。一个 RPC调用,自然有输入参数,输出参数和可能的异常,同时,为了区分在同一个 Connection 上的不同调用,每个调用都有唯一的 id 。调用是否结束也需要一个标记,所有的这些都体现在对象 Client.Call 中。Connection 对象通过一个 Hash表,维护在这个连接上的所有 Call :
代码语言:javascript复制private Hashtable<Integer ,Call>calls = new Hashtable<Integer,Call>();
一个 RPC调用通过 addCall ,把请求加到 Connection 里。为了能够在这个框架上传输 Java 的基本类型, String 和 Writable 接口的实现类,以及元素为以上类型的数组,我们一般把 Call 需要的参数打包成为 ObjectWritable 对象。
Client.Connection 会通过 socket 连接服务器,连接成功后回校验客户端 / 服务器的版本号(Client.ConnectionwriteHeader()方法),校验成功后就可以通过 Writable 对象来进行请求的发送 / 应答了。注意,每个 Client.Connection 会起一个线程,不断去读取 socket ,并将收到的结果解包,找出对应的 Call ,设置 Call 并通知结果已经获取。
Call 使用 Obejct 的 wait 和 notify ,把 RPC上的异步消息交互转成同步调用。
还有一点需要注意,一个 Client 会有多个 Client.Connection ,这是一个很自然的结果。
小结
Hadoop源代码分析【1-5】主要为大家科普了Hadoop下的各种包的功能分析,以及Hadoop下两大核心HDFS和MapReduce如何基于RPC框架去实现通信,数据传输。