使用Storm进行词频统计

词频统计

1.需求:读取指定目录的数据,并且实现单词计数功能

2.实现方案:

Spout用于读取指定文件夹(目录),读取文件,将文件的每一行发射到Bolt

SplitBolt用于接收Spout发射过来的数据,并拆分,发射到CountBolt

CountBolt接收SplitBolt发送的每一个单词,进行单词计数操作

3.拓扑设计:

DataSourceSpout + SplitBolt + CountBolt

代码如下:

package com.csylh;

import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * Description:使用Storm完成词频统计功能
 *
 * @author: 留歌36
 * Date:2018/9/4 9:28
 */
public class LocalWordCountStormTopology {
    /**
     * 读取数据并发送到Bolt上去
     */
    public static class DataSourceSpout extends BaseRichSpout{
        //定义一个发射器
        private SpoutOutputCollector collector;

        /**
         * 初始化方法 只是会被调用一次
         * @param conf  配置参数
         * @param context  上下文
         * @param collector  数据发射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            //对上面定义的的发射器进行赋初值
            this.collector = collector;
        }

        /**
         * 用于数据的产生
         * 业务:
         * 1.读取指定目录的文件夹下的数据
         * 2.把每一行数据发射出去
         */
        @Override
        public void nextTuple() {
//            获取所有文件,这里指定文件的后缀
            Collection<File> files = FileUtils.listFiles(new File("E:\\StormText"),new String[]{"txt"},true);
//            循环遍历每一个文件 ==>  由于这里指定的是文件夹下面的目录 所以就是需要进行循环遍历
            for( File file : files){
                try {
//                    获取每一个文件的每一行
                    List<String> lines =  FileUtils.readLines(file);
                    for(String line : lines){
//                        把每一行数据发射出去
                        this.collector.emit(new Values(line));
                    }
                    //TODO 数据处理完毕之后 改名  否则的话 会一直执行的
                    FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis()));

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }

