###Mac OSX python 改名
在~/.bash_profile中增加:alias python=python3
一些设计模式
写程序的时候,规模小,尚不能感觉设计模式的重要性。等规模一上来,需求一迭代,一个应用了恰当设计模式的工程,总能以最小的代价进行最快的迭代。
但是一个奇怪的点是,我总记不住具体的实现所对应的设计模式的名字,但是对他们背后的设计思想,却是念念不忘——依赖于抽象而非具体;对扩展开放,对修改关闭;
Builder
首先,将一个复杂逻辑抽象成一组构建过程(具有前后先后次序,即时序约束)或者一组元操作(便于进行组合实现复杂逻辑),用一个接口封装。
然后,不同的逻辑实体类,继承该接口,进行不同的具体实现。
最后,依赖于接口,组合构建过程或元操作,进行具体业务代码实现。以后想换一个实现,只需要某处换一个具体实现类就行了。
1 | /* "Product" */ |
这就是一个典型的依赖于抽象而非具体。
一些有意思的细节
编程中有很多有意思的细节,看到了,就记在这里。
|
简化判断
一堆数按位或,只要有多于一个数为负,则结果为负。1
2
3
4
5
6
7
8public void write(byte b[], int off, int len) throws IOException {
if ((off | len | (b.length - (len + off)) | (off + len)) < 0)
throw new IndexOutOfBoundsException();
for (int i = 0 ; i < len ; i++) {
write(b[off + i]);
}
}
from: FilterOutputStream
Hadoop 源码阅读之DFS(二):DataNode
上一篇把一些零碎的小类集在一起,凑成一篇。这篇打算对比较长的一个类DataNode
读读。
每个DataNode代表一个数据节点,对应某台机器的一个文件夹,本质上是一定数量的Block的集合,能够和NameNode,client以及其他DataNode进行通信,以对该Block集合进行操作,主要包括client的读和写,其他DataNode block的复制,以及响应NameNode操作,进行删除等操作。
具体实现来说,数据结构上,维持了一个block到byte array的表;执行时,DataNode内部是一个无限循环,不断询问NameNode,报告状态(心跳),执行命令(RPC)。
- 状态信息。
DataNodeInfo
:总大小,剩余大小,上次更新时间。 - 执行命令。
- 客户端读写Blocks
- 让其他DataNode复制Blocks
- 删除某些Blocks
此外,DataNode还维持着一个Server Socket以处理来自Client或者其他DataNode请求。DataNode会将其对外暴露的host:port提交给NameNode,后者会将该信息进一步下发给相关的其他DataNode或者client。
(摘自类注释)
StartUp
DataNode启动的时候主要干了以下事情:
为每个dfs.data.dir
实例化一个DataNode
,DataNode有以下几个重要字段:
namenode
,DatanodeProtocol
类型,和NameNode进行RPC通信。data
,FSDataset
类型,对应一个文件夹,负责具体的本地磁盘操作。localName
, machine name + port,对外暴露的网络机器名和端口。dataXceiveServer
,一个socket Server,监听上述端口,处理读写请求。- 其他一些配置字段,包括
blockReportInterval
,datanodeStartupPeriod
等。
Main Loop When Run
offerService
,该函数根据当前时间与上次动作时间差值,决定是否再一次执行该动作(DataNode
对NameNode
的RPC);这几个动作基本对应DataNodeProtocol
的各个函数,即RPC的几个动作约定。这些事件有向NameNode:
- 发送心跳信息
- 上传
block
信息 - 获取
NameNode
指令
下面分别就每一项进行详细说明:
1. 发送心跳信息
心跳信息包括以下几项内容:
- DataNode名字
- DataNode数据传输端口
- DataNode总容量
- DataNode剩余字节数
2. 上传当前Block信息
报告本DataNode的所有Block信息,以更新表machine->block list 和表block->machine list。利用TreeMap实现,能得到按BlockId排序的数组,通过逐一比较新旧上报Block数组的每个元素(oldReport
和newReport
),利用removeStoredBlock
和addStoredBlock
将旧数组更新为新数组。
然后NameNode将需要删除的Block数组返回,利用data
(FSDataSet
)句柄进行删除。
3. 报告新收到的Block信息,即ReceivedBlock
当Client写数据,或者其他DataNode复制数据给当前DataNode
的时候,该DataNode通过RPC,执行此函数。然后NameNode将其更新到保存元数据的table里。
4. 获取 NameNode指令
根据BlockCommand
类的字段:
1 | boolean transferBlocks = false; |
可以看出,指令动作包括交换(transfer)和删除(delete or invalidate);动作对象包括一系列blocks和DataNode,表示将blocks[i]
传送到targets[i][0]
… targets[i][j]
的DataNode上去。
具体传送实现,为每一个!invalid的block,启动一个线程,负责具体数据传送,代码为:1
new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
后面将对DataTransfer
类进行详细注解。
DataTransfer
该类实现了Runnable
接口,在每次有数据需要传输时被启动;其动作主要为:
- 连接第一个Target DataNode的socket,作为输出。
- 从
FSDataSet
中获取Block元信息以及本机器上该block对应的数据文件,作为输入。 - 从输入端读取数据,写到输出端。
因此,该类只负责将block信息写到第一个target DataNode,比如说Node1,剩下的将由Node1机器上的线程进行数据传送。
该block在本机实际的文件夹路径和文件名都可以根据blockId进行确定。对于一个64bit的blockId,从高位到地位,每四位作为一个文件夹的名字(0~15),进行路由,因此文件实际位置的深度可能高达64/4=16层;存储数据的文件命名方式为blk_{blockId}.
DataXceiveServer
该类也实现了Runnable
接口,在DataNode
初始化的时候被启动,用于监听Client或者其他DataNodes的请求,以进行block数据的传输。
具体实现为,使用SocketServer
,根据信号shouldListen
来循环监听所有请求。当请求到来时,使用DataXceiver
类进行具体连接的处理;
DataXceiver
该类负责具体实现数据传输的逻辑,包括Block的写和读,每次传输一个Block块,将该Block首先写入本地文件系统,然后传送给下一个目标DataNode;具体来说,
首先,打开socket输入流,读取首字节,判断操作类型;
然后,进行写或者读操作。
写操作(OP_WRITE_BLOCK)
读入header,包括以下几个字段
1
2
3
4
5
6a. shouldReportBlock --> bool
b. block info(blkid+len) --> Block
c. numTargets --> int
d. targets --> DatanodeInfo[]
e. encodingType --> byte
f. data length --> long然后将这些header信息,去掉该DataNode(
targets[0]
)的信息后,写入下一个DataNode (target[1]
)。- 从socket中读取具体存储的数据,先后写入本地存储(当前DataNode)和下一个DataNode的socket。这里有一点设计,就是如果写Socket异常后,可以终止Socket,但仍然继续写本地存储。
encodingType
的类型不同,读取数据方式不同:对于RUNLENGTH_ENCODING
类型,其结构是length(say l)+data(of the length l),因此读一次就结束;而CHUNKED_ENCODING
类型,结构为l1 + data1 + l2 + data2 + … + ln + datan + l(n+1) (=0);因此需要循环继续读如长度,然后读入该长度数据,直到len=0结束。- 如果和下一个DataNode间的socket仍然正常,则从该socket读回一些关于写数据的反馈,包括long型的结束符和
LocatedBlock
–>写成功后的block网络位置,是一个Block
和DatanodeInfo[]
对,表示该Block以及已经写成功的DataNode list。整个写操作和备份的过程类似于一个递归调用的过程,由client写datanode1, 然后datanode1写datanode2,然后datanode2写datanode3;然后datanode3将写成功信号,以及datanode3位置告诉datanode2,然后datanode2将写成功信号以及datanode2,datanode3位置告诉datanode1等等。
读操作(OP_READ_BLOCK
|| OP_READSKIP_BLOCK
)
首先读入待读取的Block信息,然后,如果是OP_READSKIP_BLOCK
类型,则读取需要跳过的字节数(toSkip
–>long);
然后通过data
–> FSDataSet 定位block本地存储文件位置,根据类型决定是否跳过特定字节(toSkip),然后逐字节读取该文件。
Aside info
如果类需要作为Key,比如TreeMap
,则需要实现Comparable
接口,只有可以比较才能进行排序和Hash;如果需要进行序列化和反序列化,则需要实现Writable
接口。
Hadoop 源码阅读之DFS(一):一些基本的类
计划花一个月左右的时间,通读一遍Hadoop 0.1.0的源码,尽量少写一些废话,多记录一些思考。
Random一下,就从分布式文件系统(DFS)开始吧。
DFS即分布式文件系统,集合多台机器存储在预定义位置上的一组文件作为存储构件,在此基础上实现一些分布式操作,从而对外抽象出一套基本文件读写API。
Block
blkid和len
Block是HDFS的文件存储的基本单位,有两个关键属性blkid
和len
,前者用来标识一个操作系统上的文件,并且通过"blk_" + String.valueOf(blkid)
拼接出文件名;后者是该文件以字节为单位的长度。
它抽象出了存储的两个基本维度,起始和大小。变量,数组,文件等等莫不如此。
注册工厂方法
另一个有意思的地方是所有实现Writable接口的类,都注册了一个工厂方法,具体有什么用,以后来补。1
2
3
4
5
6
7static { // register a ctor
WritableFactories.setFactory
(Block.class,
new WritableFactory() {
public Writable newInstance() { return new Block(); }
});
}
序列化
实现Writable
利用Java的序列化接口(DataOutput
),实现Block基本字段的序列化和反序列化。
每个待序列化类单独实现自己一对序列化和反序列化函数,是一个常用的基本设计,我在实习写桌面程序的时候,想将一些控件信息存储为xml,用的想法和这个是相同的,但是做的不好的事没有定义出这个Writable接口作为对这个行为的抽象。
实现了Comparable
(大概是为了被索引时可比较)和Writable
接口
BlockCommand
一个命令(instruction)参数的封装,该命令作用于某个DataNode
下的一系列Blocks;有两种操作,移动这组Blocks到另外一个DataNode
,或者标记改组Blocks为失效状态。
实现
1 | boolean transferBlocks = false; |
用两个标志变量来指明是哪种操作;
用两个数组来存储操作对象。
然后通过构造函数重载,给出了三个构造函数,无参,移动命令或者失效命令。并且提供了各个字段的读权限。
实现了Writable
接口。
总结
对一个简单的命令基本信息的封装,用构造函数接受参数,确定操作类型和操作对象;用标志变量+数组对象来进行实现。
将一组数据按照某种语义捆绑在一起,在函数间传递时也方便,复用性也更好。
LocatedBlock
一个数据对,包含一个Block
和其几个replicate所在的DataNode
的信息。1
2Block b;
DatanodeInfo locs[];
相当于维持某个逻辑Block到其存储位置的指针,用于定位Block物理位置。
实现了Writable
接口。
DataNodeInfo
包含了一个DataNode
的状态信息(总大小,剩余大小,上次更新时间),用名字(自定义的UTF8
存储的host:port
)作为ID,并且维持了其上所有Block
的引用,以查找树(TreeSet
应该是红黑树,以Block
的blkid进行排序)的形式组织。
关键函数
更新状态信息(一次心跳。名字起得好啊——好像DataNode在说,“我还活着,我的基本体征如下,balabala”,传神好记。1
2
3
4
5public void updateHeartbeat(long capacity, long remaining) {
this.capacityBytes = capacity;
this.remainingBytes = remaining;
this.lastUpdate = System.currentTimeMillis();
}
实现了Comparable
和Writable
(比较有意思的是,blocks没有被序列化)接口
DataNodeReport
一个POJO,哈哈,想起这个名字的由来就想笑,马大叔真是有才的别具猥琐。看它的字段就知道,这是心跳来源+心跳信息的一个简单封装,每个字段都具有包级访问权限,还提供了几个public的读方法。1
2
3
4
5String name;
String host;
long capacity;
long remaining;
long lastUpdate;
DataNodeInfo
的ID加心跳信息。
最后有一个toString函数,毕竟是搞报告工作的。
Hadoop-0.1.0代码调试运行
之前雄心勃勃的从GitHub上下了Hadoop源码,想要通读涨涨姿势,甚至想自己写一个简易版本。
不料代码啃起来味同嚼蜡,在读了基本的RPC之后,就此搁置。
后来,抱着没有干不好的事,只有打开方式不对的心态,换个姿势,再学一次。
这次计划如下:
- 首先,将代码在本地调试。
- 然后,按模块进行debug。
今天主要说第一块的探索,花了两个晚上:>。
git fork
下源码,git clone
到本地,通过git tag --list
查看所有标签,找到0.1.0版本。
并且checkout出来。- 配置JAVA_HOME,HADOOP_HOME, PATH环境变量。
- 下载安装ant,并且配置环境变量(ANT_HOME,PATH)
- Eclipse新建Java Project,然后选择Java Project From Existing Ant Buildfile,
从现有文件夹中打开,选择hadoop所在文件夹,它会自动识别出build.xml,然后新建Ant工程。 - 右击build.xml,run as-> ant built,选第二个,进行配置(选对运行文件夹以及build文件,如果
不行就在含build.xml目录中执行命令ant,然后就会生成build文件夹,并且根据conf文件夹模板生成
必要的conf文件。 修改hadoop脚本(在${HADOOP_HOME}/bin里),为了不破坏原来文件,
cp hadoop hadoop-debug
。
然后在此脚本中,将最后一行运行命令,加上一些用于调试的参数。修改如下修改前
1
2# run it
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"修改后
1
2
3# run it
exec "$JAVA" -Xdebug -Xrunjdwp:transport=dt_socket,address=9090,server=y,suspend=y
$JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"-Xdebug指明是调试,-Xrunjdwp引出后面参数。transport:通信方式,address:端口。
- 然后选择一个模块运行,比如NameNode:
bin/hadoop-debug namenode -format
。 - 在Eclipse中选择Debug->Debug configurations->Remote Java Application,选择对工程,localhost
以及对应端口就行。记得在代码中加断点,然后就可以愉快地运行了。
夜深啦,今天先到这里。