Storm实验 -- 单词计数4

在上一次单词计数的基础上做如下改动: 使用 自定义
 分组策略,将首字母相同的单词发送给同一个task计数

自定义 CustomStreamGrouping

package com.zhch.v4;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List<Integer> targetTasks) {
        this.tasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> taskIds = new ArrayList<Integer>();
        if (values.size() > 0) {
            String str = values.get(0).toString();
            if (str.isEmpty()) {
                taskIds.add(0);
            } else {
                Integer index = str.charAt(0) % tasks.size();
                taskIds.add(tasks.get(index));
            }
        }
        return taskIds;
    }
}

数据源spout

package com.zhch.v4;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;

    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();

        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }

        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}

实现语句分割bolt

package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

实现单词计数bolt

package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Long c = this.counts.get(key);
                writer.write(key + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }

        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

实现单词计数topology

package com.zhch.v4;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v4";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); //使用 自定义 分组策略

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            //集群模式启动
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}

提交到Storm集群

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4

运行结果:

[[email protected] stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1
[[email protected] stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1
时间: 2024-12-25 22:02:19

Storm实验 -- 单词计数4的相关文章

Storm实验 -- 单词计数3

在上一次单词计数的基础上做如下改动: 使用 Direct Grouping 分组策略,将首字母相同的单词发送给同一个task计数 数据源spout package com.zhch.v3; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import b

Storm实验 -- 单词计数2

在上一次单词计数的基础上做如下改动: 1. 使用可靠的消息处理机制 2. 配置 worker .executor.task 数量 3. 使用集群模式提交 数据源spout package com.zhch.v2; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclar

Storm实现单词计数

package com.mengyao.storm; import java.io.File;import java.io.IOException;import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry; import org.apache.commons.io.FileUtils; import backt

大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

   前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存). 一 概述 定义 MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE).这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间. 适用范围:数据量大,但是数据种类小可以放入内存. 基

自然语言处理第二讲:单词计数

自然语言处理:单词计数 这一讲主要内容(Today): 1.语料库及其性质: 2.Zipf 法则: 3.标注语料库例子: 4.分词算法: 一. 语料库及其性质: a) 什么是语料库(Corpora) i. 一个语料库就是一份自然发生的语言文本的载体,以机器可读形式存储: ii. 一种平衡语料库尝试在语言或者其他领域具有代表性: b) 译者注:平行语料库与平衡语料库的特点与区别 i. 平行语料库通常是由双语或多语的对应语料构成,常常是翻译文本构成.例如:Babel English-Chinese

《C程序设计语言》关于单词计数的思考

代码分析 源代码来源于Brian W. Kernighan 和 Dennis M. Ritchie 共同编著的书籍 <The C Program Language> 中1.5.4节中的单词计数. 中文版原文:这里对单词的定义比较宽裕,它是任何其中不含空格.制表符或换行符的字符序列,下面这段程序是UNIX系统中wc程序的骨干部分. #include<stdio.h> #define IN 1 /* 单词内 */ #define IN 0 /* 单词外 */ main() { int

MapReduce之单词计数

最近在看google那篇经典的MapReduce论文,中文版可以参考孟岩推荐的 mapreduce 中文版 中文翻译 论文中提到,MapReduce的编程模型就是: 计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce. 用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduc

第一章 flex单词计数程序

学习Flex&Bison目标, 读懂SQLite中SQL解析部分代码 Flex&Bison简介Flex做词法分析Bison做语法分析 第一个Flex程序, wc.fl, 单词计数程序 %{ int chars = 0; int words = 0; int lines = 0; %} %% [a-zA-Z]+ { words++; chars += strlen(yytext); } \n { chars++; lines++; } . { chars++; } %% main(int a

【Hadoop基础教程】5、Hadoop之单词计数

单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的src/example目录下找到.单词计数主要完成的功能:统计一系列文本文件中每个单词出现的次数,如下图所示.本blog将通过分析WordCount源码来帮助大家摸清MapReduce程序的基本结构和运行机制. 开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Jav