StormAPI
.note-content {font-family: "Helvetica Neue",Arial,"Hiragino Sans GB","STHeiti","Microsoft YaHei","WenQuanYi Micro Hei",SimSun,Song,sans-serif;}
.note-content h2 {line-height: 1.6; color: #0AA89E;}
.note-content {background: #FFFFFF;}
.note-content h1 {color: #7AB3A7;}
.note-content h3 {color: #147A67;}
StormAPI
○ StormAPI基本概念
Storm称用户的一个作业为Topology(拓扑),即:作业 = Topology = 拓扑
拓扑是由一些点和边组成的有向无环图。点有两种:一种是数据源节点(Spout),另一种是普通计算节点(Bolt);点之间的边称为数据流(Stream),数据流中的每一条记录称为Tuple。
下图展示了一个Storm拓扑:
每个水龙头就表示一个Spout,它会发送一个Tuple给下游的Bolt,Bolt经过处理之后再把Tuple发送给再下游的Bolt,在这个Bolt里面就可以进行写数据在外部存储的一些操作了,比如说把数据写到数据库里面等等。
这些Spout和Bolt之间的这些边里面,可以设置多种的grouping的方式:
- 随机发送
- 按一定规则发送
Spout只是单纯地读取数据;SplitBolt对文件的每行切分单词,然后把产生的输出交给Storm框架,Storm框架再把每条Tuple分发给CountBolt,CountBolt再把数据写到DATA-SINK里面。本质上这和MapReduce的数据处理方式没有什么区别,但Storm进行的是流式计算,数据是源源不断到来的,输出也是每来一条数据就输出一次。
○ StormAPI的简单使用
TopologyBuilder的公有方法
创建提交拓扑的过程:
- 使用new关键字创建一个TopologyBuilder对象
- 调用setSpout方法设置Spout
- 调用setBolt方法设置Bolt
- 调用createTopology方法返回StormTopology对象给submitTopology方法作为输入参数
1.package storm.starter;2.3.import backtype.storm.Config;4.import backtype.storm.LocalCluster;5.import backtype.storm.StormSubmitter;6.import backtype.storm.task.ShellBolt;7.import backtype.storm.topology.BasicOutputCollector;8.import backtype.storm.topology.IRichBolt;9.import backtype.storm.topology.OutputFieldsDeclarer;10.import backtype.storm.topology.TopologyBuilder;11.import backtype.storm.topology.base.BaseBasicBolt;12.import backtype.storm.tuple.Fields;13.import backtype.storm.tuple.Tuple;14.import backtype.storm.tuple.Values;15.import storm.starter.spout.RandomSentenceSpout;16.17.import java.util.HashMap;18.import java.util.Map;19.20.//SplitSentence派生出ShellBolt这样一个子类21.public class WordCountTopology {22. public static class SplitSentence extends ShellBolt implements IRichBolt {23.24. public SplitSentence() {25. super("python", "splitsentence.py");26./* 通过标准输入输出来和用户指定的一个脚本来进行交互,使得真正的计算逻辑可以在用户的一个shell脚本里面执行27. 具体的工作流程就是说,我们在ShellBolt里面指定一个命令,比如说python splitsentence.py这样一个命令,然后ShellBolt就会启动这样一个程序,并且把bolt所拿到数据进行一个序列化,通过标准输入这个脚本,把它该输出的东西也进行节分序列化,然后打印到标准输出里面,然后ShellBolt就回去捕获刚打印出来的标准输出 */28. }29.30. @Override31. public void 32. //把输出declare回来并且交给下游33. declareOutputFields(OutputFieldsDeclarer declarer) {34. declarer.declare(new Fields("word"));35. }36.37. @Override38. public Map<String, Object> getComponentConfiguration() {39. return null;40. }41. }42.//WordCount派生自BaseBasicBolt这样一个类43. public static class WordCount extends BaseBasicBolt {44. Map<String, Integer> counts = new HashMap<String, Integer>();45.46. @Override47. //实现execute这样一个方法48. public void execute(Tuple tuple, BasicOutputCollector collector) {49. //tuple可以getString拿到一个字段50. String word = tuple.getString(0);51. //可以在内存里统计一下,这个"word"出现了多少次52. Integer count = counts.get(word);53. if (count == null)54. count = 0;55. count++;56. counts.put(word, count);57. collector.emit(new Values(word, count));//输出word和截至当前的count是多少58. }59.60. @Override61. 
public void declareOutputFields(OutputFieldsDeclarer declarer) {62. declarer.declare(new Fields("word", "count"));63. }64. }65.66. public static void main(String[] args) throws Exception {67.68.//构造一个TopologyBuilder对象69. TopologyBuilder builder = new TopologyBuilder();70.71.//添加一个id为"spout",并行度为5的RandomSentenceSpout对象72. builder.setSpout("spout", new RandomSentenceSpout(), 5);73.74.//添加一个id为"split",并行度为8的SplitSentence对象,它的上游"spout",它的分组方式是shuffleGrouping,也就是说我不关心如何分发的,只要随机发下来就可以75. builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");76.77.//添加一个id为"count",并行度为12的WordCount对象,它的上游是"split",它的分组方式是fieldGrouping,也就是说按"word"这么一个字段进行分组78. builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));79.//也就是前面说的将"word"字段相同的组发往同一个bolt进行处理,由同一个bolt进行处理,以保证其正确性80. Config conf = new Config();81. conf.setDebug(true);82.83.84. if (args != null && args.length > 0) {85. conf.setNumWorkers(3);86.87. StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());88. }89. else {90. conf.setMaxTaskParallelism(3);91.92.//创建一个LocalCluster对象,调用submitTopology,把拓扑含进去93. LocalCluster cluster = new LocalCluster();94.95.//提交拓扑96. cluster.submitTopology("word-count", conf, builder.createTopology());97.98.//线程睡眠10s,即拓扑可以运行10s99. Thread.sleep(10000);100.101.//关闭拓扑102. cluster.shutdown();103. }104. }105.}
一般而言,流式服务是常驻的,不需要关掉;但这里走的是本地测试分支(未传入命令行参数),所以运行10s后就关掉。
spout的书写
一个spout的书写很简单,只要派生一个BaseRichSpout这样的类,实现三个方法:声明有哪些输出字段(declareOutputFields)、如何初始化(open)、以及每次读取数据并把数据输出给下游的逻辑(nextTuple)。
1.package storm.starter.spout;2.3.import backtype.storm.spout.SpoutOutputCollector;4.import backtype.storm.task.TopologyContext;5.import backtype.storm.topology.OutputFieldsDeclarer;6.import backtype.storm.topology.base.BaseRichSpout;7.import backtype.storm.tuple.Fields;8.import backtype.storm.tuple.Values;9.import backtype.storm.utils.Utils;10.11.import java.util.Map;12.import java.util.Random;13.14.//RandomSentenceSpout派生出BaseRichSpout,然后实现了几个方法15.public class RandomSentenceSpout extends BaseRichSpout {16. SpoutOutputCollector _collector;17. Random _rand;18.19.20. @Override21. //在spout初始化的时候框架会调用open接口进行处理,由于本例SentenceSpout在内存里产生输出的,所以初始化的时候没有做什么特别操作,只是把collector存了起来,生成了Random对象22. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {23. _collector = collector;24. _rand = new Random();25. }26.27. @Override28. //框架会持续不断地调用与时间的nextTuple函数,nextTuple函数里面用户需要去读取数据,然后将读取数据用_collector.emit发送给下游29. public void nextTuple() {30. Utils.sleep(100);//睡眠100毫秒31. String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",32. "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };//随机选择一个句子输送给下游33. String sentence = sentences[_rand.nextInt(sentences.length)];34. _collector.emit(new Values(sentence));35. }36.37. @Override38. public void ack(Object id) {39. }40.41. @Override42. public void fail(Object id) {43. }44.45. @Override46. //告诉框架我有哪些输出字段,这里就说明了我输出了一个叫"word"的字段47. public void declareOutputFields(OutputFieldsDeclarer declarer) {48. declarer.declare(new Fields("word"));49. }50.51.}
那么我们将脚本放在哪个位置呢?一般而言,都是在你的工程的multilang里面的resources目录下
1.import storm #载入storm.py这样一个脚本2.3.class SplitSentenceBolt(storm.BasicBolt):4. def process(self, tup):#实现一个process的方法,对所有的行和数据进行一个split5. words = tup.values[0].split(" ")6. for word in words:7. storm.emit([word]) #emit给下游8.9.SplitSentenceBolt().run()10.#这样就实现了切分单词的方法11.
storm.py的功能很简单:从标准输入读入消息并进行JSON反序列化,把要发出的消息进行JSON序列化后打印到标准输出。
1.import sys2.import os3.import traceback4.from collections import deque5.6.try:7. import simplejson as json8.except ImportError:9. import json10.11.json_encode = lambda x: json.dumps(x)12.json_decode = lambda x: json.loads(x)13.14.#reads lines and reconstructs newlines appropriately15.def readMsg():16. msg = ""17. while True:18. line = sys.stdin.readline()[0:-1]19. if line == "end":20. break21. msg = msg + line + "\n"22. return json_decode(msg[0:-1])23.24.MODE = None25.ANCHOR_TUPLE = None26.27.#queue up commands we read while trying to read taskids28.pending_commands = deque()29.30.def readTaskIds():31. if pending_taskids:32. return pending_taskids.popleft()33. else:34. msg = readMsg()35. while type(msg) is not list:36. pending_commands.append(msg)37. msg = readMsg()38. return msg39.40.#queue up taskids we read while trying to read commands/tuples41.pending_taskids = deque()42.43.def readCommand():44. if pending_commands:45. return pending_commands.popleft()46. else:47. msg = readMsg()48. while type(msg) is list:49. pending_taskids.append(msg)50. msg = readMsg()51. return msg52.53.def readTuple():54. cmd = readCommand()55. return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])56.57.def sendMsgToParent(msg):58. print json_encode(msg)59. print "end"60. sys.stdout.flush()61.62.def sync():63. sendMsgToParent({‘command‘:‘sync‘})64.65.def sendpid(heartbeatdir):66. pid = os.getpid()67. sendMsgToParent({‘pid‘:pid})68. open(heartbeatdir + "/" + str(pid), "w").close() 69.70.def emit(*args, **kwargs):71. __emit(*args, **kwargs)72. return readTaskIds()73.74.def emitDirect(task, *args, **kwargs):75. kwargs[directTask] = task76. __emit(*args, **kwargs)77.78.def __emit(*args, **kwargs):79. global MODE80. if MODE == Bolt:81. emitBolt(*args, **kwargs)82. elif MODE == Spout:83. emitSpout(*args, **kwargs)84.85.def emitBolt(tup, stream=None, anchors = [], directTask=None):86. global ANCHOR_TUPLE87. if ANCHOR_TUPLE is not None:88. anchors = [ANCHOR_TUPLE]89. 
m = {"command": "emit"}90. if stream is not None:91. m["stream"] = stream92. m["anchors"] = map(lambda a: a.id, anchors)93. if directTask is not None:94. m["task"] = directTask95. m["tuple"] = tup96. sendMsgToParent(m)97.98.def emitSpout(tup, stream=None, id=None, directTask=None):99. m = {"command": "emit"}100. if id is not None:101. m["id"] = id102. if stream is not None:103. m["stream"] = stream104. if directTask is not None:105. m["task"] = directTask106. m["tuple"] = tup107. sendMsgToParent(m)108.109.def ack(tup):110. sendMsgToParent({"command": "ack", "id": tup.id})111.112.def fail(tup):113. sendMsgToParent({"command": "fail", "id": tup.id})114.115.def log(msg):116. sendMsgToParent({"command": "log", "msg": msg})117.118.def initComponent():119. setupInfo = readMsg()120. sendpid(setupInfo[‘pidDir‘])121. return [setupInfo[‘conf‘], setupInfo[‘context‘]]122.123.class Tuple:124. def __init__(self, id, component, stream, task, values):125. self.id = id126. self.component = component127. self.stream = stream128. self.task = task129. self.values = values130.131. def __repr__(self):132. return ‘<%s%s>‘ % (133. self.__class__.__name__,134. ‘‘.join(‘ %s=%r‘ % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))135.136.class Bolt:137. def initialize(self, stormconf, context):138. pass139.140. def process(self, tuple):141. pass142.143. def run(self):144. global MODE145. MODE = Bolt146. conf, context = initComponent()147. self.initialize(conf, context)148. try:149. while True:150. tup = readTuple()151. self.process(tup)152. except Exception, e:153. log(traceback.format_exc(e))154.155.class BasicBolt:156. def initialize(self, stormconf, context):157. pass158.159. def process(self, tuple):160. pass161.162. def run(self):163. global MODE164. MODE = Bolt165. global ANCHOR_TUPLE166. conf, context = initComponent()167. self.initialize(conf, context)168. try:169. while True:170. tup = readTuple()171. ANCHOR_TUPLE = tup172. self.process(tup)173. ack(tup)174. 
except Exception, e:175. log(traceback.format_exc(e))176.177.class Spout:178. def initialize(self, conf, context):179. pass180.181. def ack(self, id):182. pass183.184. def fail(self, id):185. pass186.187. def nextTuple(self):188. pass189.190. def run(self):191. global MODE192. MODE = Spout193. conf, context = initComponent()194. self.initialize(conf, context)195. try:196. while True:197. msg = readCommand()198. if msg["command"] == "next":199. self.nextTuple()200. if msg["command"] == "ack":201. self.ack(msg["id"])202. if msg["command"] == "fail":203. self.fail(msg["id"])204. sync()205. except Exception, e:206. log(traceback.format_exc(e))