        /**
         * 声明输出字段名称
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }
    /**
     * 对Spout发送过来的数据进行分割
     */
    public static class SplitBolt extends BaseRichBolt{
        private OutputCollector collector;
        /**
         * 初始化方法  只是会被执行一次
         * @param stormConf
         * @param context
         * @param collector Bolt的发射器,指定下一个Bolt的地址
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
        }

        /**
         * 用于获取Spout发送过来的数据
         * 业务逻辑
         *  spout发送过来的数据是一行一行的line
         *  这里是需要line进行分割
         *
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(",");

            for(String word : words){
//                这里把每一个单词发射出去
                this.collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
        }
    }
    /**
     * 词频汇总的Bolt
     */
    public static class CountBolt extends BaseRichBolt{
        /**
         * 由于这里是不需要向外部发射  所以就不需要定义Collector
         * @param stormConf
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }
        Map<String,Integer> map = new HashMap<String, Integer>();
        /**
         * 业务逻辑
         * 1.获取每一个单词
         * 2.对每一个单词进行汇总
         * 3.输出结果
         * @param input
         */
        @Override
        public void execute(Tuple input) {
//            获取每一个单词
           String word = input.getStringByField("word");
           Integer count =  map.get(word);
           if (count == null){
               count = 0;
           }
            count++;
//           对单词进行汇总
            map.put(word,count);
//           输出
            System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
            Set<Map.Entry<String,Integer>> entrySet = map.entrySet();
            for(Map.Entry<String,Integer> entry :entrySet){
                System.out.println(entry);
            }
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
    /**
     * 主函数
     * @param args
     */
    public static void main(String[] args) {
//            使用TopologyBuilder根据Spout和Bolt构建Topology
        TopologyBuilder builder = new TopologyBuilder();
//            设置Bolt和Spout  设置Spout和Bolt的关联关系
        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt",new CountBolt()).shuffleGrouping("SplitBolt");
//            创建一个本地的集群
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountStormTopology",new Config(),builder.createTopology());
    }
}

小结:开发Storm程序的步骤就是:

根据需求 设计实现方案 规划拓扑

一般是先写Spout数据产生器 发射数据到Bolt

接着,就是Bolt进行数据处理,如果有多个Bolt,非最后一个Bolt也要写发射器Collector

最后一个Bolt直接输出结果或者 输出到HDFS或者关系型数据库中

最终需要将Spout和Bolt进行组装起来(借助TopologyBuilder)

原文地址:https://www.cnblogs.com/liuge36/p/9882747.html

时间: 2024-11-02 19:02:55

使用Storm进行词频统计的相关文章

Storm- 使用Storm实现词频汇总

需求:读取指定目录的数据,并实现单词计数的功能 实现方案: Spout来读取指定目录的数据,作为后续Bolt处理的input 使用一个Bolt把input 的数据,切割分开,我们按照逗号进分割 使用一个Bolt来进行最终的单词次数统计操作并输出 拓扑设计:DataSourceSpout ==>SpiltBolt ==>CountBolt Storm编程注意,Topology,Spout,Bolt等命名不能重复,伤到集群需要注意出现重复命名,会报错的. package com.imooc.big

02 使用Flink的本地模式完成词频统计

前面我们已经安装了flink,与storm一样,flink也有两种模式,一是本地模式,主要用于学习和测试,另一个是集群模式,实际生产中使用这种模式.本节将阐述如何使用本地模式的flink进行词频统计. 1 系统.软件以及前提约束 CentOS 7 64 工作站 作者的机子ip是192.168.100.200,请读者根据自己实际情况设置 idea 2018.1 在Win10中安装nc https://www.jianshu.com/p/4f6fb8834ad9 2 操作 1 在idea中创建一个m

词频统计-单元测试

我自己的单元测试没有弄出来,我用c编的,在visual studio中貌似实现不了单元测试,而李俞寰同学是用c#编写的词频统计,在vs2015中实现单元测试无比的方便,所以我请教了他并借鉴了一下. [TestMethod()] public void DictionarySortTest() { Dictionary<string,int>input=new Dictionary<string,int>() { {"you,1}, {"are",1},

词频统计-功能一

一.完成一个小程序 我 拿到这个题目之后,就决定用最不熟悉的c#来实现,因为老师说不懂的去学才会有进步.布置任务后的第二天就开始去图书馆借了两本书<c#从入门到精髓>,<c#项目实战>,拿到书之后看了入门书<c#从入门到精髓>,看书的过程时痛苦的,因为发现大二选修课学的c#全交还给老师了,只能重头再学了.唯一有点印象的就是窗口应用程序,基于UI的设计. 写代码首先需要工具,由于电脑上没有visual studio的安装包,当时求助了度娘. 如果没有安装包的同学们,可以借

词频统计效能测试---------第二版

在第一次的词频统计中,对JProfile 款软件不是很熟悉,感觉数据不是很准确,在程序启动时JProfile总是提示Java虚拟机已退出,后来经过查阅知道解决方案:截图如下   要将 keep vm alive 勾选上. 程序总体总体情况如下: 当程序运行之后,cpu和内存的使用几乎在同一时间有一个明显的上升过程. 各个对象使用情况如下 下面是热点函数的展示,这也和我在程序中运用时间戳确定建树函数[generateCharTree()]占用时间较多的情况相符.因为对这个程序来说主要时间都花费在单

结对项目 - 词频统计Ⅱ

目的与要求 代码复审练习 结对练习 编写单元测试 基于上一个结对项目的结果,读取小文本文件A_Tale_of_Two_Cities.txt 或者 大文本文件Gone_with_the_wind.txt,统计某一指定单词在该文本文件中出现的频率. 命令行格式: 提示符> Myapp.exe -f filename.txt -w word (PS:C++ 程序,Java 程序输出方式类似) 解释: 选项 -f 表示打开某一文件 选项 -w 表示统计其后单词在打开的文件中的频率 详细内容 开发语言:J

个人项目——词频统计

前言: 开发工具:Visual Studio 2013 开发语言:C++ 源代码管理工具:Github Github源代码网址:https://github.com/superyy/YY1/blob/master/%E8%AF%8D%E9%A2%91%E7%BB%9F%E8%AE%A1main.cpp 预计各功能所花时间:some hours 实际各功能所花时间:some hours 性能提高所花时间:some hours 要求 :实现一个控制台程序,给定一段英文字符串,统计其中各个英文单词(4

实验二-3 Hadoop&amp;Paoding 中文词频统计

  参考教程 在Hadoop上使用庖丁解牛(较复杂,并未采用,可以之后试试) http://zhaolinjnu.blog.sohu.com/264905210.html Lucene3.3.Lucene3.4中文分词——庖丁解牛分词实例(屈:注意版本) http://www.360doc.com/content/13/0217/13/11619026_266124504.shtml 庖丁分词在hadoop上运行时的配置问题(采纳了一半,没有按照其所写配置dic属性文件) http://f.da

初学Hadoop之中文词频统计

1.安装eclipse 准备 eclipse-dsl-luna-SR2-linux-gtk-x86_64.tar.gz 安装 1.解压文件. 2.创建图标. ln -s /opt/eclipse/eclipse /usr/bin/eclipse #使符号链接目录 vim /usr/share/applications/eclipse.desktop #创建一个  Gnome 启动 添加如下代码: [Desktop Entry] Encoding=UTF-8 Name=Eclipse 4.4.2