(转发)storm 入门原理介绍
1.hadoop有master与slave,Storm与之对应的节点是什么?
2.Storm控制节点上面运行一个后台程序被称之为什么?
3.Supervisor的作用是什么?
4.Topology与Worker之间的关系是什么?
5.Nimbus和Supervisor之间的所有协调工作有master来完成,还是Zookeeper集群完成?
6.storm稳定的原因是什么?
7.如何运行Topology?
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
8.spout是什么?
9.bolt是什么?
10.Topology由两部分组成?
11.stream grouping有几种?
Storm对于实时计算的的意义相当于Hadoop对于批处理的意义。Hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美。同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。
Storm适用的场景:
1、流数据处理:Storm可以用来用来处理源源不断的消息,并将处理之后的结果保存到持久化介质中。
2、分布式RPC:由于Storm的处理组件都是分布式的,而且处理延迟都极低,所以可以Storm可以做为一个通用的分布式RPC框架来使用。在这个教程里面我们将学习如何创建Topologies, 并且把topologies部署到storm的集群里面去。Java将是我们主要的示范语言, 个别例子会使用python以演示storm的多语言特性。
1、准备工作
这个教程使用storm-starter项目里面的例子。我推荐你们下载这个项目的代码并且跟着教程一起做。先读一下:配置storm开发环境和新建一个strom项目这两篇文章把你的机器设置好。2、一个Storm集群的基本组件
storm的集群表面上看和hadoop的集群非常像。但是在Hadoop上面你运行的是MapReduce的Job, 而在Storm上面你运行的是Topology。它们是非常不一样的 — 一个关键的区别是: 一个MapReduce Job最终会结束, 而一个Topology运永远运行(除非你显式的杀掉他)。在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个后台程序:Nimbus, 它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工作给机器, 并且监控状态。每一个工作节点上面运行一个叫做Supervisor的节点(类似 TaskTracker)。Supervisor会监听分配给它那台机器的工作,根据需要 启动/关闭工作进程。每一个工作进程执行一个Topology(类似 Job)的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程 Worker(类似 Child)组成。

3、Topologies
为了在storm上面做实时计算, 你要去建立一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑, 而节点之间的连接则表示数据流动的方向。运行一个Topology是很简单的。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令。- strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
-based语言提交的最简单的方法, 看一下文章: 在生产集群上运行topology去看看怎么启动以及停止topologies。
4、Stream
Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如: 你可以把一个tweets流传输到热门话题的流。storm提供的最基本的处理stream的原语是spout和bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。spout的流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。5、数据模型(Data Model)
storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型, 在我的理解里面一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类 型。你也可以使用你自己定义的类型来作为值类型, 只要你实现对应的序列化器(serializer)。一个Tuple代表数据流中的一个基本的处理单元,例如一条cookie日志,它可以包含多个Field,每个Field表示一个属性。123456789101112131415161718192021222324 | publicclassDoubleAndTripleBoltimplementsIRichBolt { privateOutputCollectorBase _collector; @Override publicvoidprepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override publicvoidexecute(Tuple input) { intval = input.getInteger( 0 ); _collector.emit(input,newValues(val* 2 , val* 3 )); _collector.ack(input); } @Override publicvoidcleanup() { } @Override publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields( "double" , "triple" )); } } |
declareOutputFields方法定义要输出的字段 : ["double", "triple"]。这个bolt的其它部分我们接下来会解释。
123456 | TopologyBuilder builder =newTopologyBuilder(); builder.setSpout( 1 ,newTestWordSpout(), 10 ); builder.setBolt( 2 ,newExclamationBolt(), 3 ) .shuffleGrouping( 1 ); builder.setBolt( 3 ,newExclamationBolt(), 2 ) .shuffleGrouping( 2 ); |
这个Topology包含一个Spout和两个Bolt。Spout发射单词, 每个bolt在每个单词后面加个”!!!”。这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt然后把处理好的单词发射给第二个bolt。如果spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。
123 | builder.setBolt( 3 ,newExclamationBolt(), 5 ) .shuffleGrouping( 1 ) .shuffleGrouping( 2 ); |
让我们深入地看一下这个topology里面的spout和bolt是怎么实现的。Spout负责发射新的tuple到这个topology里面来。 TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:
12345678 | publicvoidnextTuple() { Utils.sleep( 100 ); finalString[] words =newString[] { "nathan" , "mike" , "jackson" , "golda" , "bertels" }; finalRandom rand =newRandom(); finalString word = words[rand.nextInt(words.length)]; _collector.emit(newValues(word)); } |
可以看到,实现很简单。
1234567891011121314151617181920 | publicstaticclassExclamationBoltimplementsIRichBolt { OutputCollector _collector; publicvoidprepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } publicvoidexecute(Tuple tuple) { _collector.emit(tuple,newValues(tuple.getString( 0 ) + "!!!" )); _collector.ack(tuple); } publicvoidcleanup() { } publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields( "word" )); } } |
prepare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任何时候发射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法 使用。execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源). ExclamationBolt获取tuple的第一个字段,加上”!!!”之后再发射出去。如果一个bolt有多个输入源,你可以通过调用 Tuple#getSourceComponent方法来知道它是来自哪个输入源的。execute方法里面还有其它一些事情值得一提: 输入tuple被作为emit方法的第一个参数,并且输入tuple在最后一行被ack。这些呢都是Storm可靠性API的一部分,后面会解释。cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。 cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏。最后,declareOutputFields定义一个叫做”word”的字段的tuple。以local mode运行ExclamationTopology
让我们看看怎么以local mode运行ExclamationToplogy。storm的运行有两种模式: 本地模式和分布式模式. 在本地模式中, storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。 你运行storm-starter里面的topology的时候它们就是以本地模式运行的, 你可以看到topology里面的每一个组件在发射什么消息。在分布式模式下, storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把认为重新分配到其它节点。关于如何在一个集群上面运行topology, 你可以看看Running topologies on a production cluster文章。下面是以本地模式运行ExclamationTopology的代码:
123456789 | Config conf =newConfig(); conf.setDebug( true ); conf.setNumWorkers( 2 ); LocalCluster cluster =newLocalCluster(); cluster.submitTopology( "test" , conf, builder.createTopology()); Utils.sleep( 10000 ); cluster.killTopology( "test" ); cluster.shutdown(); |
首先, 这个代码定义通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集 群是一样的。通过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topology的。前面已经说过了, 你必须显式的杀掉一个topology, 否则它会一直运行。Conf对象可以配置很多东西, 下面两个是最常见的:
- TOPOLOGY_WORKERS(setNumWorkers) 定义你希望集群分配多少个工作进程给你来执行这个topology. topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进 程里面. 每一个工作进程包含一些节点的一些工作线程。比如, 如果你指定300个线程,60个进程, 那么每个工作进程里面要执行6个线程, 而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
- TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
感兴趣的话可以去看看Conf对象的Javadoc去看看topology的所有配置。
可以看看创建一个新storm项目去看看怎么配置开发环境以使你能够以本地模式运行topology.运行中的Topology主要由以下三个组件组成的:
Worker processes(进程)
Executors (threads)(线程)
Tasks




