Storm工程创建

1.创建maven项目:

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.book</groupId>
  <artifactId>Getting-Started</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <compilerVersion>1.6</compilerVersion>
        </configuration>
      </plugin>
     </plugins>
  </build>

  <repositories>

        <!-- Repository where we can found the storm dependencies  -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>

  </repositories>

  <dependencies>

        <!-- Storm Dependency -->
        <dependency>
          <groupId>storm</groupId>
          <artifactId>storm</artifactId>
          <version>0.7.1</version>
       </dependency>

  </dependencies>

</project>

相关类:

WordReader:

package spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }

    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }

    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

WordCounter:

package bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn‘t exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer:

package bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {}

    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

TopologyMain:

import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {

        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));

        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(1000);
        cluster.shutdown();
    }
}

工程架构

运行:

结果

强大的流处理!

github:https://github.com/super-d2/strom_demo

时间: 2024-10-19 23:52:48

Storm工程创建的相关文章

Cocos2d-x游戏开发之lua工程创建

操作系统:OS X 10.85 Cocos2d-x 版本: 2.2.1 使用Cocos2d-x 可以创建lua工程,已经使用cpp创建的工程也可以继承lua进行开发,但是lua并不支持mac工程(因为一些框架的问题). 支持的工程文件如下: 所有使用创建工程create.py language 为cpp的工程,后集成lua及其工具的时候,要注意这一点. 撒 现在进入cocos2d-x 目录之下,通过cd 进入文件目录 进入之后,如果忘记了命令,可以直接运行 create_project.py 如

161130、Dubbo+SpringMVC工程创建详解

Dubbo出现的目的是为了应对现在高并发,高数据量请求的问题.目前的垂直应用架构已经无法满足现在大数据的冲击,SOA就应运而生,而Dubbo在国内使用的还是比较多,稳定性也比较不错. 架构 节点角色说明: Provider: 暴露服务的服务提供方.Consumer: 调用远程服务的服务消费方.Registry: 服务注册与发现的注册中心.Monitor: 统计服务的调用次调和调用时间的监控中心.Container: 服务运行容器. 调用关系说明: 服务容器负责启动,加载,运行服务提供者. 服务提

Android开发之基于AndroidStudio环境搭建和工程创建

断断续续的学习安卓也有一段时间了.因为之前是搞iOS开发的, 之前有关iOS的博客请看<我的iOS开发系列博文>.<我的Objective-C系列文章>和<窥探Swift系列博客说明及其Swift版本间更新>,<设计模式系列文章>,<重构系列文章>,在搞安卓期间好多都是类比着iOS来学的,安卓开发和iOS开发还是有许多相似之处的,控件的使用也都是大同小异,因为之前接触过过JavaEE的东西,所以搞搞安卓还是比较顺利的. 还是由浅入深,本篇博客先简

如何为Eclipse开发工具中创建的JavaWeb工程创建Servlet

在博客<在Eclipse中如何创建JavaWeb工程>中图文并茂的说明了Eclipse中创建JavaWeb工程的方法,本篇博客将告诉大家如何为Eclipse开发工具中创建的JavaWeb工程创建Servlet: 1.在Eclipse开发工具中创建的JavaWeb工程文件目录结构如下图: 说明: a).红框框定的结构用于存放Java类及其相应的包:用于存放Libraries文件夹: b).灰框框定的结构用于显示(非"存放")JavaWeb工程所依赖的JDK相关的jar包: c

Dubbo+SpringMVC工程创建详解(附工程文件)

Dubbo+SpringMVC工程创建详解(附工程文件) Dubbo出现的目的是为了应对现在高并发,高数据量请求的问题.目前的垂直应用架构已经无法满足现在大数据的冲击,SOA就应运而生,而Dubbo在国内使用的还是比较多,稳定性也比较不错. 架构 节点角色说明: Provider: 暴露服务的服务提供方. Consumer: 调用远程服务的服务消费方. Registry: 服务注册与发现的注册中心. Monitor: 统计服务的调用次调和调用时间的监控中心. Container: 服务运行容器.

scrapy工程创建及pycharm运行

1.通过命令行创建scrapy工程项目 scrapy startproject (工程名) scrapy startproject myxml 2.利用爬虫模板设置爬虫文件 在这个过程中我们可以先利用 scrapy genspider -l(小写的L)命令查看当前的爬虫模板 如:scrapy genspider -t 模板名 爬虫文件名 允许的域名 scrapy genspider -t xmlfeed myxmlspider sina.com.cn 注释: scrapy genspider -

Eclipse web工程创建步骤及两种部署方法

1.Eclipse创建web工程步骤 (1)参考1(2)参考2 2.web工程两种部署方法 (1)部署方法同1中所述 (2)部署到Tomcat Server的webapps目录下的方法

关于idea maven工程创建struts2入门配置及案例

1.在maven工程下添加需要导入的jar包节点 <dependencies> <dependency> <groupId>org.apache.struts</groupId> <artifactId>struts2-core</artifactId> <version>2.3.24</version> </dependency> <dependency> <groupId>

macs安卓工程创建

1.创建工程后先修改xml文件. 然后主程序中编写代码. center :如果图片比imageview大则显示中片中心部分 image大小  把view填满 匹配父控件 ,父控件多大它就多大. 包含内容 内容有多大就多大 随着内容动态变化. @代表呼叫引用 此时控件宽度和为0 此时整个父控件的款就是剩余款 此时控件就会等比例分配. 默认fang在左上角. +id表示创建新的id  @id引用 四线三格从上往下数第三个线 相对布局优化效果好,并且改的代码少.