hadoop 源代码分析(一)
Google 的核心竞争技术是它的计算平台。HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施。
GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html BigTable:http://labs.google.com/papers/bigtable.html MapReduce:http://labs.google.com/papers/mapreduce.html
很快,Apache上就给出了一个类似的解决方案,目前它们都属于 Apache 的Hadoop 项目,对应的分别是:
Apache | |
---|---|
Chubby | ZooKeeper |
GFS | HDFS |
BigTable | HBase |
MapReduce | Hadoop |
目前,基于类似思想的 Open Source 项目还很多,如FaceBook 用于用户分析的 Hive。 HDFS 作为一个分布式文件系统,是所有这些项目的基础。分析好 HDFS,有利于了解其它系统。由于 Hadoop 的HDFS和MapReduce是同一个项目,我们就把它们放在一起进行分析。
下图是MapReduce整个项目的顶层包图和它们之间的依赖关系。Hadoop 包之间的依赖关系比较复杂,原因是 HDFS 提供了一个分布式文件系统,该系统提供API,可以屏蔽本地文件系统和分布式文件系统, 甚至像Amazon S3这样的在线存储系统。这就造成了分布式文件系统的实现,或者是分布式文件系统的底层的实现,依赖于某些貌似高层的功能。功能的相互引用, 造成了蜘蛛网型的依赖关系。一个典型的例子就是conf,conf用于读取系统配置,它依赖于fs,主要是读取配置文件的时候,需要使用文件系统,而部分的文件系统的功能,在包fs中被抽象了。
Hadoop 的关键部分集中于图中蓝色部分,这也是我们要掌握的重点。
Package | Dependences |
---|---|
tool | mapreduce,fs,hdfs,ipc,io,security,conf,util |
mapreduce | filecache,fs,hdfs,ipc,io,net,metrics,security,conf,util |
filecache | fs,conf,util |
fs | hdfs,ipc,io,net,metrics,security,conf,util |
hdfs | fs,ipc,io,http,net,metrics,security,conf,util |
ipc | io,net,metrics,security,conf,util |
io | ipc,fs,conf,util |
net | ipc,fs,conf,util |
security | io,conf,util |
conf | fs,io,util |
metrics | util |
util | mapred,fs,io,conf |
record | io.Writable* |
http | log,conf,util |
log | util |
hadoop 源代码分析(二)
下面给出了 Hadoop 的包的功能分析。
Package | Dependences |
---|---|
tool | 提供一些命令行工具,如 DistCp,archive |
mapreduce | Hadoop 的 Map/Reduce 实现 |
filecache | 提供HDFS文件的本地缓存,用于加快 |
fs | 文件系统的抽象,可以理解为支持多种文件系统实现的统一文件访问接口 |
hdfs | HDFS,Hadoop 的分布式文件系统实现 |
ipc | 一个简单的 IPC 的实现,依赖于io 提供的编解码功能参考:http://zhangyu8374.javaeye.com/blog/86306 |
io | 表示局。将各种数据编码/解码,方便亍在网络上传输 |
net | 封装部分网络功能,如 DNS,socket |
security | 用户和用户组信息 |
conf | 系统的配置参数 |
metrics | 系统统计数据的收集,属亍网管范畴 |
util | 工具类 |
record | 根据 DDL(数据描述诧言)自动生成他们的编解码函数,目前可以提供 C++和 Java |
http | 基于Jetty 的 HTTP Servlet,用户通过浏览器可以观察文件系统的一些状态信息和日志 |
log | 提供 HTTP 访问日志的 HTTP Servlet |
hadoop 源代码分析(三)
由于 Hadoop 的 MapReduce 和 HDFS 都有通信的需求,需要对通信的对象进行序列化。Hadoop 并没有采用 Java 的序列化,而是引入了它自己的系统。
org.apache.hadoop.io 中定义了大量的可序列化对象,他们都实现了 Writable 接口。实现了 Writable 接口的一个典型例子如下:
Java 代码
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }
其中的 write 和 readFields 分别实现了把对象序列化和反序列化的功能,是 Writable 接口定义的两个方法。下图给出了庞大的 org.apache.hadoop.io 中对象的关系。
这里,我把 ObjectWritable 标为红色,是因为相对于其他对象,它有不同的地位。当我们讨论 Hadoop 的 RPC 时,我们会提到 RPC 上交换的信息,必须是 Java 的基本类型,String 和 Writable 接口的实现类,以及元素为以上类型的数组。
ObjectWritable 对象保存了一个可以在 RPC 上传输的对象和对象的类型信息。这样,我们就有了一个万能的,可以用于客户端/服务器间传输的 Writable 对象。例如,我们要把上面例子中的对象作为 RPC 请求,需要根据 MyWritable 创建一个ObjectWritable,ObjectWritable 往流里会写如下信息。
对象类名长度,对象类名,对象自己的串行化结果。
这样,到了对端,ObjectWritable 可以根据对象类名创建对应的对象,并解串行。应该注意到,ObjectWritable 依赖于WritableFactories,那存储了 Writable 子类对应的工厂。我们需要把 MyWritable 的工厂,保存在 WritableFactories 中(通过 WritableFactories.setFactory)。
hadoop 源代码分析(四)
介绍完 org.apache.hadoop.io 以后,我们开始来分析 org.apache.hadoop.rpc。RPC 采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论 HDFS 的,通信可能发生在:
1)Client-NameNode 之间,其中 NameNode 是服务器; 2)Client-DataNode 之间,其中 DataNode 是服务器; 3)DataNode-NameNode 之间,其中 NameNode 是服务器 ; 4)DataNode-DateNode 之间,其中某一个 DateNode 是服务器,另一个是客户端。
如果我们考虑 Hadoop 的 Map/Reduce 以后,这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信,Hadoop 引入了一个 RPC 框架。该 RPC 框架利用的 Java 的反射能力,避免了某些 RPC 解决方案中需要根据某种接口诧言(如CORBA 的 IDL)生成存根和框架的问题。但是,该 RPC 框架要求调用的参数和返回结果必须是 Java 的基本类型,String 和Writable 接口的实现类,以及元素为以上类型的数组。同时,接口方法应该叧抛出 IOException 异常。
既然是 RPC,当然就有客户端和服务器,当然,org.apache.hadoop.rpc 也就有了类 Client 和类 Server。但是类 Server 是一个抽象类,类 RPC 封装了 Server,利用反射,把某个对象的方法开放出来,发成 RPC 中的服务器。
下图是 org.apache.hadoop.rpc 的类图。
hadoop 源代码分析(五)
既然是 RPC,自然就有客户端和服务器,当然,org.apache.hadoop.rpc 也就有了类 Client 和类 Server。在这里我们来仔细考察 org.apache.hadoop.rpc.Client。下面的图包含了 org.apache.hadoop.rpc.Client 中的关键类和关键方法。
由于 Client 可能和多个 Server 通信,典型的一次 HDFS 读,需要和 NameNode 打交道,也需要和某个/某些 DataNode 通信。这就意味着某一个 Client 需要维护多个连接。同时,为了减少不必要的连接,现在 Client 的做法是拿 ConnectionId(图 中最右侧)来做为 Connection 的 ID。ConnectionId 包括一个 InetSocketAddress(IP 地址+端口号或主机名+端口号)对象和一个用户信息对象。这就是说,同一个用户到同一个 InetSocketAddress 的通信将共享同一个连接。
连接被封装在类 Client.Connection 中,所有的 RPC 调用,都是通过 Connection,进行通信。一个 RPC 调用,自然有输入参数,输出参数和可能的异常,同时,为了区分在同一个 Connection 上的不同调用,每个调用都有唯一的 id。调用是否结束也需要一个标记,所有的这些都体现在对象 Client.Call 中。Connection 对象通过一个 Hash 表,维护在这个连接上的所有 Call:
Java 代码
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
一个 RPC 调用通过 addCall,把请求加到 Connection 里。为了能够在这个框架上传输 Java 的基本类型,String 和 Writable接口的实现类,以及元素为以上类型的数组,我们一般把 Call 需要的参数打包成为 ObjectWritable 对象。
Client.Connection 会通过 socket 连接服务器,连接成功后回校验客户端/服务器的版本号(Client.ConnectionwriteHeader()方法),校验成功后就可以通过 Writable 对象来进行请求的収送/应答了。注意,每个 Client.Connection 会起一个线程,不断去读取 socket,并将收到的结果解包,找出对应的 Call,设置 Call 并通知结果已经获取。
Call 使用 Obejct 的 wait 和 notify,把 RPC 上的异步消息交互转成同步调用。
还有一点需要注意,一个 Client 会有多个 Client.Connection,这是一个很自然的结果。
hadoop 源代码分析(六)
下面我们学习Server,首先看一下Server类图。
需要注意的是,这里的 Server 类是个抽象类,唯一抽象的地方,就是Java 代码。
public abstract Writable call(Writable param, long receiveTime) throws IOException;
这表明,Server 提供了一个架子,Server 的具体功能,需要具体类来完成。而具体类,当然就是实现 call 方法。
我们先来分析 Server.Call,和 Client.Call 类似,Server.Call 包含了一次请求,其中,id 和 param 的含义和 Client.Call 是一致的。不同点在后面三个属性,connection 是该 Call 来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection,収送给客户端。属性 timestamp 是请求刡达的时间戳,如果请求很长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的 response 是请求处理的结果,可能是一个 Writable 的串行化结果,也可能一个异常的串行化结果。
Server.Connection 维护了一个来之客户端的 socket 连接。它处理版本校验,读取请求并把请求収送刡请求处理线程,接收处理结果并把结果収送给客户端。
Hadoop 的 Server 采用了 Java 的 NIO,这样的话就不需要为每一个 socket 连接建立一个线程,读取 socket 上的数据。在Server 中,叧需要一个线程,就可以 accept 新的连接请求和读取 socket 上的数据,这个线程,就是上面图里的 Listener。
请求处理线程一般有多个,它们都是 Server.Handle 类的实例。它们的 run 方法循环地取出一个 Server.Call,调用 Server.call方法,搜集结果并串行化,然后将结果放入 Responder 队列中。
对于处理完的请求,需要将结果写回去,同样,利用 NIO,叧需要一个线程,相关的逡辑在 Responder 里。
hadoop 源代码分析(七)
有了 Client 和 Server,很自然就能 RPC远程过程调用。下面轮到 RPC.java了。
一般来说,分布式对象一般都会要求根据接口生成存根和框架。如 CORBA,可以通过 IDL,生成存根和框架。但是在org.apache.hadoop.rpc,我们就不需要这样的步骤了。下面我们来看一下类图。
为了分析 Invoker,我们需要介绍一些 Java 反射实现 Dynamic Proxy 的背景。
Dynamic Proxy 是由两个 class 实现的:java.lang.reflect.Proxy 和 java.lang.reflect.InvocationHandler,后者是一个接口。所谓 Dynamic Proxy 是这样一种 class:它是在运行时生成的 class,在生成它时你必须提供一组 interface 给它,然后该 class就宣称它实现了这些 interface。
这个 Dynamic Proxy 其实就是一个典型的 Proxy 模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个 handler,在 Hadoop 的 RPC 中,就是 Invoker 对象。
我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的InvocationHandler 实现中。
在 Hadoop 的 RPC 中,Invoker 实现了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的唯一方法)。Invoker 会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的 Client,通过 socket 传递到服务器端。就是说,你在 proxy 类上的任何调用,都通过 Client 发送到迖方的服务器上。
Invoker 使用 Invocation。Invocation 封装了一个迖程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和 parameters,调用方法参数。注意,它实现了 Writable 接口,可以串行化。
RPC.Server 实现了 org.apache.hadoop.ipc.Server,你可以把一个对象,通过 RPC,升级成为一个服务器。服务器接收到的请求(通过 Invocation),解串行化以后,就发成了方法名,方法参数列表和参数列表。利用 Java 反射,我们就可以调用对应的对象的方法。调用的结果再通过 socket,返回给客户端,客户端把结果解包后,就可以返回给 Dynamic Proxy 的使用者了。
hadoop 源代码分析(八)
一个典型的 HDFS 系统包括一个 NameNode 和多个 DataNode。NameNode 维护名字空间;而 DataNode 存储数据块。
DataNode 负责存储数据,一个数据块在多个 DataNode 中有备仹;而一个 DataNode 对于一个块最多叧包含一个备仹。所以我们可以简单地认为 DataNode 上存了数据块 ID 和数据块内容,以及他们的映射关系。
一个 HDFS 集群可能包含上千 DataNode 节点,这些 DataNode 定时和 NameNode 通信,接受 NameNode 的指令。为了减轻 NameNode 的负担,NameNode 上并不永久保存那个 DataNode 上有那些数据块的信息,而是通过 DataNode 启动时的上报,来更新 NameNode 上的映射表。
DataNode 和 NameNode 建立连接以后,就会不断地和 NameNode 保持心跳。心跳的返回其还也包含了 NameNode 对DataNode 的一些命令,如删除数据库或者是把数据块复制到另一个 DataNode。应该注意的是:NameNode 不会发起到DataNode 的请求,在这个通信过程中,它们是严格的客户端/服务器架构。
DataNode 当然也作为服务器接叐来自客户端的访问,处理数据块读/写请求。DataNode 之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作的时候,DataNode 需要相互配合,保证写操作的一致性。
下面我们就来具体分析一下 DataNode 的实现。DataNode 的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。我们先来看本地数据块管理部分。
安装 Hadoop 的时候,我们会指定对应的数据块存放目录,当我们检查数据块存放目录目录时,我们会发现下面有个叫 dfs 的目录,所有的数据就存放在 dfs/data 里面。
其中有两个文件,storage 里存的东西是一些出错信息,貌似是版本不对…云云。in_use.lock 是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它上面的一个锁。
接下来是 3 个目录,current 存的是当前有效的数据块,detach 存的是快照(snapshot,目前没有实现),tmp 保存的是一些操作需要的临时数据块。
但我们进入 current 目录以后,就会发现有一系列的数据块文件和数据块元数据文件。同时还有一些子目录,它们的名字是subdir0 到 subdir63,子目录下也有数据块文件和数据块元数据。这是因为 HDFS 限定了每个目录存放数据块文件的数量,多了以后会创建子目录来保存。
数据块文件显然保存了 HDFS 中的数据,数据块最大可以到 64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:
blk_3148782637964391313
blk_3148782637964391313_242812.meta
上面的例子中,3148782637964391313 是数据块的 ID 号,242812 是数据块的版本号,用于一致性检查。
在 current 目录下还有下面几个文件:
VERSION,保存了一些文件系统的元信息。
dncp_block_verification.log.curr 和 dncp_block_verification.log.prev,它记录了一些 DataNode 对文件系定时统做一致性检查需要的信息。
hadoop 源代码分析(九)
在继续分析 DataNode 之前,我们有必要看一下系统的工作状态。启动 HDFS 的时候,我们可以选择以下启动参数:
FORMAT("-format"):格式化系统
REGULAR("-regular"):正常启动
UPGRADE("-upgrade"):升级
ROLLBACK("-rollback"):回滚
FINALIZE("-finalize"):提交
IMPORT("-importCheckpoint"):从 Checkpoint 恢复。
作为一个大型的分布式系统,Hadoop 内部实现了一套升级机制(http://wiki.apache.org/hadoop/Hadoop_Upgrade)。
upgrade 参数就是为了这个目的而存在的,当然,升级可能成功,也可能失败。如果失败了,那就用 rollback 进行回滚;如果过了一段时间,系统运行正常,那就可以通过 finalize,正式提交这次升级(跟数据库有点像啊)。
importCheckpoint 选项用于 NameNode 发生故障后,从某个检查点恢复。
有了上面的描述,我们得到下面左边的状态图:
大家应该注意到,上面的升级/回滚/提交都不可能一下就搞定,就是说,系统故障时,它可能处于上面右边状态中的某一个。特别是分布式的各个节点上,甚至可能出现某些节点已经升级成功,但有些节点可能处于中间状态的情况,所以 Hadoop 采用类似于数据库事务的升级机制也就不是很奇怪。
大家先理解一下上面的状态图,它是下面我们要介绍 DataNode 存储的基础。
hadoop 源代码分析(十)
我们来看一下升级/回滚/提交时的 DataNode 上会发生什么(在类 DataStorage 中实现)。
前面我们提到过 VERSION 文件,它保存了一些文件系统的元信息,这个文件在系统升级时,会发生对应的发化。
升级时,NameNode 会将新的版本号,通过 DataNode 的登录应答返回。DataNode 收到以后,会将当前的数据块文件目录改名,从 current 改名为 previous.tmp,建立一个 snapshot,然后重建 current 目录。重建包括重建 VERSION 文件,重建对应的子目录,然后建立数据块文件和数据块元数据文件到 previous.tmp 的硬连接。建立硬连接意味着在系统中只保留一份数据块文件和数据块元数据文件,current 和 previous.tmp 中的相应文件,在存储中,只保留一份。当所有的这些工作完成以后,会在 current 里写入新的 VERSION 文件,并将 previous.tmp 目录改名为 previous,完成升级。
了解了升级的过程以后,回滚就相对简单。因为说有的旧版本信息都保存在 previous 目录里。回滚首先将 current 目录改名为 removed.tmp,然后将 previous 目录改名为 current,最后删除 removed.tmp 目录。
提交的过程,就是将上面的 previous 目录改名为 finalized.tmp,然后启动一个线程,将该目录删除。
下图给出了上面的过程:
需要注意的是,HDFS 的升级,往往只是支持从某一个特点的老版本升级到当前版本。回滚时能够恢复到的版本,也是 previous中记录的版本。
下面我们继续分析 DataNode。
文字分析完 DataNode 存储在文件上的数据以后,我们来看一下运行时对应的数据结构。从大到小,Hadoop 中最大的结构是 Storage,最小的结构,在 DataNode 上是 block。
类 Storage 保存了和存储相关的信息,它继承了 StorageInfo,应用于 DataNode 的 DataStorage,则继承了 Storage,总体类图如下:
StorageInfo 包含了 3 个字段,分别是 layoutVersion:版本号,如果 Hadoop 调整文件结构布尿,版本号就会修改,这样可以保证文件结构和应用一致。namespaceID 是 Storage 的 ID,cTime,creation time。
和 StorageInfo 相比,Storage 就是个大家伙了。
Storage 可以包含多个根(参考配置项 dfs.data.dir 的说明),这些根通过 Storage 的内部类 StorageDirectory 来表示。StorageDirectory 中最重要的方法是 analyzeStorage,它将根据系统启动时的参数和我们上面提到的一些判断条件,返回系统现在的状态。StorageDirectory 可能处于以下的某一个状态(不系统的工作状态一定的对应):
NON_EXISTENT:指定的目录不存在;
NOT_FORMATTED:指定的目录存在但未被格式化;
COMPLETE_UPGRADE:previous.tmp 存在,current 也存在
RECOVER_UPGRADE:previous.tmp 存在,current 不存在
COMPLETE_FINALIZE:finalized.tmp 存在,current 也存在
COMPLETE_ROLLBACK:removed.tmp 存在,current 也存在,previous 不存在
RECOVER_ROLLBACK:removed.tmp 存在,current 不存在,previous 存在
COMPLETE_CHECKPOINT:lastcheckpoint.tmp 存在,current 也存在
RECOVER_CHECKPOINT:lastcheckpoint.tmp 存在,current 不存在
NORMAL:普通工作模式。
StorageDirectory 处于某些状态是通过发生对应状态改变需要的工作文件夹和正常工作的 current 夹来进行判断。状态改变需要的工作文件夹包括:
previous:用于升级后保存以前版本的文件
previous.tmp:用于升级过程中保存以前版本的文件
removed.tmp:用于回滚过程中保存文件
finalized.tmp:用于提交过程中保存文件
lastcheckpoint.tmp:应用于从 NameNode 中,导入一个检查点
previous.checkpoint:应用于从 NameNode 中,结束导入一个检查点
有了这些状态,就可以对系统进行恢复(通过方法 doRecover)。恢复的动作如下(结合上面的状态转移图):
COMPLETE_UPGRADE:mv previous.tmp -> previous RECOVER_UPGRADE:mv previous.tmp -> current COMPLETE_FINALIZE:rm finalized.tmp COMPLETE_ROLLBACK:rm removed.tmp RECOVER_ROLLBACK:mv removed.tmp -> current COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp -> previous.checkpoint RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current
我们以 RECOVER_UPGRADE 为例,分析一下。根据升级的过程,
1. current->previous.tmp
2. 重建 current
3. previous.tmp->previous
当我们发现 previous.tmp 存在,current 不存在,我们知道只需要将 previous.tmp 改为 current,就能恢复到未升级时的状态。
StorageDirectory 还管理着文件系统的元信息,就是我们上面提过 StorageInfo 信息,当然,StorageDirectory 还保存每个具体用途自己的信息。这些信息,其实都存储在 VERSION 文件中,StorageDirectory 中的 read/write 方法,就是用于对这个文件进行读/写。下面是某一个 DataNode 的 VERSION 文件的例子:
配置文件代码
1. #Fri Nov 14 10:27:35 CST 2008 2. namespaceID=1950997968 3. storageID=DS-697414267-127.0.0.1-50010-1226629655026 4. cTime=0 5. storageType=DATA_NODE 6. layoutVersion=-16
对 StorageDirectory 的排他操作需要锁,还记得我们在分析系统目录时提到的 in_use.lock 文件吗?它就是用来给整个系统加/解锁用的。StorageDirectory 提供了对应的 lock 和 unlock 方法。
分析完 StorageDirectory 以后,Storage 类就很简单了。基本上都是对一系列 StorageDirectory 的操作,同时 Storage 提供一些辅助方法。
DataStorage 是 Storage 的子类,与门应用于 DataNode。上面我们对 DataNode 的升级/回滚/提交过程,就是对 DataStorage的 doUpgrade/doRollback/doFinalize 分析得到的。
DataStorage 提供了 format 方法,用于创建 DataNode 上的 Storage,同时,利用 StorageDirectory,DataStorage 管理存储系统的状态。