7、流分组策略(Stream grouping)
流分组策略告诉topology如何在两个组件之间发送tuple。 要记住, spouts和bolts以很多task的形式在topology里面同步执行。如果从task的粒度来看一个运行的topology, 它应该是这样的:1234567 | TopologyBuilder builder =newTopologyBuilder(); builder.setSpout( 1 ,newRandomSentenceSpout(), 5 ); builder.setBolt( 2 ,newSplitSentence(), 8 ) .shuffleGrouping( 1 ); builder.setBolt( 3 ,newWordCount(), 12 ) .fieldsGrouping( 2 ,newFields( "word" )); |
SplitSentence对于句子里面的每个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里面的统计状态。有好几种不同的stream grouping:
- 最简单的grouping是shuffle grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个task的tuple分配的比较均匀。
- 一种更有趣的grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields grouping, 这种grouping机制保证相同field值的tuple会去同一个task, 这对于WordCount来说非常关键,如果同一个单词不去同一个task, 那么统计出来的单词次数就不对了。
fields grouping是stream合并,stream聚合以及很多其它场景的基础。在背后呢, fields grouping使用的一致性哈希来分配tuple的。还有一些其它类型的stream grouping. 你可以在Concepts一章里更详细的了解。下面是一些常用的 “路由选择” 机制:Storm的Grouping即消息的Partition机制。当一个Tuple被发送时,如何确定将它发送个某个(些)Task来处理??
l ShuffleGrouping:随机选择一个Task来发送。
l FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
l AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
l GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
l NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
l DirectGrouping:直接将Tuple发送到指定的Task来处理。
8、使用别的语言来定义Bolt
123456789 | publicstaticclassSplitSentenceextendsShellBoltimplementsIRichBolt { publicSplitSentence() { super ( "python" , "splitsentence.py" ); } publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields( "word" )); } } |
SplitSentence继承自ShellBolt并且声明这个Bolt用python来运行,并且参数是: splitsentence.py。下面是splitsentence.py的定义:
123456789 | importstorm classSplitSentenceBolt(storm.BasicBolt): defprocess(self, tup): words=tup.values[ 0 ].split( " " ) forwordinwords: storm.emit([word]) SplitSentenceBolt().run() |
9、可靠的消息处理
在这个教程的前面,我们跳过了有关tuple的一些特征。这些特征就是storm的可靠性API: storm如何保证spout发出的每一个tuple都被完整处理。看看《storm如何保证消息不丢失》以更深入了解storm的可靠性API.Storm允许用户在Spout中发射一个新的源Tuple时为其指定一个MessageId,这个MessageId可以是任意的Object对象。多 个源Tuple可以共用同一个MessageId,表示这多个源Tuple对用户来说是同一个消息单元。Storm的可靠性是指Storm会告知用户每一 个消息单元是否在一个指定的时间内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple都经过 了Topology中每一个应该到达的Bolt的处理。
原文地址:http://www.aboutyun.com/thread-7394-1-1.html