香远益清

qtmuniao's blog


  • Home

  • Archives

python 相关

Posted on 2017-11-17

###Mac OSX python 改名
在~/.bash_profile中增加:
alias python=python3

一些设计模式

Posted on 2017-08-03

写程序的时候,规模小,尚不能感觉设计模式的重要性。等规模一上来,需求一迭代,一个应用了恰当设计模式的工程,总能以最小的代价进行最快的迭代。
但是一个奇怪的点是,我总记不住具体的实现所对应的设计模式的名字,但是对他们背后的设计思想,却是念念不忘——依赖于抽象而非具体;对扩展开放,对修改关闭;

Builder

首先,将一个复杂逻辑抽象成一组构建过程(具有前后先后次序,即时序约束)或者一组元操作(便于进行组合实现复杂逻辑),用一个接口封装。
然后,不同的逻辑实体类,继承该接口,进行不同的具体实现。
最后,依赖于接口,组合构建过程或元操作,进行具体业务代码实现。以后想换一个实现,只需要某处换一个具体实现类就行了。

小例子一枚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/* "Product" */
class Pizza {
private String dough = "";
private String sauce = "";
private String topping = "";

public void setDough(String dough) {
this.dough = dough;
}

public void setSauce(String sauce) {
this.sauce = sauce;
}

public void setTopping(String topping) {
this.topping = topping;
}
}

/* "Abstract Builder" */
abstract class PizzaBuilder {
protected Pizza pizza;

public Pizza getPizza() {
return pizza;
}

public void createNewPizzaProduct() {
pizza = new Pizza();
}

public abstract void buildDough();
public abstract void buildSauce();
public abstract void buildTopping();
}

/* "ConcreteBuilder" */
class HawaiianPizzaBuilder extends PizzaBuilder {
public void buildDough() {
pizza.setDough("cross");
}

public void buildSauce() {
pizza.setSauce("mild");
}

public void buildTopping() {
pizza.setTopping("ham+pineapple");
}
}

/* "ConcreteBuilder" */
class SpicyPizzaBuilder extends PizzaBuilder {
public void buildDough() {
pizza.setDough("pan baked");
}

public void buildSauce() {
pizza.setSauce("hot");
}

public void buildTopping() {
pizza.setTopping("pepperoni+salami");
}
}

/* "Director" */
class Waiter {
private PizzaBuilder pizzaBuilder;

public void setPizzaBuilder(PizzaBuilder pb) {
pizzaBuilder = pb;
}

public Pizza getPizza() {
return pizzaBuilder.getPizza();
}

public void constructPizza() {
pizzaBuilder.createNewPizzaProduct();
pizzaBuilder.buildDough();
pizzaBuilder.buildSauce();
pizzaBuilder.buildTopping();
}
}

/* A customer ordering a pizza. */
public class PizzaBuilderDemo {
public static void main(String[] args) {
Waiter waiter = new Waiter();
PizzaBuilder hawaiianPizzabuilder = new HawaiianPizzaBuilder();
PizzaBuilder spicyPizzaBuilder = new SpicyPizzaBuilder();

waiter.setPizzaBuilder( hawaiianPizzabuilder );
waiter.constructPizza();

Pizza pizza = waiter.getPizza();
}
}

这就是一个典型的依赖于抽象而非具体。

Hadoop 源码阅读之DFS(三):FileSystem

Posted on 2017-07-23

FileSystem

FileSystem是一个抽象基类,为LocalFileSystem和DistributedFileSystem提供一些公共方法。通过HashMap:name-> filesystem,维护所有使用的的文件系统,其key或者为“Local”,或者为“Host:Port”(标识一个NameNode)。
继承了Configured类,可以通过配置加载一些基本参数,保存在Configuration中。
为了提高可靠性,给每个文件生成一个校验和,保存在.*.crc的隐藏文件中。

一些有意思的细节

Posted on 2017-07-15

编程中有很多有意思的细节,看到了,就记在这里。

| 简化判断

一堆数按位或,只要有多于一个数为负,则结果为负。

1
2
3
4
5
6
7
8
public void write(byte b[], int off, int len) throws IOException {
if ((off | len | (b.length - (len + off)) | (off + len)) < 0)
throw new IndexOutOfBoundsException();

for (int i = 0 ; i < len ; i++) {
write(b[off + i]);
}
}

from: FilterOutputStream

Hadoop 源码阅读之DFS(二):DataNode

Posted on 2017-07-11

