基于Storm的WordCount

Storm WordCount 工作过程

Storm 版本:
1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、WordCountBolt 接收 SplitBolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

Java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。

Hadoop 版本:
1、按行读取文件中的数据;
2、在 Mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 Mapper()中输出的数据数组,在 Reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里maven的配置以及创建项目部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        //创建一个 TopologyBuilder
        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));
        //创建配置
        Config conf = new Config();
        //设置 worker 数量
        conf.setNumWorkers(2);
        //提交任务
        //集群提交
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());
        //本地提交
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("myWordcount", conf, tb.createTopology());
    }
} 

SplitBolt 部分

package com.test.stormwordcount;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{
    OutputCollector collector; 

    /**      * 初始化      */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        } 

    /**      * 执行方法      */
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] split = line.split(" ");
        for (String word : split) {
            collector.emit(new Values(word));
            }
        } 

    /**      * 输出      */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        }
} 

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>(); 

    /**      * 初始化      */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        } 

    /**      * 执行方法      */
public void execute(Tuple input) {
    String word = input.getString(0);
    if(map.containsKey(word)){
    Integer c = map.get(word);
        map.put(word, c+1);
        }else{
        map.put(word, 1);
        }
    //测试输出
    System.out.println("结果:"+map);
    } 

    /**      * 输出      */
public void declareOutputFields(OutputFieldsDeclarer declarer) {     

}
} 

SpoutBolt 部分

package com.test.stormwordcount;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
    /**      * 初始化方法      */
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        } 

    /**      * 重复调用方法      */
    public void nextTuple() {
        collector.emit(new Values("hello world this is a test"));
        } 

    /**      * 输出      */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("test"));
        }
} 

POM.XML 文件内容

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.stormwordcount.MainClass</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>

遇到的问题

基于Storm的WordCount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为Eclipse IDE for Eclipse Committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,Help->Install New SoftWare

点击Add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接:http://www.eclipse.org/m2e/index.html 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中Contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的JavaEE IDE了...

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 Windows->Preferencs->Maven->Installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。

最后运行成功的结果:

原文地址:https://www.cnblogs.com/p1ng/p/12057174.html

时间: 2024-11-08 06:08:56

基于Storm的WordCount的相关文章

基于storm的在线关联规则

基于storm的在线视频推荐算法,算法依据youtube的推荐算法  算法相对简单,可以认为是关联规则只挖掘频繁二项集.下面给出与storm的结合实现在线实时算法 , 关于storm见这里.首先给出数据流图(不同颜色的线条代表不同的数据流.在storm里面bolt也是可以声明数据流的.) 关联规则挖掘数据项的时候,有事务的概念,这里的事务的定义为:给定时间窗口内用户看过的视频集.所以,我们需要这样一个bolt,根据实时日志收集每个用户看过的视频集----user_videos aggregate

一种基于Storm的可扩展即时数据处理架构思考

问题引入 使用storm可以方便的构建一种集群式的数据框架,并通过定义topo来实现业务逻辑. 但使用topo存在一个缺点, topo的处理能力来自于其启动时设置的worker数目,在很多情况下,我们需要能够根据业务压力来调整集群的处理能力,这时候单一的topo就无法解决这个问题了. 为了能够更加灵活的定义处理能力,可以考虑将原有的topo根据业务域进行拆分,做到互不干扰,灵活控制,而且为了能够更加经济的利用处理资源,可以考虑引入worker资源池的概念,达到对资源的充分利用. 但使用这种多to

基于Storm构建实时热力分布项目实战

详情请交流  QQ  709639943 01.基于Storm构建实时热力分布项目实战 02.以慕课网日志分析为例 进入大数据 Spark SQL 的世界 03.Spring Cloud微服务实战视频课程 04.漫谈spring cloud 与 spring boot 基础架构 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.漫谈spring cloud分布式服务架构 09.Java Spring Se

基于storm的实时数据处理方案

1 文档说明 该文档描述的是以storm为主体的实时处理架构,该架构包括了数据收集部分,实时处理部分,及数据落地部分. 关于不同部分的技术选型与业务需求及个人对相关技术的熟悉度有关,会一一进行分析. 该架构是本人所掌握的一种架构,可能会与其他架构有相似的部分,个人会一一解释对其的理解. 这个文章写的很详细,相信对大家在实时处理整体理解上会有帮助的. 2 实时处理架构 2.1 整体架构图 架构说明: 整个数据处理流程包括四部分,一部分是数据接入层,该部分从前端业务系统获取数据:中间部分是最重要的s

基于Storm的Nginx log实时监控系统

背景 UAE(UC App Engine)是一个UC内部的PaaS平台,总体架构有点类似CloudFoundry,包括: 快速部署:支持Node.js.Play!.PHP等框架 信息透明:运维过程.系统状态.业务状况 灰度试错:IP灰度.地域灰度 基础服务:key-value存储.MySQL高可用.图片平台等 这里它不是主角,不作详细介绍. 有数百个Web应用运行在UAE上,所有的请求都会经过UAE的路由,每天的Nginx access log大小是TB级,如何实时监控每个业务的访问趋势.广告数

基于Storm构建分布式实时处理应用初探

Storm对比Hadoop,前者更擅长的是实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式架构,而且都类似有主/从关系的概念. 本文不会具体阐述Storm集群和Zookeeper集群如何部署的问题,这里想通过一个实际的案例切入,分析一下如何利用Storm完成实时分析处理数据. Storm本身是Apache托管的开源的分布式实时计算系统,它的前身是Twitter Storm.在Stor

基于storm,kafka,mysql的实时统计系统

公司对客户开放多个系统,运营人员想要了解客户使用各个系统的情况,在此之前,数据平台团队已经建设好了统一的Kafka消息通道. 为了保证架构能够满足业务可能的扩张后的性能要求,选用storm来处理各个应用系统上传到kafka中的埋点数据并在Mysql中汇聚. 埋点数据上报的格式为json,会上报类似如下的数据 { "account": "001", "accountName": "旺财宝", "subaccount&q

基于storm的Window

Watermark作用 在解释storm的window之前先说明一下watermark原理. Watermark中文翻译为水位线更为恰当. 顺序的数据从源头开始发送到到操作,中间过程肯定会出现数据乱序情况,比如网络原因,数据并发发送等.如何区分乱序的数据和正常的数据,就引申出了watermark. Watermark是每一个时间窗口的下限,意思是说当watermark大于了窗口截止时间,那么该窗口就应该被关闭.而watermar也会随着时间窗口的变化不断更新自己. 参考下图,列举了几个关键的术语

用实例理解Storm的Stream概念

原文首发在个人博客:http://zqhxuyuan.github.io/2016/06/30/Hello-Storm/ 如需转载,请注明出处,谢谢! 缘起 事情源于在看基于Storm的CEP引擎:flowmix 的FlowmixBuilder代码, 每个Bolt设置了这么多的Group, 而且declareStream也声明了这么多的stream-id, 对于只写过WordCountTopology的小白而言, 直接懵逼了,没见过这么用的啊,我承认一开始是拒绝的,每个Bolt都设置了这么多Gr