Storm手写WordCount

建立一个maven项目,在pom.xml中进行如下配置:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.darrenchan</groupId>
    <artifactId>StormDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>StormDemo</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.5</version>
            <!--<scope>provided</scope> -->
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.itcast.bigdata.hadoop.mapreduce.wordcount.WordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

项目目录为:

MySpout.java:

package cn.darrenchan.storm;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    //storm框架不停地调用
    @Override
    public void nextTuple() {
        collector.emit(new Values("i am lilei love hanmeimei"));
    }

    //初始化方法
    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //声明本spout组件发送出去的tuple中的数据的字段名
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("love"));
    }

}

MySplitBolt.java:

package cn.darrenchan.storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    //storm框架不停地调用,传入参数是tutle
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    //初始化方法
    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }

}

MyCountBolt.java:

package cn.darrenchan.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class MyCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Map<String, Integer> map;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        map = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        if(map.containsKey(word)){
            map.put(word, map.get(word) + num);
        } else {
            map.put(word, 1);
        }

        System.out.println(map);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

WordCountTopoloyMain.java:

package cn.darrenchan.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopoloyMain {
    public static void main(String[] args) throws Exception {
        //1.准备一个TopologyBuilder
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new MySpout(), 1);
        builder.setBolt("mySplitBolt", new MySplitBolt(), 2).shuffleGrouping("mySpout");
        builder.setBolt("myCountBolt", new MyCountBolt(), 2).fieldsGrouping("mySplitBolt", new Fields("word"));

        //2.创建一个configuration,用来指定当前的topology需要的worker的数量
        Config config = new Config();
        config.setNumWorkers(4);

        //3.任务提交 两种模式————本地模式和集群模式
        //集群模式
        //StormSubmitter.submitTopology("myWordCount", config, builder.createTopology());
        //本地模式
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("myWordCount", config, builder.createTopology());
    }
}
时间: 2024-10-12 22:38:30

Storm手写WordCount的相关文章

Hive手写SQL案例

1-请详细描述将一个有结构的文本文件student.txt导入到一个hive表中的步骤,及其关键字 假设student.txt 有以下几列:id,name,gender三列 1-创建数据库 create database student_info; 2-创建hive表 student create external table student_info.student( id string comment '学生id', name string comment '学生姓名', gender st

wex5 实战 手写签名与上传

之前做过一个物流演示模块,有一个功能没做完,就是收件人收货后,可以手写签名并上传,替代传统纸张的签名.今天终于做完了. 一 效果演示: 后台图片上传成功 二 设计思路: 运用canvas插件jq-signature,制作手写签名,并通过二进制流方式上传图片 三 代码实现: 1 .下载引入插件jq-signature 手写签名插件,网上有很多,经过多种插件的逐步尝试,只有插件jq-signature达到了我想要的效果; 原因有二:1,支持手机触摸,web,鼠标.其它有的不支持手机触摸. 2,直接转

iOS开发UI基础—手写控件,frame,center和bounds属性

一.手写控件 1.手写控件的步骤 (1)使用相应的控件类创建控件对象 (2)设置该控件的各种属性 (3)添加控件到视图中 (4)如果是button等控件,还需考虑控件的单击事件等 (5)注意:View Contollor和view的关系 2.注意点 在OC开发中,Storyboard中的所有操作都可以通过代码实现,程序员一定要熟练掌握代码布局界面的能力! 设置控件监听方法的示例代码如下: [btn addTarget:self action:@selector(click:) forContro

logistic回归与手写识别例子的实现

本文主要介绍logistic回归相关知识点和一个手写识别的例子实现 一.logistic回归介绍: logistic回归算法很简单,这里简单介绍一下: 1.和线性回归做一个简单的对比 下图就是一个简单的线性回归实例,简单一点就是一个线性方程表示 (就是用来描述自变量和因变量已经偏差的方程) 2.logistic回归 可以看到下图,很难找到一条线性方程能将他们很好的分开.这里也需要用到logistic回归来处理了. logistic回归本质上是线性回归,只是在特征到结果的映射中加入了一层函数映射,

KNN分类算法实现手写数字识别

需求: 利用一个手写数字"先验数据"集,使用knn算法来实现对手写数字的自动识别: 先验数据(训练数据)集: ?数据维度比较大,样本数比较多. ? 数据集包括数字0-9的手写体. ?每个数字大约有200个样本. ?每个样本保持在一个txt文件中. ?手写体图像本身的大小是32x32的二值图,转换到txt文件保存后,内容也是32x32个数字,0或者1,如下: 数据集压缩包解压后有两个目录:(将这两个目录文件夹拷贝的项目路径下E:/KNNCase/digits/) ?目录trainingD

使用Caffe进行手写数字识别执行流程解析

之前在 http://blog.csdn.net/fengbingchun/article/details/50987185 中仿照Caffe中的examples实现对手写数字进行识别,这里详细介绍下其执行流程并精简了实现代码,使用Caffe对MNIST数据集进行train的文章可以参考  http://blog.csdn.net/fengbingchun/article/details/68065338 : 1.   先注册所有层,执行layer_factory.hpp中类LayerRegis

07 训练Tensorflow识别手写数字

打开Python Shell,输入以下代码: 1 import tensorflow as tf 2 from tensorflow.examples.tutorials.mnist import input_data 3 4 # 获取数据(如果存在就读取,不存在就下载完再读取) 5 mnist = input_data.read_data_sets("MNIST_data/", one_hot=True) 6 7 # 输入 8 x = tf.placeholder("flo

【转】机器学习教程 十四-利用tensorflow做手写数字识别

模式识别领域应用机器学习的场景非常多,手写识别就是其中一种,最简单的数字识别是一个多类分类问题,我们借这个多类分类问题来介绍一下google最新开源的tensorflow框架,后面深度学习的内容都会基于tensorflow来介绍和演示 请尊重原创,转载请注明来源网站www.shareditor.com以及原始链接地址 什么是tensorflow tensor意思是张量,flow是流. 张量原本是力学里的术语,表示弹性介质中各点应力状态.在数学中,张量表示的是一种广义的"数量",0阶张量

codevs1281 矩阵乘法 快速幂 !!!手写乘法取模!!! 练习struct的构造函数和成员函数

对于这道题目以及我的快速幂以及我的一节半晚自习我表示无力吐槽,, 首先矩阵乘法和快速幂没必要太多说吧,,嗯没必要,,我相信没必要,,实在做不出来写两个矩阵手推一下也就能理解矩阵的顺序了,要格外注意一些细节,比如快速幂时ans矩阵的初始化方式,快速幂的次数,矩阵乘法过程中对临时矩阵的清零,最后输出结果时的初始矩阵...矩阵快速幂好理解但是细节还是有点小坑的.. 下面就是满满的槽点,,高能慎入!!! 对于这个题目要求矩阵过程中对m取模,结果对g取模,我表示难以接受,,上来没看清题直接wa19个点,另