上一篇把一些零碎的小类集在一起,凑成一篇。这篇打算对比较长的一个类DataNode读读。
每个DataNode代表一个数据节点,对应某台机器的一个文件夹,本质上是一定数量的Block的集合,能够和NameNode,client以及其他DataNode进行通信,以对该Block集合进行操作,主要包括client的读和写,其他DataNode block的复制,以及响应NameNode操作,进行删除等操作。
具体实现来说,数据结构上,维持了一个block到byte array的表;执行时,DataNode内部是一个无限循环,不断询问NameNode,报告状态(心跳),执行命令(RPC)。

  1. 状态信息。DataNodeInfo:总大小,剩余大小,上次更新时间。
  2. 执行命令。
    • 客户端读写Blocks
    • 让其他DataNode复制Blocks
    • 删除某些Blocks

此外,DataNode还维持着一个Server Socket以处理来自Client或者其他DataNode请求。DataNode会将其对外暴露的host:port提交给NameNode,后者会将该信息进一步下发给相关的其他DataNode或者client。
(摘自类注释)

StartUp

DataNode启动的时候主要干了以下事情:
为每个dfs.data.dir实例化一个DataNode,DataNode有以下几个重要字段:

  1. namenode,DatanodeProtocol类型,和NameNode进行RPC通信。
  2. data, FSDataset类型,对应一个文件夹,负责具体的本地磁盘操作。
  3. localName, machine name + port,对外暴露的网络机器名和端口。
  4. dataXceiveServer,一个socket Server,监听上述端口,处理读写请求。
  5. 其他一些配置字段,包括blockReportInterval, datanodeStartupPeriod等。

Main Loop When Run

offerService,该函数根据当前时间与上次动作时间差值,决定是否再一次执行该动作(DataNode对NameNode的RPC);这几个动作基本对应DataNodeProtocol的各个函数,即RPC的几个动作约定。这些事件有向NameNode:

  • 发送心跳信息
  • 上传block信息
  • 获取NameNode指令

下面分别就每一项进行详细说明:

1. 发送心跳信息

心跳信息包括以下几项内容:

  • DataNode名字
  • DataNode数据传输端口
  • DataNode总容量
  • DataNode剩余字节数

2. 上传当前Block信息

报告本DataNode的所有Block信息,以更新表machine->block list 和表block->machine list。利用TreeMap实现,能得到按BlockId排序的数组,通过逐一比较新旧上报Block数组的每个元素(oldReport和newReport),利用removeStoredBlock和addStoredBlock将旧数组更新为新数组。

然后NameNode将需要删除的Block数组返回,利用data(FSDataSet)句柄进行删除。

3. 报告新收到的Block信息,即ReceivedBlock

当Client写数据,或者其他DataNode复制数据给当前DataNode的时候,该DataNode通过RPC,执行此函数。然后NameNode将其更新到保存元数据的table里。

4. 获取 NameNode指令

根据BlockCommand类的字段:

1
2
3
4
boolean transferBlocks = false;
boolean invalidateBlocks = false;
Block blocks[];
DatanodeInfo targets[][];

可以看出,指令动作包括交换(transfer)和删除(delete or invalidate);动作对象包括一系列blocks和DataNode,表示将blocks[i]传送到targets[i][0]… targets[i][j]的DataNode上去。
具体传送实现,为每一个!invalid的block,启动一个线程,负责具体数据传送,代码为:

1
new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();

后面将对DataTransfer类进行详细注解。

DataTransfer

该类实现了Runnable接口,在每次有数据需要传输时被启动;其动作主要为:

  1. 连接第一个Target DataNode的socket,作为输出。
  2. 从FSDataSet中获取Block元信息以及本机器上该block对应的数据文件,作为输入。
  3. 从输入端读取数据,写到输出端。

因此,该类只负责将block信息写到第一个target DataNode,比如说Node1,剩下的将由Node1机器上的线程进行数据传送。

该block在本机实际的文件夹路径和文件名都可以根据blockId进行确定。对于一个64bit的blockId,从高位到地位,每四位作为一个文件夹的名字(0~15),进行路由,因此文件实际位置的深度可能高达64/4=16层;存储数据的文件命名方式为blk_{blockId}.

DataXceiveServer

该类也实现了Runnable接口,在DataNode初始化的时候被启动,用于监听Client或者其他DataNodes的请求,以进行block数据的传输。
具体实现为,使用SocketServer,根据信号shouldListen来循环监听所有请求。当请求到来时,使用DataXceiver类进行具体连接的处理;

