再探Apache Storm(2)---Topology拓扑

编程模型

在各个大数据平台中,对于用户提交的任务都有着不同的名称,如Hadoop中叫MapReduce,而Storm中则叫拓扑(Topology)。

正如其名称一样,每一个Topology对应的是一个有向无环图(DAG)。这个DAG图上有两种节点:spout和bolt节点,spout节点负责数据的流入工作,可以有多种spout,从不同的数据源获取数据然后发送给下游的bolt节点处理;bolt节点负责数据的具体执行工作,bolt可以有很多种、很多层,即一种bolt处理数据之后能交给下游的bolt接着处理。DAG图上两个节点之间的线则可以称为管道,管道中流动着不同的数据流(stream),构成这些stream的基本单位是元组(tuple),根据不同的设置,这些元组会流进不同的下游bolt中。

高速运行的流水线

我们可以很形象的把这个Topology比喻成工厂里的一条流水线,spout负责从仓库(数据源)里取出原产品,然后放到管道里,流水线下游的bolt们则一直在监视着管道的动静,一等到中间产品(tuple)到来,就赶紧拿起来按照图纸吭哧吭哧的处理好,然后再根据要求放到下游的管道里,其他bolt也是如此,一直到最后一道的bolt,这样一个产品(tuple)就处理完毕了。

一般来说,每个工人最高效的工作情况是简单重复性劳动,也就是说每一道工序要尽可能的简单,但是也不能过于简单,毕竟工人从管道里拿出/放入产品和管道流动还需要额外的时间。

spout 和 bolt 们都兢兢业业,他们一站上岗位就会开始工作,直到监工喊停(Topology被杀死)才会离开流水线。

apache storm topology

并行度设置

正如工厂里的流水线一样,每一个环节的处理时间很有可能是不同的,为了避免这种差异带来的“忙的忙死、闲的闲死”问题,每一道工序都配备了不同数目的spout或bolt(并发度),如果一个图纸设计师(Topology设计)足够优秀的话,这条流水线上的工人们就能达到相似的处理速度,实现一种高利用率。

如何来安排各个工序上改有多少人呢?尽管能够在程序运行时修改并行度,但是想让流水线处于一个较高的利用率,并行度需要仔细设置,这靠图纸设计师的经验并综合运行时情况,估算会有多少数据流入,以及每道工序大概需要多少时间。

如果某一个工序处理时间实在是太慢,产品堆积过多如何缓解?人算不如天算,再好的估算也没法保证实际运行时会按照这个来,很有可能会出现“爆仓”的情况,就像双十一剁手的人那么多,网点不可避免的会“爆仓”,但是又不可能一直安排过量的工人。针对这种情况,storm提供了一个机制,即每次spout从仓库拿出一个原材料的时候,就在一个公共的计数器上加一,等到这个产品被最后一个bolt处理完的时候,再从计数器上减一,当计数器上的处理中产品(tuple)达到一个阈值的时候,spout就停下来,等那些慢动作的bolt们把积压的包裹处理完。这种机制保证了不会出现爆仓的情况,但是可以看出,依旧出现了等待的情况,而且这引入了额外的工作,降低了流水线的效率。所以,最好的情况下还是考虑均衡各个工序的工人数目。

复杂的管道

不是每一家的产品处理都是简单的一个一个依次处理的,虽然每个工序的处理是一样的,但是由于原材料的差异中间产品也会不同,比如有的bolt对付某种情况是专家,那么应该把所有的这种产品都交给他处理,或者是其他的复杂情况,所以每个工人面前有多个管道,每个管道里流着不同的产品(stream),它们处理完毕之后要根据图纸再决定放进哪条管道里,同理,下游的工人们也会根据不同的设计去处理分配给自己的产品。

每个工人干的事情

为了尽可能快速的培养流水线工人,以及能在各个工序上迁移,尽可能按照统一的培训方式来培训每一个工人。

每一个工人都需要学会这么几项技能:

open/prepare

1
2
3
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
/* do init work*/
}

在走上流水线之后,要干的事情,如打开工具箱等等(初始化工作)

nextTuple/execute

1
2
3
public void nextTuple() {
/* handle a tuple */
}

当管道里吐出一个产品的时候,该如何去处理

declareOutputFields

1
2
3
public void declareOutputFields(OutputFieldsDeclarer declarer) {
/* */
}

告诉他处理完之后应该把哪些东西放到哪些管道里。

ack/fail

1
2
3
4
5
6
7
public void ack(Object msgId) {
/* */
}

public void ack(Object msgId) {
/* */
}

人非圣贤,孰能无过,在工作的时候难免会遇到失手搞砸的情况,有的订单(Topology)要求松一点,做坏了也就算了,但是有的订单要求很严,每一个原材料都必须产生最终的产品,这种情况下,这个工人就需要向上面汇报,让spout重新找一个一摸一样的原材料出来,或者是告诉spout这个我们搞定了,可以划掉了。

cleanup

下班的时候要做的扫尾工作,比如写一下工作总结、把开关关掉等等。(不过有的老板很黑心的,从来不喊停,所以收尾工作不一定能执行到)

很能装的管道

正如前面所说的,不同工序存在速度不匹配的情况,需要一种很强大的管道来保证技能高效的流动,同时也能允许很大的积压情况,在Storm这家工厂,他们采用了ZeroMQ,这种管道吞吐量很厉害,当积压过大的时候会自动放到仓库里,需要的时候再拿出来,而且还还装了很多监控,保证产品不会丢失。后来Storm老板想要打造开放型工厂,由于ZeroMQ存在一些版权问题以及性能问题,由把管道换成了Netty。

总结

本文以流水线的形式描述了一个Storm Topology的定义与执行情况。

参考

Apache Storm

Storm工作原理(1)

Getting Started with Storm