StormAPI简单使用

StormAPI
.note-content {font-family: "Helvetica Neue",Arial,"Hiragino Sans GB","STHeiti","Microsoft YaHei","WenQuanYi Micro Hei",SimSun,Song,sans-serif;}

.note-content h2 {line-height: 1.6; color: #0AA89E;}
.note-content {background: #FFFFFF;}
.note-content h1 {color: #7AB3A7;}
.note-content h3 {color: #147A67;}

StormAPI

○ StormAPI基本概念

Storm称用户的一个作业为

作业=Topology=拓扑

拓扑是由一些点和边组成的有向无关图,点有两种,一种是数据源节点(Spout),另一种是普通计算节点(Bolt),点之间的边称为数据流(Stream),数据流之间的每一条记录称为Tuple

在下图就展示了一个Storm的一个拓扑

每个水龙头就表示一个Spout,它会发送一个Tuple给下游的Bolt,Bolt经过处理之后再把Tuple发送给再下游的Bolt,在这个Bolt里面就可以进行写数据在外部存储的一些操作了,比如说把数据写到数据库里面等等。

这些Spout和Bolt之间的这些边里面,可以设置多种的grouping的方式:

  1. 随机发送
  2. 按一定规则发送

Spout只是单纯的读取数据,SplitBolt对文件的每行切分单词,然后SplitBolt会把它产生的输出交给Storm框架,Storm框架再把数据进行一次Tuple交给CountBolt,CountBolt再把数据写到DATA-SINK里面。本质上和MapReduce的数据处理方式没有什么区别,但Storm是进行流式计算的,所以说数据是源源不断的到来的,输出也是每来一条数据就输出一次。

○ StormAPI的简单使用

TopologyBuilder的公有办法

创建提交拓扑的过程:

  1. 使用new关键字创建一个TopologyBuilder对象
  2. 调用setSpout方法设置Spout
  3. 调用setBolt方法设置Bolt
  4. 调用createTopology方法返回StormTopology对象给submitTopology方法作为输入参数

    WordCountTopology.java

1.package storm.starter;2.3.import backtype.storm.Config;4.import backtype.storm.LocalCluster;5.import backtype.storm.StormSubmitter;6.import backtype.storm.task.ShellBolt;7.import backtype.storm.topology.BasicOutputCollector;8.import backtype.storm.topology.IRichBolt;9.import backtype.storm.topology.OutputFieldsDeclarer;10.import backtype.storm.topology.TopologyBuilder;11.import backtype.storm.topology.base.BaseBasicBolt;12.import backtype.storm.tuple.Fields;13.import backtype.storm.tuple.Tuple;14.import backtype.storm.tuple.Values;15.import storm.starter.spout.RandomSentenceSpout;16.17.import java.util.HashMap;18.import java.util.Map;19.20.//SplitSentence派生出ShellBolt这样一个子类21.public class WordCountTopology {22.  public static class SplitSentence extends ShellBolt implements IRichBolt {23.24.    public SplitSentence() {25.      super("python", "splitsentence.py");26./*      通过标准输入输出来和用户指定的一个脚本来进行交互,使得真正的计算逻辑可以在用户的一个shell脚本里面执行27.      具体的工作流程就是说,我们在ShellBolt里面指定一个命令,比如说python splitsentence.py这样一个命令,然后ShellBolt就会启动这样一个程序,并且把bolt所拿到数据进行一个序列化,通过标准输入这个脚本,把它该输出的东西也进行节分序列化,然后打印到标准输出里面,然后ShellBolt就回去捕获刚打印出来的标准输出   */28.    }29.30.    @Override31.    public void 32. //把输出declare回来并且交给下游33.   declareOutputFields(OutputFieldsDeclarer declarer) {34.      declarer.declare(new Fields("word"));35.    }36.37.    @Override38.    public Map<String, Object> getComponentConfiguration() {39.      return null;40.    }41.  }42.//WordCount派生自BaseBasicBolt这样一个类43.  public static class WordCount extends BaseBasicBolt {44.    Map<String, Integer> counts = new HashMap<String, Integer>();45.46.    @Override47.    //实现execute这样一个方法48.    public void execute(Tuple tuple, BasicOutputCollector collector) {49.    //tuple可以getString拿到一个字段50.      String word = tuple.getString(0);51.      //可以在内存里统计一下,这个"word"出现了多少次52.      Integer count = counts.get(word);53.      if (count == null)54.        count = 0;55.      count++;56.      counts.put(word, count);57.      collector.emit(new Values(word, count));//输出word和截至当前的count是多少58.    }59.60.    @Override61.    public void declareOutputFields(OutputFieldsDeclarer declarer) {62.      declarer.declare(new Fields("word", "count"));63.    }64.  }65.66.  public static void main(String[] args) throws Exception {67.68.//构造一个TopologyBuilder对象69.    TopologyBuilder builder = new TopologyBuilder();70.71.//添加一个id为"spout",并行度为5的RandomSentenceSpout对象72.    builder.setSpout("spout", new RandomSentenceSpout(), 5);73.74.//添加一个id为"split",并行度为8的SplitSentence对象,它的上游"spout",它的分组方式是shuffleGrouping,也就是说我不关心如何分发的,只要随机发下来就可以75.    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");76.77.//添加一个id为"count",并行度为12的WordCount对象,它的上游是"split",它的分组方式是fieldGrouping,也就是说按"word"这么一个字段进行分组78.    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));79.//也就是前面说的将"word"字段相同的组发往同一个bolt进行处理,由同一个bolt进行处理,以保证其正确性80.    Config conf = new Config();81.    conf.setDebug(true);82.83.84.    if (args != null && args.length > 0) {85.      conf.setNumWorkers(3);86.87.      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());88.    }89.    else {90.      conf.setMaxTaskParallelism(3);91.92.//创建一个LocalCluster对象,调用submitTopology,把拓扑含进去93.      LocalCluster cluster = new LocalCluster();94.95.//提交拓扑96.      cluster.submitTopology("word-count", conf, builder.createTopology());97.98.//线程睡眠10s,即拓扑可以运行10s99.      Thread.sleep(10000);100.101.//关闭拓扑102.      cluster.shutdown();103.    }104.  }105.}

一般而言,流式服务是常驻的,不需要关掉的,但是如果判断失败,所以运行10s就关掉。

spout的书写

一个spout的书写就很简单,只要派生个BaseRichSpout这样一个类,实现三个接口:声明有哪些字段、该如何初始化、实现每次读取数据然后把数据输出给下游的逻辑。

RandomSentenceSpout.java

1.package storm.starter.spout;2.3.import backtype.storm.spout.SpoutOutputCollector;4.import backtype.storm.task.TopologyContext;5.import backtype.storm.topology.OutputFieldsDeclarer;6.import backtype.storm.topology.base.BaseRichSpout;7.import backtype.storm.tuple.Fields;8.import backtype.storm.tuple.Values;9.import backtype.storm.utils.Utils;10.11.import java.util.Map;12.import java.util.Random;13.14.//RandomSentenceSpout派生出BaseRichSpout,然后实现了几个方法15.public class RandomSentenceSpout extends BaseRichSpout {16.  SpoutOutputCollector _collector;17.  Random _rand;18.19.20.  @Override21.  //在spout初始化的时候框架会调用open接口进行处理,由于本例SentenceSpout在内存里产生输出的,所以初始化的时候没有做什么特别操作,只是把collector存了起来,生成了Random对象22.  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {23.    _collector = collector;24.    _rand = new Random();25.  }26.27.  @Override28.  //框架会持续不断地调用与时间的nextTuple函数,nextTuple函数里面用户需要去读取数据,然后将读取数据用_collector.emit发送给下游29.  public void nextTuple() {30.    Utils.sleep(100);//睡眠100毫秒31.    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",32.        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };//随机选择一个句子输送给下游33.    String sentence = sentences[_rand.nextInt(sentences.length)];34.    _collector.emit(new Values(sentence));35.  }36.37.  @Override38.  public void ack(Object id) {39.  }40.41.  @Override42.  public void fail(Object id) {43.  }44.45.  @Override46.  //告诉框架我有哪些输出字段,这里就说明了我输出了一个叫"word"的字段47.  public void declareOutputFields(OutputFieldsDeclarer declarer) {48.    declarer.declare(new Fields("word"));49.  }50.51.}

那么我们将脚本放在哪个位置呢?一般而言,都是在你的工程的multilang里面的resources目录下

splitsentence.py

1.import storm #载入storm.py这样一个脚本2.3.class SplitSentenceBolt(storm.BasicBolt):4.    def process(self, tup):#实现一个process的方法,对所有的行和数据进行一个split5.        words = tup.values[0].split(" ")6.        for word in words:7.          storm.emit([word]) #emit给下游8.9.SplitSentenceBolt().run()10.#这样就实现了切分单词的方法11.

storm.py

strom.py的功能很简单,就是完成它的主要功能,读入并且进行json的反序列化,把输出进行序列化,并打到标准输出上面去

1.import sys2.import os3.import traceback4.from collections import deque5.6.try:7.    import simplejson as json8.except ImportError:9.    import json10.11.json_encode = lambda x: json.dumps(x)12.json_decode = lambda x: json.loads(x)13.14.#reads lines and reconstructs newlines appropriately15.def readMsg():16.    msg = ""17.    while True:18.        line = sys.stdin.readline()[0:-1]19.        if line == "end":20.            break21.        msg = msg + line + "\n"22.    return json_decode(msg[0:-1])23.24.MODE = None25.ANCHOR_TUPLE = None26.27.#queue up commands we read while trying to read taskids28.pending_commands = deque()29.30.def readTaskIds():31.    if pending_taskids:32.        return pending_taskids.popleft()33.    else:34.        msg = readMsg()35.        while type(msg) is not list:36.            pending_commands.append(msg)37.            msg = readMsg()38.        return msg39.40.#queue up taskids we read while trying to read commands/tuples41.pending_taskids = deque()42.43.def readCommand():44.    if pending_commands:45.        return pending_commands.popleft()46.    else:47.        msg = readMsg()48.        while type(msg) is list:49.            pending_taskids.append(msg)50.            msg = readMsg()51.        return msg52.53.def readTuple():54.    cmd = readCommand()55.    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])56.57.def sendMsgToParent(msg):58.    print json_encode(msg)59.    print "end"60.    sys.stdout.flush()61.62.def sync():63.    sendMsgToParent({‘command‘:‘sync‘})64.65.def sendpid(heartbeatdir):66.    pid = os.getpid()67.    sendMsgToParent({‘pid‘:pid})68.    open(heartbeatdir + "/" + str(pid), "w").close()    69.70.def emit(*args, **kwargs):71.    __emit(*args, **kwargs)72.    return readTaskIds()73.74.def emitDirect(task, *args, **kwargs):75.    kwargs[directTask] = task76.    __emit(*args, **kwargs)77.78.def __emit(*args, **kwargs):79.    global MODE80.    if MODE == Bolt:81.        emitBolt(*args, **kwargs)82.    elif MODE == Spout:83.        emitSpout(*args, **kwargs)84.85.def emitBolt(tup, stream=None, anchors = [], directTask=None):86.    global ANCHOR_TUPLE87.    if ANCHOR_TUPLE is not None:88.        anchors = [ANCHOR_TUPLE]89.    m = {"command": "emit"}90.    if stream is not None:91.        m["stream"] = stream92.    m["anchors"] = map(lambda a: a.id, anchors)93.    if directTask is not None:94.        m["task"] = directTask95.    m["tuple"] = tup96.    sendMsgToParent(m)97.98.def emitSpout(tup, stream=None, id=None, directTask=None):99.    m = {"command": "emit"}100.    if id is not None:101.        m["id"] = id102.    if stream is not None:103.        m["stream"] = stream104.    if directTask is not None:105.        m["task"] = directTask106.    m["tuple"] = tup107.    sendMsgToParent(m)108.109.def ack(tup):110.    sendMsgToParent({"command": "ack", "id": tup.id})111.112.def fail(tup):113.    sendMsgToParent({"command": "fail", "id": tup.id})114.115.def log(msg):116.    sendMsgToParent({"command": "log", "msg": msg})117.118.def initComponent():119.    setupInfo = readMsg()120.    sendpid(setupInfo[‘pidDir‘])121.    return [setupInfo[‘conf‘], setupInfo[‘context‘]]122.123.class Tuple:124.    def __init__(self, id, component, stream, task, values):125.        self.id = id126.        self.component = component127.        self.stream = stream128.        self.task = task129.        self.values = values130.131.    def __repr__(self):132.        return ‘<%s%s>‘ % (133.                self.__class__.__name__,134.                ‘‘.join(‘ %s=%r‘ % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))135.136.class Bolt:137.    def initialize(self, stormconf, context):138.        pass139.140.    def process(self, tuple):141.        pass142.143.    def run(self):144.        global MODE145.        MODE = Bolt146.        conf, context = initComponent()147.        self.initialize(conf, context)148.        try:149.            while True:150.                tup = readTuple()151.                self.process(tup)152.        except Exception, e:153.            log(traceback.format_exc(e))154.155.class BasicBolt:156.    def initialize(self, stormconf, context):157.        pass158.159.    def process(self, tuple):160.        pass161.162.    def run(self):163.        global MODE164.        MODE = Bolt165.        global ANCHOR_TUPLE166.        conf, context = initComponent()167.        self.initialize(conf, context)168.        try:169.            while True:170.                tup = readTuple()171.                ANCHOR_TUPLE = tup172.                self.process(tup)173.                ack(tup)174.        except Exception, e:175.            log(traceback.format_exc(e))176.177.class Spout:178.    def initialize(self, conf, context):179.        pass180.181.    def ack(self, id):182.        pass183.184.    def fail(self, id):185.        pass186.187.    def nextTuple(self):188.        pass189.190.    def run(self):191.        global MODE192.        MODE = Spout193.        conf, context = initComponent()194.        self.initialize(conf, context)195.        try:196.            while True:197.                msg = readCommand()198.                if msg["command"] == "next":199.                    self.nextTuple()200.                if msg["command"] == "ack":201.                    self.ack(msg["id"])202.                if msg["command"] == "fail":203.                    self.fail(msg["id"])204.                sync()205.        except Exception, e:206.            log(traceback.format_exc(e))
时间: 2024-08-07 14:01:09

StormAPI简单使用的相关文章

C# Ping 简单使用

编程过程中,有时候需要判断主机是否在线,最简单的方法就是使用Windows的Ping命令看看能否ping通.看到网上很多文章,说用C#去调用windows的ping.exe,然后解析返回的字符串.我觉得这种方式太麻烦了,就做一下简单判断,不想弄那么麻烦. 查了一下,C#专门提供了一个Ping类,与Windows下的ping命令类似: 命令空间: System.Net.NetworkInformation; 使用方法: bool online = false; //是否在线 Ping ping =

自动生成简单四则运算的C语言程序

该程序是在博客园里面找的,具体是谁的找了半天没找到,无法提供它原本的链接.由于自己写的过于简单,且有一些功能暂时无法实现,所以就找了一个来应付作业,望原谅.在这个程序的源码中我改了一个错误的地方,源码中有这样一个随机数发生器的初始化函数的语句:"srand((unsigned)time(NULL))".srand函数是随机数发生器的初始化函数.但是正确的写法应该是:srand(unsigned( time(NULL))):为了防止随机数每次重复,常常使用系统时间来初始化,即使用time

Mysql的锁机制与PHP文件锁处理高并发简单思路

以购买商品举例: ① 从数据库获取库存的数量. ② 检查一下库存的数量是否充足. ③ 库存的数量减去买家购买的数量(以每个用户购买一个为例). ④ 最后完成购买. 仅仅这几行逻辑代码在并发的情况下会出现问题,自己可以想象一下. 这里暂时就不测试了,下面会针对并发的处理给出测试结果. 创建表: CREATE TABLE `warehouse` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `stock` int(11) NOT NULL

Winfrom 简单的安卓手机屏幕获取和安卓简单操作

为啥我要做这个东西了,是因为经常要用投影演示app ,现在有很多这样的软件可以把手机界面投到电脑上 ,但都要安装,比如说360的手机助手,我又讨厌安装,于是就自己捣鼓了下 做了这个东西, 实现了以下简单功能   1.屏幕获取(因为是截图方式获取的,所以有点卡顿) 2.实现点击功能,并在点击的时候出现一个手势图标,方便用户观看 3.实现简单的滑动功能 4.实现在界面上画图功能 5.实现拖拽安装apk功能 操作说明:鼠标左边 模拟手机点击,中键停止/开始刷新界面(画图的时候不能刷新),右键去掉画图内

iOS instruments之ui automation的简单使用(高手绕道)

最近使用了几次instruments中的automation工具,现记录下automation的简单使用方法,希望对没接触过自动化测试又有需求的人有所帮助.  UI 自动测试是iOS 中重要的附加功能,它由名为"Automation"的新的工具对象支持.Automation工具的脚本是用JavaScript语言编写,主要用于分析应用的性能和用户行为,模仿/击发被请求的事件,利用它可以完成对被测应用的简单的UI测试及相关功能测试. 一. 简单的录制脚本 打开xcode,这里用我为我家亲爱

Android ExpandableListView 带有Checkbox的简单应用

expandablelistview2_groups.xml <?xml version="1.0" encoding="utf-8"?> <RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android" android:layout_width="match_parent" android:layout_height=&qu

Android ExpandableListView的简单应用

Expandablelistview1Activity.java package com.wangzhu.demoexpandablelistview; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import android.app.Activity; import android.os.Bundle; import android.widg

一个简单的主机管理模拟程序

最近写的一个小练习,主要是把前面学的东西整合一下.写了一个简单的主机管理界面,主要是练习以下知识点: Session和Cookie进行登录验证(装饰器) 数据库的基本操作 (单表,1对多,多对多) Form的简单使用实现验证 Bootstrap模板写个简单界面 自定义分页 信号,中间件,CSRF,模板语言,JavaScript,AJAX等等 界面比较low,毕竟不是专业的. 附件里面是Django的源代码,3个文件放在一起winrar解压就可以打开

简单介绍一下vue2.0

Vue Vue是用于构建用户界面的渐进框架.作者尤雨熙特别强调它与其他的框架不同,Vue是渐进式的框架,可以逐步采用,不必一下就通过框架去重构项目. 另外Vue的核心库只专注于视图层,这样就更容易与其他库或现有项目进行集成,也更灵活. Vue在兼容性上不支持IE8以下版本的浏览器,用到了ECMAScript 5的功能,所有支持ECMAScript 5的浏览器都没问题,像这些: 安装 如果你已经熟悉并安装webpack那可以直接装一个CLI版即命令行工具,快速方便. $ npm install -