DataXceiver

该类负责具体实现数据传输的逻辑,包括Block的写和读,每次传输一个Block块,将该Block首先写入本地文件系统,然后传送给下一个目标DataNode;具体来说,
首先,打开socket输入流,读取首字节,判断操作类型;
然后,进行写或者读操作。

写操作(OP_WRITE_BLOCK)

  1. 读入header,包括以下几个字段

    1
    2
    3
    4
    5
    6
    a. shouldReportBlock --> bool
    b. block info(blkid+len) --> Block
    c. numTargets --> int
    d. targets --> DatanodeInfo[]
    e. encodingType --> byte
    f. data length --> long
  2. 然后将这些header信息,去掉该DataNode(targets[0])的信息后,写入下一个DataNode (target[1])。

  3. 从socket中读取具体存储的数据,先后写入本地存储(当前DataNode)和下一个DataNode的socket。这里有一点设计,就是如果写Socket异常后,可以终止Socket,但仍然继续写本地存储。
  4. encodingType的类型不同,读取数据方式不同:对于RUNLENGTH_ENCODING类型,其结构是length(say l)+data(of the length l),因此读一次就结束;而CHUNKED_ENCODING类型,结构为l1 + data1 + l2 + data2 + … + ln + datan + l(n+1) (=0);因此需要循环继续读如长度,然后读入该长度数据,直到len=0结束。
  5. 如果和下一个DataNode间的socket仍然正常,则从该socket读回一些关于写数据的反馈,包括long型的结束符和LocatedBlock–>写成功后的block网络位置,是一个Block和DatanodeInfo[]对,表示该Block以及已经写成功的DataNode list。整个写操作和备份的过程类似于一个递归调用的过程,由client写datanode1, 然后datanode1写datanode2,然后datanode2写datanode3;然后datanode3将写成功信号,以及datanode3位置告诉datanode2,然后datanode2将写成功信号以及datanode2,datanode3位置告诉datanode1等等。

读操作(OP_READ_BLOCK || OP_READSKIP_BLOCK)

首先读入待读取的Block信息,然后,如果是OP_READSKIP_BLOCK类型,则读取需要跳过的字节数(toSkip–>long);
然后通过data –> FSDataSet 定位block本地存储文件位置,根据类型决定是否跳过特定字节(toSkip),然后逐字节读取该文件。

Aside info

如果类需要作为Key,比如TreeMap,则需要实现Comparable接口,只有可以比较才能进行排序和Hash;如果需要进行序列化和反序列化,则需要实现Writable接口。

Hadoop 源码阅读之DFS(一):一些基本的类

Posted on 2017-07-02 | Edited on 2017-07-03

计划花一个月左右的时间,通读一遍Hadoop 0.1.0的源码,尽量少写一些废话,多记录一些思考。

Random一下,就从分布式文件系统(DFS)开始吧。
DFS即分布式文件系统,集合多台机器存储在预定义位置上的一组文件作为存储构件,在此基础上实现一些分布式操作,从而对外抽象出一套基本文件读写API。

Block


blkid和len

Block是HDFS的文件存储的基本单位,有两个关键属性blkid 和len,前者用来标识一个操作系统上的文件,并且通过"blk_" + String.valueOf(blkid)拼接出文件名;后者是该文件以字节为单位的长度。
它抽象出了存储的两个基本维度,起始和大小。变量,数组,文件等等莫不如此。

注册工厂方法

另一个有意思的地方是所有实现Writable接口的类,都注册了一个工厂方法,具体有什么用,以后来补。

1
2
3
4
5
6
7
static {                                      // register a ctor
WritableFactories.setFactory
(Block.class,
new WritableFactory() {
public Writable newInstance() { return new Block(); }
});
}

序列化

实现Writable利用Java的序列化接口(DataOutput),实现Block基本字段的序列化和反序列化。
每个待序列化类单独实现自己一对序列化和反序列化函数,是一个常用的基本设计,我在实习写桌面程序的时候,想将一些控件信息存储为xml,用的想法和这个是相同的,但是做的不好的事没有定义出这个Writable接口作为对这个行为的抽象。

实现了Comparable(大概是为了被索引时可比较)和Writable接口

BlockCommand


一个命令(instruction)参数的封装,该命令作用于某个DataNode下的一系列Blocks;有两种操作,移动这组Blocks到另外一个DataNode,或者标记改组Blocks为失效状态。

实现

1
2
3
4
boolean transferBlocks = false;
boolean invalidateBlocks = false;
Block blocks[];
DatanodeInfo targets[][];

