Storm- 使用Storm实现词频汇总

需求:读取指定目录的数据,并实现单词计数的功能

实现方案

  Spout来读取指定目录的数据,作为后续Bolt处理的input

  使用一个Bolt把input 的数据,切割分开,我们按照逗号进分割

  使用一个Bolt来进行最终的单词次数统计操作并输出

拓扑设计:DataSourceSpout ==>SpiltBolt ==>CountBolt

Storm编程注意,Topology,Spout,Bolt等命名不能重复,伤到集群需要注意出现重复命名,会报错的。

package com.imooc.bigdata;

import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 使用Storm完成词频统计功能
 */
public class LocalWordCountStormTopology {
    public static class DataSourceSpout extends BaseRichSpout{
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * 业务逻辑
         * 1) 读取指定目录文件夹下的数据:E:\iso\linux
         * 2) 把每一行的数据发射出去
         */
        @Override
        public void nextTuple() {

            // 获取所有文件
            Collection<File> files = FileUtils.listFiles(new File("E:\\iso\\linux"), new String[]{"txt"}, true);
            for (File file: files){
                try {
                    // 获取文件中的所有内容
                    List<String> lines = FileUtils.readLines(file);

                    // 获取文件中的每行的内容
                    for (String line: lines){

                        // 发射出去
                        this.collector.emit(new Values(line));
                    }

                    // TODO... 数据处理完成之后,改名,否则一直重复执行
                    FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));

        }
    }

    /**
     * 对数据进行分割
     */
    public static class SplitBolt extends BaseRichBolt{
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * 业务逻辑:
         *  line: 对line进行分割,按逗号进行分割
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(",");

            for (String word: words){
                this.collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * 词频汇总Bolt
     */
    public static class WordCountBlot extends BaseRichBolt{

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        Map<String, Integer> map = new HashMap<String, Integer>();
        /**
         * 业务逻辑:
         * 1)获取每个单词
         * 2)对所有单词进行汇总
         * 3)输出
         * @param input
         */
        @Override
        public void execute(Tuple input) {

            // 1)获取每个单词
            String word = input.getStringByField("word");
            Integer count = map.get(word);
            if (count == null){
                count = 0;
            }
            count ++;

            // 2)对所有单词进行汇总
            map.put(word, count);

            // 3)输出
            System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~");
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry: entries) {
                System.out.println(entry);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {

        // 通过TopologyBuilder根据Spout和Bilt构建Topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("WordCountBlot", new WordCountBlot()).shuffleGrouping("SplitBolt");

        // 创建本地集群
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountStormTopology", new Config(), builder.createTopology());

    }
}

原文地址:https://www.cnblogs.com/RzCong/p/9383141.html

时间: 2024-09-30 03:31:15

Storm- 使用Storm实现词频汇总的相关文章

storm学习-storm入门

超好资料: 英文:https://github.com/xetorthio/getting-started-with-storm/blob/master/ch03Topologies.asc 中文:http://ifeve.com/getting-started-with-storm-3/ 下面具体讲下:storm的几种groupping 策略的例子 Storm Grouping shuffleGrouping 将流分组定义为混排.这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bo

Storm入门--Storm编程

以电信通话记录为例 移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数. 编程思想: 在storm中,把对数据的处理过程抽象成一个topology,这个topology包含的组件主要是spout.bolt,以及以tuple形式在组件之间传输的数据流.这个数据流在topology流一遍,就是对数据的一次处理. 1.创建Spout类 这一部分,是创建数据流的源头. 创建一个类,实现IRichSpout接口,实现相应方法.其中几

Storm笔记——技术点汇总

目录 · 概述 · 手工搭建集群 · 引言 · 安装Python · 配置文件 · 启动与测试 · 应用部署 · 参数配置 · Storm命令 · 原理 · Storm架构 · Storm组件 · Stream Grouping · 守护进程容错性(Daemon Fault Tolerance) · 数据可靠性(Guaranteeing Message Processing) · 消息传输机制 · API · WordCount示例 · 应用部署方式 · 组件接口 · 组件实现类 · 数据连接方

Storm实时日志分析实战

项目背景 最近公司做一个项目,用户需要对网站访问者的广告点击/浏览记录进行实时统计分析,分析结果存入数据库,输出报表.我们采用了Kafka+Storm+Zookeeper的解决方案.之前没有接触过,经过一段时间的研究,最终完成了项目.接下来的内容我将介绍我们的解决方案.供大家参考.我们的系统结构如下: 总体结构介绍 业务系统把点击/浏览广告业务日志统一按规定的格式发送到Kafka集群中,不同的业务日志可以分别发送给Kafka不同的主题.Storm集群中运行了我们的实时统计拓扑,该统计拓扑分别从K

Flume-ng+Kafka+storm的学习笔记

Flume-ng Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume的文档可以看http://flume.apache.org/FlumeUserGuide.html 官方的英文文档 介绍的比较全面. 不过这里写写自己的见解 这个是flume的架构图 从上图可以看到几个名词: Agent: 一个Agent包含Source.Channel.Sink和其他的组件.Flume就是一个或多个Agent构成的. Source:数据源.简单的说就是agent获取数据的入口

storm源码之storm代码结构【译】【转】

[原]storm源码之storm代码结构[译] 说明:本文翻译自Storm在GitHub上的官方Wiki中提供的Storm代码结构描述一节Structure of the codebase,希望对正在基于Storm进行源码级学习和研究的朋友有所帮助. Storm的源码共分为三个不同的层次. 首先,Storm在设计之初就考虑到了兼容多语言开发.Nimbus是一个thrift服务,topologies被定义为Thrift结构体.Thrift的运用使得Storm可以被任意开发语言使用. 其次,Stor

storm启动nimbus源码分析-nimbus.clj

nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus.bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数: nimbus函数 def nimbus(klass="backtype.storm.daemon.nimbus"):    """Syntax: [s

storm transaction

storm transaction storm的事务主要用于对数据准确性要求非常高的环境中,尤其是在计算交易金额或笔数,数据库同步的场景中. storm 事务逻辑是挺复杂的,而且坦白讲,代码写的挺烂的. JStorm下一步将重新设计基于Meta 1 和Meta3 的事务模型,让使用者更简便,代码更清晰. 一个基本的例子 你可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topolog

storm文档(11)----搭建storm集群

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41684717 源地址:http://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html 本文叙述了storm集群搭建和运行步骤.如果你打算在AWS上进行的话,可以使用storm-deploy项目.storm-deploy在EC2上完全自动进行下载.配置.以及storm集群的安装等步骤.它也为你配置了Gan