sec1
this is just a test body
use next theme
qtmuniao's blog
Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.
1 | $ hexo new "My New Post" |
More info: Writing
1 | $ hexo server |
More info: Server
1 | $ hexo generate |
More info: Generating
1 | $ hexo deploy |
More info: Deployment
上一次在做完 lab2a 即 raft 的 leader 选举之后,一直卡在日志同步这一块(log replication);直到昨晚进行了一下 appendEntries 的优化(prevLog 不匹配时,一下跳过本 term 所有 logEntries),一直困扰的 TestBackup2B 竟然神奇 Passed 的了。跑了两遍还不大信,特地将其改回去,看到果然 Fail 才放心下来,看来是效率太低超时了。
趁着还新鲜,索性今晚就将这一段时间的血泪史记下来吧。
6.824是MIT的一门分布式课程,我跟的是2018 spring 。在第二个实验中要求简单实现一个分布式一致性协议–raft。
这是一个专为方便教学和工程实现所设计的协议,它将协议拆解为几个相对独立的模块–leader选举,日志同步,安全保证。论文里图二基本给出了Raft的所有实现细节,可谓字字珠玑。但也因为太微言大义了,导致有些状态转换分散在不同描述中,假如你真只照着这幅图实现,很容易遗漏些细节。
这是lab2B的内容,又是反反复复,猜坑,排雷,放弃,捡起一月余才将日志同步这一块完成,最终代码不到七百行(694),其间有些实现细节还受过网上的启发。其间状态类似于网上看到的一幅调侃程序猿的漫画——啊,出bug了,为什么啊,改改改;还是不对,改改改;啊终于对了,可是,又是为什么啊?
最终 Passed 版本我觉得还待改进,有些地方隐隐然感觉略多余。但暂时先高兴会,把心得赶到纸上来,毕竟老年人脑袋的内存实在有限,而且易挥发。
上一篇 leader 选举中主要是对实现技巧做了些记录,这一篇主要是对实现各个逻辑细节进行探讨。
当初论文大概看明白了,觉得各个细节都搞懂了,但是整个流程在脑中还是转不起来。现在想来,Leader上线后,日志同步过程大概是这样:
nextIndex = len(rf.log)
,所有 matchIndex = 0
prevLogIndex+prevLogTerm
匹配上 Follower 中的某个 logEntry
,然后 Leader 将匹配到的 logEntry
之后的 entries 一次性送给 Follower。prevLogIndex
,prevLogTerm
和 entires
。其中 prevLogTerm
又由 prevLogIndex
决定,因此主要考虑 prevLogIndex
和 entries
。对于 prevLogIndex
,在 试探阶段 我将其定为 nextIndex-1
,这么做是为了在试探时降低不必要的 entries
传输;在传送阶段,可以将 prevLogIndex
定为 matchIndex
,也即已经匹配上的最后一条数据。对于 entries
,即取 Leader log 中 [prevIndex+1, min(nextIndex, len(rf.log)-1)]
的一个闭区间。prevLogIndex
= nextIndex
-1),在传送阶段可以一次性的将所有匹配之后的 Leader 上的 logEntries 全部传送过去。如果没有新日志了,几个变量将维持在:nextIndex = len(rf.log);prevLogIndex = matchIndex = len(rf.log)-1
,entries
为 [];其他点就是附着于此流程之上的一些细节:
commitIndex
,并且在以后的 AppendEntries
的 RPC 中将其同步给各个 Follower。lastApplied
是否跟上了 commitIndex
,保证将已提交 log 及时应用到状态机中。这两个变量分开我觉得主要是为了逻辑上的解耦——在 Leader 上是 Leader 主动检测来更新 commitIndex
的,在 Follower 上,是被动接受 Leader 消息来更新的。matchIndex
数组中本身对应的位置更新,因为它本身也是最后计算大多数时的一票。主要对应上面说的两个阶段,试探阶段和传送阶段;其他要注意的就是上一篇提到的加锁和状态自检。
1 | // need handle the reply |
然后就是参数构造,对于 prevIndex
来说,也是两个阶段不同构造。然后取合适窗口的 entries
。
1 | func (rf *Raft) constructAppendEntriesArg(idx int) *AppendEntriesArgs { |
最后 AppendLogEntries 回调,看到先前 term 的请求,直接拒绝,这没什么好说的,注意将自己的 term 带回去就行。此外,无论 args.Term > or = rf.currentTerm ,收到 AppendEntries 的 peer 都要变成 Follower,并且重置 timer。此外在匹配 logEntry 的时候,要注意进行截断。
1 | func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { |
主要依赖 leader 的 matchIndex[] ,具体做法是将其升序排序,然后取中位数位置的 Index(当时拍脑袋想出来的,感觉很神奇)。另一个要注意的点是论文中着重强调的,就是只能提交本 Term 中的 logEntry,这是为了 Leader 你方唱罢我方登场引起的反复覆盖。
1 | func (rf *Raft) checkCommitIndex() { |
每个 peer 在每个 term 最多投一票,但是每次更新 term 后,可将 votedFor 赋值为 -1,即又可以投票了。这个 case 发生在响应 candidate 要求投票时,如果他已经投了票,这是好像不能投票,但是发现 term 没有 candidate 的大,那么需要立即变为 Follower ,并且给该 candidate 投票。
1 | func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { |
这里涉及到 becomeFollower 的实现,即更改状态,然后重设 timer,并且根据 term 是否更新来决定是否可以投票。
1 | func (rf *Raft) becomeFollower(term int) { |
记录下在实现6.824 lab2 raft 的一些想法和经验,聊以备忘。
6.824是MIT的一门分布式课程,我跟的是2018 spring 。在第二个实验中要求简单实现一个分布式一致性协议–raft。
这是一个专为方便教学和工程实现所设计的协议,它将协议拆解为几个相对独立的模块–leader选举,log复制,安全保证。论文里图二基本给出了Raft的所有实现细节,可谓字字珠玑。但也因为太微言大义了,导致有些状态转换分散在不同描述中,假如你真只照着这幅图实现,很容易遗漏些细节。
这是lab2A的内容,说来惭愧,从开始构思到测试用例pass,前前后后拖了两个多月。虽然只是晚上和周末写写,但是也的确进展缓慢,不过收获颇多。暗想要是本科实验也这么来,可能就真如知乎所说,一学期顶多学两门课,回想大三时候四门四学分的课,从课程设计上来说就是注定要我们划水的。
吐槽完毕,说说踩过的坑。
raft主要有两个event loop,一个是(Follower,Candidate)超时发起选举,一个是(Leader)定期心跳(有时捎带日志同步)。最容易想到的就是本科写大作业无脑用的loop+sleep,即外层一个while true,内层用一个稍小(但比electionTimeout和heartbeatInterval至少小一个数量级才够用)的时间间隔(比如说t)sleep,来周期性检测时间节点(needElection,heartbeat)的到来。
但是我老强迫症的觉得,这么着不准确,误差至少是那个检测时间间隔t。这要是好多线程搞来搞去,面对这么复杂的状态变化,会不会由误差而导致错误,但是用go的timer好像很复杂的样子,每每想到这就想不清了,这么着也耽搁了一阵子。
直到后来,在另一个也在做raft的孩子提醒下才注意到,其实课程很贴心的给出了建议–还就是利用loop+sleep来周期性检测超时。这时候我茅塞顿开,悟出了上面括弧中我的注释–只要保证检测间隔小于超时间隔一两个数量级,基本上就没啥问题。这种实现的优点是简单,粗暴,直接,可控。
我在实现的时候又突然觉得好玩,弄成了两种稍有不同的实现,下附代码,为了看着清楚,都做了些简略化处理。
一种是只有一个loop:
1 | go func() { |
另一种是两个嵌套loop,内层loop专门用来等待(或许是联想到了CPU的忙等待)。
1 | for { |
虽然前者更简洁,但是后者逻辑更明白;有时候简洁的代码是复用了不同的逻辑,导致语义上可能稍微有点不清楚。考虑到代码的第一要义是给人看的(啥?你说是给机器看的?咳咳,我觉得这是代码之所以为代码的前提,不然编译器不给你过啊),我觉得还是第后者好一些。
课程也很贴心的给出了提示,但作为一个从大一电梯大作业开始就开始‘玩锁’的人,我直接将其略过,不管三七二十一,昂首挺胸,赤膊上阵,狂撸代码。然而,记忆是会骗人的,教训是很惨痛的,竟然一直死锁而我却不知道是哪的问题。后来只得乖乖将提示看了好几遍,发现写的真是好啊。。。
其要点总结起来,就是将有全局变量读写的地方先全部给加上锁,然后再将有阻塞(rpc call等等)的地方去掉锁。
它这原则先按下不表,来说说我跳坑的两个地方:
犯这个错误的原因是咱看着那么长的代码,凭着经验,总得给他包一下吧,包的时候发现我擦全是全局变量,赶紧申请锁啊。但是后来在调用该函数的时候,忘了放在临界区了;也就是说吃着碗里的,又想盛锅里的。死锁成就1 get,废话不多讲,上代码:
1 | a. |
break,continue,return
这种偷偷摸摸的提前结束分支的行为,虽然我们平时干的很开心,但是它不符合人的既定对称认知啊,就导致有时候忘了处理它,什么叫不对称呢?上代码:
1 | if !check(arg) { |
v.s.
1 | if !check(arg) { |
如果是后者,对着if else
扫一眼对齐,函数有几个出口,一目了然。然而对于前者来说,如果分支语句淹没在巨量代码中,就很容易忘记某个猥琐角落还藏着一个出口,自然就不会上锁。
具体到我的情况,就是在candidate要票的时候,如果得到多数票,就直接变为leader;如果后面仍有人给票,我会判断当前身份是否已经是leader,如果是就直接返回,不在意这些票了,的确好邪恶。。于是,你懂得,返回前忘还回锁了。代码如下,有删节。
1 | go func(server int, args RequestVoteArgs) { |
由于状态是在多个loop(好吧,也就两个,但是为啥觉得很多呢)中来回改变的,因此在异步/阻塞调用前后,可能身份角色(自身)和选举周期(外界)早已天翻地覆,因此需要进行自检。
说来惭愧,这个也是在多方查看资料后才意识到的。
Leader进行心跳检测时候,由于是异步调用,所以需要先检测自己还是不是leader,以及是否还在自己任期(由args保存了当时的任期);当rpc返回后,再一次进行该检查,无误,才能按照自己是leader来进行下一步动作。
1 | go func(server int, args *AppendEntriesArgs) { |
最近在看些计算框架的基础知识,为了比较好的了解Spark原理,我打算将其基石–RDD这篇paper粗翻一下,水平所限,不当之处,欢迎指出。
我们提出了一种分布式弹性数据集的抽象(Resilient Distributed Datasets, RDDs),这种分布式的内存抽象能够让编程者使用大规模集群机器的内存进行计算,并且提供容错支持。RDD的提出是为了解决现有计算框架对两种计算模式:迭代式算法和交互式挖掘的不友好支持。在上述两种计算模式中,将数据保存在内存里都能极大的提高计算性能。为了更好的处理容错,RDDs提供严格的(译注:或者说简化的)内存共享方式,该方式只支持粗粒度的数据变换而非传统(译注:数据库)对共享状态以任意粒度的更新。然而,我们可以看到对于相当广泛的计算类型,RDDs的表达能力都是足够的。比如Pregel,一种专门为迭代式计算开发的模型,以及其他这些模型不能覆盖的类型,RDDs都能支持。我们在一个叫Spark的系统中实现了RDDs,利用该系统,我们对用户的各种应用场景和基准测试进行了评估。
像MapReduce[10]和Dryad[19]这样的集群计算框架已经被广泛应用于大规模的数据分析中。这些系统让用户免于关心底层多机和容错细节,仅利用一些抽象的高阶算子就能写出并行计算程序。
尽管现有的框架提出了各种访问集群计算资源的抽象,他们都缺少对利用分布式内存的抽象。这使得他们不能够高效的满足近期出现的很重要的一类应用对于多轮计算中中间结果重用的需求。数据重用在一些迭代式的机器学习和图算法应用中很普遍,包括PageRank算法,K-means聚类和逻辑回归。另一类迫切的使用场景是交互式数据挖掘算法。在此场景下,用户常常对同一个数据集进行多轮次的即席查询。不幸之处在于,在大多数现有框架中,多轮计算(比如两轮的MapReduce任务)中间重用数据的唯一方式就是写到落地的存储系统中,比如分布式的文件系统。这样做会导致巨额的系统过载,因为数据备份,硬盘IO和序列化都会占用绝大比例的应用运行时间。
意识到这些问题后,研究员们对一些需要数据重用的应用开发出了专用的框架。举例来说,Pregel[22]是一个交互式的图计算框架,它将中间数据保存在内存里;Haloop[7]提供一个迭代式的MapReduce接口。然而这些框架仅支持一些专用的计算模式(例如多步骤的MapReduce迭代),并且只在这些模式下进行隐式的数据共享。也就是说,它们并没有针对通用的计算模型提供一种有效的抽象,譬如让用户在加载多种数据集到内存后进行跨数据集间即席查询。
于本论文中,我们提出了一种新的抽象,唤做可伸缩分布式数据集(Resilient Distributed Datasets, RDDs),藉此,使得一大类应用可以高效的进行数据重用。RDDs是一种通用的容错,并行的数据结构;它让用户对中间结果进行显式的内存驻留,自定义的分片置放,以及多样的算子操作。
设计RDDs时一个首要挑战就是如何定义编程接口使得容错以高效。现存的对于集群内存的抽象,如分布式共享内存[24],kv存储[25],数据库和Piccolo[27],都提供对可变状态的细粒度的更新(如表中的字段)。使用这种抽象,提供容错的方式就仅限于多机冗余备份和日志记录更新。对于数据密集型应用来说,不管哪种方式都是代价高昂。盖因其要求在集群网路间进行大量数据复制,而网络带宽远逊于RAM,由是招致客观的存储系统过载。
与上述系统不同的是,RDDs提供一种基于粗粒度数据变换(如map,filter和join)的接口,这种变换对多个数据项执行同样的操作。此举将提供容错的方式变为通过记录变换过程而非备份数据本身来进行数据集(lineage)的重建。假使RDD的某个部分(partition)丢失了,RDD可以通过充分变换信息以及变换源集来进行该部分的重建。由是,遗失的数据通常可以很快的恢复,省去了冗余备份。
尽管第一眼看上去,一个只提供粗粒度数据变换的接口具有诸多局限性,RDDs却能很好地适应多种并行应用,因为这些应用本质上都是作用于多个数据项上面的相同运算。事实上,我们发现RDDs可以高效的满足多个现有框架提出的模型,包括MapReduce,DryadLINQ,SQL,Pregel和HaLoop;并且能够满足这些框架没有覆盖的应用需求,如交互式的数据挖掘。我们认为,RDD抽象强大的最有力的证据就是,它能满足之前只有通过引入新框架才能提供的能力。
我们在一个叫Spark的系统里实现了RDD,它被用于UC伯克利大学和一些公司的实验和生产环境中。Spark使用Scala语言[2]提供了一套类似于DryadLINQ[31]的语言整合编程接口。除此之外,Spark支持通过Scala解释器进行交互式的大数据集上的查询。我们深信Spark是第一款在大规模集群上支持通用编程语言以实时交互的速度进行内存上的数据挖掘系统。
我们通过对于用户应用的小型基准评估来测试RDDs和Spark的性能。发现Spark在迭代式应用上快于Hadoop将近二十倍,将一个实际的数据分析加速了将近四十倍,并且能够在1TB数据集上以5-7s的延迟进行交互式的遍历。更重要的是,为了证明RDDs的通用性,我们在Spark上实现了以一个相当小的库(每库仅有两百多行)实现了Pregel和HaLoop编程模型,包括它们背后的存储位置的优化。
本论文首先是RDD(第二节)和Spark(第三节)的概览,然后讨论了RDDs的内部表示(第四节),实现细节(第五节)和实验结果(第六节)。最后我们探讨了RDDs是如何获取了几个当前的集群编程模型(第七节),此外还有相关工作(第八节)和结论。
这一部分提供了RDDs的概览。首先我们定义RDDs(2.1小节)并且介绍了在Spark中它们的编程接口(2.2小节)。接下来对比了RDDs和细力度的共享内存抽象。最后,讨论了RDD模型的限制。
正式来说,一个RDD是一个只读的,分片的数据记录集。RDDs是只能由(1) 落地数据 或(2)其他RDDs 通过确定性的运算得来。这些运算被称为变换以区别于RDDs之上的其他运算。这些变换操作包括map,filter和join。
RDDs不需要随时被计算出来。相反,一个RDD有足够的信息记录了从其他数据集的变换序列以从落地储存得到它的每个部分的信息。这是一个优秀的性质:本质上,任何一个程序都不能引用一个不能从错误中重建的RDD。
最后,用户可以控制RDDs两个其他方面的操作:持久化(persistence,译注:我觉得缓存好像更合适)和分片选择(partitioning)。用户可以指定某个RDDs为将来要重用的,并且选择一种存储策略(如,存在内存中)。用户也可以通过指定RDD元素的key(译注:以及基于该key的hash算法)来让RDD的元素在多个机器上进行分片存储。这对于存储位置优化很有用,譬如,可以让待join的两个数据集以同样的策略进行哈希和分片(译注:即路由到同一台机器上)。
Spark通过像DyradLINQ[31]和FlumeJava[8]等语言整合的API对外提供RDDs服务,在其中,我们将每个数据集(dataset)表示为对象(object),将变换(transformations)表示为作用于这些对象上面的函数调用。
(译注:一个典型的流程是这样的)首先(start by),编程者定义一些作用于离线存储(stable storage)上的变换(如map和filter);然后,编程者可以将动作(actions)加诸于RDDs上。动作是一种可以将数据返回应用或者导出到存储系统的操作(译注:有点像终结符)。动作范畴的例子包括count(返回数据集内元素的个数),collect(返回数据集中所有元素本身)和save(将数据集中元素导入某个存储系统)。和DryadLINQ议案该,Spark会将所有对RDDs变换(transformations)操作延迟到第一次调用动作(actions)调用时,所以Spark能够将这些变换进行流水式编排(pipeline transformations)。
此外,编程者可以调用persist函数,来声明将来运算中需要重用的RDDs。Spark默认将RDDs缓存在内存里,但当内存不够的时候,也会将RDDs刷到硬盘里。用户也可以通过改变persist函数的参数来指定其他缓存(译注:虽然原文是persist)策略,比如只存在硬盘里或者多机备份。最后,用户可以对不同RDD的缓存指定优先级,以确定其内存数据刷到硬盘上的先后顺序。
设想这样一个场景,一个网络服务出现了一些问题,运维人员想在存储于HDFS的数T级别的日志中找到原因。使用Spark,运维人员可以只将日志中的错误信息加载到多机内存里,并且进行交互式的查询。她首先需要编以下Scala代码:
1 | lines = spark.textFile("hdfs://...") |
第一行定义了一个从HDFS文件(可以认为是一行行日志的集合)中加载的RDD,第二行从该RDD通filter变换生成一个新RDD。第三行指定errors
缓存在内存里以便为多次查询共享数据。需要注意的时候,传递给filter的参数是Scala语法中的闭包。
到现在为止,集群上没有执行任何运算(译注:因为只有action才能触发变换执行)。而,这时候用户可以在动作(actions)中使用该RDD,如统计日志条目的数量:
1 | errors.count() |
用户也可以在此RDD上执行进一步的变换,并且使用输出结果,如下面几行代码:
1 | // Count errors mentioning MySQL: |
当第一次涉及到errors
数据集的动作执行的时候,Spark将errors
的分片存到内存里,从而大大加速后续作用于该数据集的运算。注意到原始的RDD,lines
没有被加载到RAM里,这是我们所期望的,因为错误信息日志可能只是整个数据的一小部分(足以在内存装得下)。
最后,为了说明我们的模型进行容错的过程,我们在图1里给出了第三个查询操作的运算谱系图。在该查询操作中,我们从lines
filter得到errors
,然后继续filter和map后进行collect。Spark的调度器将会对后两个变换运算进行流水化处理,然后分派一队任务到缓存有errors
不同分片的节点上执行具体运算逻辑。并且,当errors
的某个分片丢失时,Spark只需要在对应的源lines
分片进行相应的运算就能重建它。
为了理解RDDs作为一个分布式的内存抽象带来的好处,我们在表1种将它和分布式共享内存(DSM)进行了比较。
Aspect | RDDs | Distr. Shared Mem. |
---|---|---|
Reads | Coarse- or fine-grained | Fine-grained |
Writes | Coarse-grained | Fine-grained |
Consistency | Trivial (immutable) | Up to app/runtime |
Fault recovery | Fine-grained and low-overhead using lineage | Requires checkpoints and program rollback |
Straggler mitigation | Possible using backup tasks | Difficult |
Work placement | Automatic based on data locality | Up to app (runtimes aim for transparency) |
Behavior if not enough RAM | Similar to existing data flow systems | Poor performance (swapping?) |
在DSM系统中,应用可以在全局地址空间中对任意的位置进行读写。需要指出的是,在这种定义范畴下(DSM),不仅涵盖了经典的共享内存系统[24],也包括了其他对共享状态进行细粒度读写的系统,包括但不限于Piccolo[27]。Piccolo提供了一个共享的DHT和分布式的数据库。DSM是一个很通用的抽象,但是它这种这种普适性也让其很难以一种高效并且容错的方式在大规模集群上进行实现。
RDDs和DSM见最主要的区别在于,RDDs只允许通过粗粒度的变换来实现写操作,而DSM允许对任意的内存进行随机读写。RDDs的这种设定给应用的批量写操作带来了一定限制,但也让它更高效的进行容错处理。还有,RDDs省去了由于引入备份点(checkpoints)而带来的额外负载,因为它可以通过操作谱系(lineage)进行错误恢复(4)
。尤其是,只有某些丢失的RDD分区遇到错误时需要进行重新计算,并且不同分区(译注:由于在不同机器/节点上)可以进行并行的计算,而不用将整个运算(译注:即所有分区)进行回滚。
RDDs所带来的另外一个主要的优势在于,RDD的不可变性让系统可以像MapReduce[10]一样为运算缓慢的节点(拖后腿的那些机器)启动一些备份任务。而在DSM中就很难有这种骚操作了,因为备份任务和原任务可能会对同一个内存位置进行操作,造成互相干涉。
最后来说,RDDs还提供了两个DSM所没有的好处。第一个,对于RDDs上的批量操作,可以动态调度任务以充分利用局部性原来,从而提高性能。另一个,RDDs在内存不足的情况下可以进行平滑的降级,只要这些RDD仅被用于遍历性质的操作。那些不能够完全载入内存的分区可以被存在硬盘上,并且在现有的数据并行系统中可以提供和原来差不多的性能(译注:为啥?莫非是觉得硬盘的连续读比较快?)。
正如引言部分所讨论的,RDDs很适用于对同一个数据集的所有元素进行同种运算的应用。在这种情况下,RDD可以将每个变换记为运算谱系图中的一部,并因此可以将丢失的分片进行高效的恢复,而不用去做大量的备份数据。RDDs不大适合那种需要对共享数据进行细粒度地更新的应用,比如网站应用和增量式的网络爬虫应用中的存储系统。对于这类应用来说,传统的基于更新日志和备份点恢复的系统更为高效,比如说数据库系统,RAMCloud[25],Percolator[26]和Piccolo[27]。我们旨在为这种批量式分析需求提供一种高效的模型,将这些需要异步更新共享数据的需求留给其他专有系统去完成吧。
Spark在Scala里通过类似于DryadLINQ[31]语言整合API提供了RDD抽象,其中Scala是基于Java VM的一门静态类型的函数式编程语言。我们选择Scala是因为它兼具简洁性(正好适合交互式应用)和高效性(由于是静态类型)。当然了,并不是说RDD抽象要求必须选用一门函数式语言。
为了使用Spark,开发者需要编写driver程序以连接集群中的workers,细节如图2所示。driver定义了一个或多个RDDs,并将actions加诸于上。driver上的Spark代码也会追踪RDDs的变换谱系。workers是能够在多个运算间在内存中存储RDD分区的长生命周期的进程。
如我们在2.2.1小结中举的日志分析的例子一样,用户通过给运算(如map)传递闭包(函数名字)来提供参数。Scala将每个闭包表示为一个Java对象,这些对象可以被序列化并且通过网络传递,并被其他节点加载。Scala将所有闭包关联的变量作为Java对象的字段来存储。例如,可以写像var x = 5; rdd.map(_ + x)
这样的代码来为某个RDD中的所有元素的值增加5。
RDDs本身也是某个元素类型的参数化的静态类型对象。例如,RDD[Int]是一个整形RDD。不过,由于Scala支持类型推断,我们举的大部分例子都省略了类型。
尽管在Scala里提供RDD抽象的方法在概念上很简洁,我们却不得不使用反射来小心处理Scala的闭包所带来的问题[33]。我们需要做一些额外的工作以使Spark在Scala的解释器中可用,这部分我们将在5.2节中深入讨论。虽然如此,我们倒不必修改Scala的编译器。
表2列出了Spark中主要可用的变换和动作。我们给出了每个运算的函数签名,并在方括号里指明了类型。回想一下,变换,是一种延迟式运算,用以生成新的RDD,而动作会触发真正的运算,并将结果值返回给应用程序或者将数据写到外部存储。
需要注意的是,有些操作,如join,只能作用于包含kv对的RDD上。此外,我们适当地选择函数名益脾胃Scala或者其他函数式语言中的其他API。比如,map是一到一的单射,而flatMap将每个输入值映射为一到多个输出(类似于MapReduce中的map)。
除了这些算子之外,用户还可将RDD进行缓存(persist)。更进一步,用户可以获得一个RDD的分片顺序,它是由Partitioner类来表达的,并据此能够对其他数据集的分片。像groupByKey,reduceByKey和sort一类运算能够自然引起RDD进行分片和聚集。
除2.2.1节的例子外,我们补充两个其他数据分析的例子,他们都是迭代式应用:逻辑回归和PageRank。后者说明了,如何控制分片以提高性能。
很多机器学习算法本质上是迭代式的,是因为他们通过迭代过程对函数进行函数的最优化,比如梯度下降。这些算法如果将数据保存在内存里,能够大大加快运行速度。
试举一例,下面的程序实现了逻辑回归[14],一个常见的分类算法,即寻找一个超平面w以尽可能的分开两组点(如垃圾邮件和正常邮件)。算法使用梯度下降策略:从一个随机值w开始,然后进行迭代,每次加上一个基于所有数据和w的函数值,以让w向着好的房型进行迭代。
1 | val points = spark.textFile(...).map(parsePoint).persist() |
开始我们定义了一个叫做points
的持久化RDD,因为它是从一个文本文件中通过解析每一行到一个Point结构里面映射得到的。然后我们通过对当前w的某个函数值进行加和以重复的对points
进行map和reduce来计算每步的梯度。如6.1节所示,在迭代的时候将points
保存在内存里加速了20倍。
PageRank使用了一种更负载的数据共享模型[6]。该算法迭代式的为每个文档更新rank
值,具体做法是将所有链接到该文档的其他文档贡献度相加。在每次迭代里,每个文档给其邻居r/n
的贡献度,其中r
是该文档的rank
值,n
是它邻居的数量。因此它本次迭代后更新为a/N + (1-a) * sum(ci),其中加和是针对所有输入文档,N是所有文档的总数。于是我们可以在Spark中将PageRank编程为:
1 | // Load graph as an RDD of (URL, outlinks) pairs |
这段代码可以用图3中的RDD谱系图来表示。在每次迭代,我们通过前一次的contributes
和ranks
以及静态的边关系link
数据集,计算出该次迭代的ranks
数据集。该图一个有趣的特征是随着迭代次数的增多它变的越来越长。于是,对于一个有着多次迭代的任务来说,通过备份ranks
某些迭代中间值来减少错误恢复时间是很有必要的。用户可以调用带有RELIABLE
标记参数的persist
函数来干这件事。然而,值得一提的是,links
数据集并不需要备份,因为该数据集的某个分区可以通过输入文件,通过map
操作快速的构建出来。该数据集通常比ranks
大的多(译注:就是说边比点多很多),这是因为每篇文档通常有很多边关系但是只有一个rank
值,因此通过谱系图重建它比在内存中做备份点要省时省力。
最后,我们可以通过控制RDDs的分区来优化计算PageRank过程中的网络传输。如果我们为links
指定分区算法(不知道咋翻译了),那么也为ranks
使用同样的分区算法,以保证links
和ranks
的join
操作无需多机通信(即保证操作所需数据在同一个节点上)。我们也可以构造partitioner
以将互相链接的网页集结在一块(如通过域名来对URL进行散列),两种优化都可以通过在定义links
时调用partitionBy
来实现:
1 | links = spark.textFile(...).map(...) |
通过该初始化的调用,links
和ranks
数据集间的join
运算将会在每个URL链接所存在的机器上进行贡献加和,并在其上计算rank值,和所有links进行join。这种迭代间一致的哈希函是一些专有框架如Pregel的主要优化,RDDs让用户可以直接的获取该目标。
以前对二分查找的认识只停留在有序数组查找给定整数上,后来发现一类问题都可以用二分的思想来做,概括来说就是:如果要求的结果所在的集合(值域)和要搜索的数的集合(定义域)存在单调(映射)关系,就可以通过二分思想来解决,说起来有点抽象,后面将用几个例子来说明。
二分思想以其每次迭代将规模砍一半的效率,有着极其广阔的应用。
本文分两大部分,第一部分对二分查找的各个细节探讨;第二部分拓展二分查找为一般的二分思想。
1 | int find(vector<int> &arr, int target) { |
像上面这一段平平无奇的代码,在实际运用的时候却又诸多变化。二分查找的基本思路不再多说,现在只分析边边角角(Corner Case)。这就涉及到我们一个基本原则,既要让所有元素有可以搜索到的机会,又不至于陷入死循环。具体来说有以下几个问题:
left <= right
,什么时候用left < right
呢?mid
常用的有几种计算方式:mid = (left+right)/2
, mid = left+(right-left)/2
和 mid = (left+right+1)/2
。left
和right
都有两种移动方式,拿left
来说:left=mid+1
和left=mid
。上面的几个问题其实是相互勾连的,视遇到的问题来适当组合。比如说,查找一个有序数组{1, 2, 3, 3, 3, 5}某个数字(3)的左右边界:{2, 4}
1 | int find_range(vector<int> &arr, int target, bool left_range) { |
上面的代码,在要查找的数字存在的时候是对的,如果不存在需要加额外判断。需要加什么判断呢?这也就是需要说明的另一个地方:当我们进行二分查找的时候,是在中途找到结果就退出(return)呢,还是一直到循环条件被打破退出呢?前者用来最简单的查找指定值,而后者一般是查找某个边界,或者符合条件的最值。该问题因为是由于破坏了循环条件left < right
退出的,所以得判断下arr[left]
是否和target
相等。
有些点还没想清楚,以后再补充。
梳理一下MapReduce框架涉及到的一些基本接口和类。
RecordReader
:从输入文件中读入键值对,这里是指的map的输入,还是reduce的输入?接口有三个函数next(Writable key, Writable value)
,getPos()
和close()
,由此看来,该接口类似于一个抽象的迭代器。InputFormat
实现了该接口。
RecordWriter
:将键值对写到输出文件,OutputFormat
实现了该接口。包含函数:write(WritableComparable key, Writable value)
和close(Reporter reporter)
。
OutputCollector
:作为参数传送给Mapper
和Reducer
来输出结果数据。该接口只有一个函数collect(key, val)
。
最近在琢磨关于树的非递归遍历的一些思路和对应的实现,写在这里,聊以备忘。
基本思路是,一路往左走,撞了南墙(碰到NULL)就回头;回头摆到右(子树的根),接着往左走(循环)。
基本代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* Definition for a binary tree node.
* struct TreeNode {
* int val;
* TreeNode *left;
* TreeNode *right;
* TreeNode(int x) : val(x), left(NULL), right(NULL) {}
* };
*/
void traversal(TreeNode* root) {
stack<TreeNode*> s;
TreeNode* current = root;
while (!s.empty() || current) {
while (current) {
s.push(current); //(1)
current = current->left;
}
current = s.top()->right; // (3)
s.pop(); // if we pop the node, then we do not have (5)
}
}
对应的完整遍历路径如下:1
2
3
4 (1) root (5?)
/ (3) \
/ \
(2)l-subtree (4)r-subtree
对于某个节点root来说,有三次访问机会:
对应的,按照二叉树遍历的定义,
由于访问完左子树需要继续访问右子树,故需要保存root的指针,即stack在这里的作用。随着访问的深入,stack不断入栈节点,最大长度即为树的深度。这里需要着重说明的是,随着访问左子树归来,将会不断的退栈,因为在获得右子树指针后,已经不需要再保存root的指针了。因此如果不做特殊处理,步骤(5)是没有的。
所以对于二叉树前序遍历和中序遍历的非递归代码,可以直接给出:
1 | // preorder traversal: |
对于后序遍历的代码,会稍微复杂一点,所需要做的改动主要为:不能在第二次访问(即图中(3))后退栈,而需要在阶段(5)退栈。这就牵扯出如何判断阶段(5)的问题。这里我的做法是引入一个prev指针,标记访问序列中前一个二叉树节点,如果root->right即为prev,或者root->right为NULL,就可以判断已经从右子树访问返回,即为阶段(5)。
1 | void postorderTraversal(TreeNode* root) { |
我们知道,可以用栈来模拟函数调用,或者说函数调用本来就是函数栈帧的入栈和退栈。由于二叉树遍历足够简单,也让我们的模拟变的相对容易实现。
比如,对于先序遍历,由于栈会使访问反序,因此先压入右子树。1
2
3
4
5
6
7
8
9
10
11
12void preorderTraversal(TreeNode* root) {
stack<TreeNode*> s;
if (root) s.push(root);
while (!s.empty()) {
TreeNode* now = s.top();
cout << now->val;
s.pop();
if (now->right) s.push(now->right);
if (now->left) s.push(now->left);
}
}
如果先压入左子树呢,就成了逆后序遍历,这样只需要再加一个栈,就能得到后序遍历。1
2
3
4
5
6
7
8
9
10
11
12void inversePostorderTraversal(TreeNode* root) {
stack<TreeNode*> s;
if (root) s.push(root);
while (!s.empty()) {
TreeNode* now = s.top();
cout << now->val;
s.pop();
if (now->left) s.push(now->left);
if (now->right) s.push(now->right);
}
}
以上遍历方法,不论递归还是非递归,其额外的空间复杂度都为O(h)即O(lgn),因为栈开销最大为树的深度。那么有没有一种不借助额外空间的方法来实现树的遍历呢?聪明的你可能会想到线索二叉树,Bingo!这就是Morris中序遍历。
闲话少说,先上代码:
1 | void inorderTraversal(TreeNode* root) { |
基本思路是:1
2
3
4
5
6
7
81. Initialize current as root
2. While current is not NULL
If current does not have left child
a) Print current’s data
b) Go to the right, i.e., current = current->right
Else
a) Make current as right child of the rightmost node in current's left subtree
b) Go to this left child, i.e., current = current->left
好吧,其实这就是伪代码,出处见这里。核心要点就是,当我们一头扎向南墙时,为了能无痛返回,需要在南墙打个洞,通回原路,并且过了洞之后将洞补上。但是打洞是需要时间的,这就是典型的用时间换空间–每次都得在找左孩子最右边那个元素的时候多浪费点时间。
有些点还没想清楚,以后再补充。
在进行模块化的时候,试图将诸如SearchListener
, CallManager
的模块从MainActivity
中拆出来,然而在响应事件的时候,不可避免的需要改变其他资源状态,那么就需要获取其句柄。由此还需要把MainActivity
作为句柄传入代码。
更令人纠结的是,由于存在一些异步事件(如申请权限),其触发在模块(CallManager
)中,其接受在主体视图(MainActivity
)中,又需要将保有的参数重新传回MainActivity
,如此绕来绕去,让我不禁反思我模块化时候存在的问题。
1 | CallManager.java |
在Activity
中接收到call event调用CallManager
,然后在其中请求权限,利用Set方法,将PhoneNumber
传回Activity
。初步想法是可以将请求权限、被授予权限后执行回调函数这种和Activity
紧耦合的代码方法写在MainActivity
中,然后CallManager
只处理call逻辑。但仍需要一个类变量来保存电话号码,以进行从请求权限到执行回调之前的参数传递工作。
输入关键字,实时显示搜索结果。
从官方文档入手,由于初入门,相关术语懂得少,而该文档又非代码级实现,导致没能完整搭起搜索的架子。该文档主要讲了以下几点:
我想使用Search Widget方式,主要遇到以下几个困惑点:
Activity
中触发搜索,就是AppBar右上角的搜索图标如何做出来。Intent
的查询数据,如示例一般,应该不能做到实时匹配输入字符。我猜想应该有listener之类的,但是例子没给。SearchAbleActivity
继承ListView
来实现,但是具体细节,如怎么接受结果,传递给ListView
,都没有提。于是搜索关键词 SearchView action bar,找到一篇帖子:Implementing SearchView in action bar,反复琢磨,才弄清楚了以上几个问题。
首先,对于触发搜索,该回答使用的是具有App Bar的Activity作为SearchableActivity,并且在复写onCreateOptionsMenu函数,实例化其参数menu,并且将SearchView作为其一个item。如此一来,SearchableActivity的右上角就会有搜索按钮。相关代码如下:
res\menu\search.xml:1
2
3
4
5
6
7
8
9<?xml version="1.0" encoding="utf-8"?>
<menu xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto">
<item android:id="@+id/search_menu"
android:title="@string/search_hint"
app:showAsAction="ifRoom|collapseActionView"
app:actionViewClass="android.widget.SearchView" />
</menu>
SearchableActivity.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29@Override
public boolean onCreateOptionsMenu(Menu menu) {
getMenuInflater().inflate(R.menu.search, menu);
this.menu = menu;
// Get the SearchView and set the searchable configuration
SearchManager searchManager = (SearchManager) getSystemService(Context.SEARCH_SERVICE);
SearchView searchView = (SearchView) menu.findItem(R.id.search_menu).getActionView();
// Assumes current activity is the searchable activity
ComponentName name = getComponentName();
searchView.setSearchableInfo(searchManager.getSearchableInfo(name));
searchView.setIconifiedByDefault(false);
searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String s) {
doMySearch(s);
return false;
}
@Override
public boolean onQueryTextChange(String s) {
doMySearch(s);
return false;
}
});
return true;
}
AndroidManifest.xml1
2
3
4
5
6
7<activity android:name=".SearchableActivity">
<intent-filter>
<action android:name="android.intent.action.SEARCH" />
</intent-filter>
<meta-data android:name="android.app.searchable"
android:resource="@xml/searchable"/>
</activity>
res/xml/searchable.xml1
2
3
4
5<?xml version="1.0" encoding="utf-8"?>
<searchable xmlns:android="http://schemas.android.com/apk/res/android"
android:label="@string/app_label"
android:hint="@string/app_label" >
</searchable>
其次是实时匹配查询结果;也是在onCreateOptionsMenu
函数中,给SearchView
设置listeners,具体可以见上面代码,但是return true/false暂时有什么区别还没搞清楚。
最后是展示数据;如官方文档所说,利用ListView,具体做法是通过Adapter将数据(比如List
res/layout/item.xml1
2
3
4
5
6
7
8
9<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent">
<TextView
android:id="@+id/item"
android:layout_width="wrap_content"
android:layout_height="wrap_content" />
</RelativeLayout>
ResultAdapter.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class ResultAdapter extends CursorAdapter {
private List<String> items;
private TextView text;
public ResultAdapter(Context context, Cursor cursor, List<String> items) {
super(context, cursor, false);
this.items = items;
}
@Override
public void bindView(View view, Context context, Cursor cursor) {
text.setText(items.get(cursor.getPosition()));
}
@Override
public View newView(Context context, Cursor cursor, ViewGroup parent) {
LayoutInflater inflater = (LayoutInflater) context.getSystemService(Context.LAYOUT_INFLATER_SERVICE);
View view = inflater.inflate(R.layout.item, parent, false);
text = view.findViewById(R.id.item);
return view;
}
}
SearchableActivity.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private void doMySearch(String query){
String[] columns = new String[] { "_id", "text" };
Object[] temp = new Object[] { 0, "default" };
MatrixCursor cursor = new MatrixCursor(columns);
for(int i = 0; i < items.size(); i++) {
temp[0] = i;
temp[1] = items.get(i);
cursor.addRow(temp);
}
// SearchView
final SearchView search = (SearchView) menu.findItem(R.id.search_menu).getActionView();
search.setSuggestionsAdapter(new ResultAdapter(this, cursor, items));
}