用两个标志变量来指明是哪种操作;
用两个数组来存储操作对象。

然后通过构造函数重载,给出了三个构造函数,无参,移动命令或者失效命令。并且提供了各个字段的读权限。

实现了Writable接口。

总结

对一个简单的命令基本信息的封装,用构造函数接受参数,确定操作类型和操作对象;用标志变量+数组对象来进行实现。
将一组数据按照某种语义捆绑在一起,在函数间传递时也方便,复用性也更好。

LocatedBlock


一个数据对,包含一个Block和其几个replicate所在的DataNode的信息。

1
2
Block b;
DatanodeInfo locs[];

相当于维持某个逻辑Block到其存储位置的指针,用于定位Block物理位置。

实现了Writable接口。

DataNodeInfo


包含了一个DataNode的状态信息(总大小,剩余大小,上次更新时间),用名字(自定义的UTF8存储的host:port)作为ID,并且维持了其上所有Block的引用,以查找树(TreeSet应该是红黑树,以Block的blkid进行排序)的形式组织。

关键函数

更新状态信息(一次心跳。名字起得好啊——好像DataNode在说,“我还活着,我的基本体征如下,balabala”,传神好记。

1
2
3
4
5
public void updateHeartbeat(long capacity, long remaining) {
this.capacityBytes = capacity;
this.remainingBytes = remaining;
this.lastUpdate = System.currentTimeMillis();
}

实现了Comparable和Writable(比较有意思的是,blocks没有被序列化)接口

DataNodeReport


一个POJO,哈哈,想起这个名字的由来就想笑,马大叔真是有才的别具猥琐。看它的字段就知道,这是心跳来源+心跳信息的一个简单封装,每个字段都具有包级访问权限,还提供了几个public的读方法。

1
2
3
4
5
String name;
String host;
long capacity;
long remaining;
long lastUpdate;

DataNodeInfo的ID加心跳信息。
最后有一个toString函数,毕竟是搞报告工作的。

Hadoop-0.1.0代码调试运行

Posted on 2017-06-29 | Edited on 2017-06-27

之前雄心勃勃的从GitHub上下了Hadoop源码,想要通读涨涨姿势,甚至想自己写一个简易版本。
不料代码啃起来味同嚼蜡,在读了基本的RPC之后,就此搁置。

后来,抱着没有干不好的事,只有打开方式不对的心态,换个姿势,再学一次。
这次计划如下:

  1. 首先,将代码在本地调试。
  2. 然后,按模块进行debug。

今天主要说第一块的探索,花了两个晚上:>。

  • git fork下源码,git clone到本地,通过git tag --list 查看所有标签,找到0.1.0版本。
    并且checkout出来。
  • 配置JAVA_HOME,HADOOP_HOME, PATH环境变量。
  • 下载安装ant,并且配置环境变量(ANT_HOME,PATH)
  • Eclipse新建Java Project,然后选择Java Project From Existing Ant Buildfile,
    从现有文件夹中打开,选择hadoop所在文件夹,它会自动识别出build.xml,然后新建Ant工程。
  • 右击build.xml,run as-> ant built,选第二个,进行配置(选对运行文件夹以及build文件,如果
    不行就在含build.xml目录中执行命令ant,然后就会生成build文件夹,并且根据conf文件夹模板生成
    必要的conf文件。
  • 修改hadoop脚本(在${HADOOP_HOME}/bin里),为了不破坏原来文件,cp hadoop hadoop-debug。
    然后在此脚本中,将最后一行运行命令,加上一些用于调试的参数。修改如下

    • 修改前

      1
      2
      # run it
      exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"
    • 修改后

      1
      2
      3
      # run it
      exec "$JAVA" -Xdebug -Xrunjdwp:transport=dt_socket,address=9090,server=y,suspend=y
      $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"
    • -Xdebug指明是调试,-Xrunjdwp引出后面参数。transport:通信方式,address:端口。

  • 然后选择一个模块运行,比如NameNode:bin/hadoop-debug namenode -format。
  • 在Eclipse中选择Debug->Debug configurations->Remote Java Application,选择对工程,localhost
    以及对应端口就行。记得在代码中加断点,然后就可以愉快地运行了。

夜深啦,今天先到这里。

12

qingtengmuniao

programme, python, java, big data

17 posts
5 tags
© 2019 qingtengmuniao
Powered by Hexo v3.8.0
|
Theme – NexT.Gemini v7.0.0