Strom之WordCount

新建Maven项目

<dependencies>
   <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>0.9.2-incubating</version>
   </dependency>
  </dependencies>

JavaCode

  1 package mystorm;
  2
  3 import java.io.File;
  4 import java.io.IOException;
  5 import java.util.HashMap;
  6 import java.util.List;
  7 import java.util.Map;
  8 import java.util.Map.Entry;
  9
 10 import org.apache.commons.io.FileUtils;
 11
 12 import clojure.main;
 13 import clojure.lang.MapEntry;
 14
 15 import backtype.storm.LocalCluster;
 16 import backtype.storm.StormSubmitter;
 17 import backtype.storm.generated.AlreadyAliveException;
 18 import backtype.storm.generated.InvalidTopologyException;
 19 import backtype.storm.spout.SpoutOutputCollector;
 20 import backtype.storm.task.OutputCollector;
 21 import backtype.storm.task.TopologyContext;
 22 import backtype.storm.topology.OutputFieldsDeclarer;
 23 import backtype.storm.topology.TopologyBuilder;
 24 import backtype.storm.topology.base.BaseRichBolt;
 25 import backtype.storm.topology.base.BaseRichSpout;
 26 import backtype.storm.tuple.Fields;
 27 import backtype.storm.tuple.Tuple;
 28 import backtype.storm.tuple.Values;
 29
 30 //本地模式
 31 public class WordCountApp {
 32
 33     public static void main(String[] args) throws Exception {
 34         final TopologyBuilder topologyBuilder = new TopologyBuilder();
 35
 36         topologyBuilder.setSpout("1", new MySpout());
 37         topologyBuilder.setBolt("2", new SplitLineBolt()).shuffleGrouping("1");
 38         topologyBuilder.setBolt("3", new WordCountBolt()).shuffleGrouping("2");
 39
 40         final HashMap conf = new HashMap();
 41
 42         final StormSubmitter stormSubmitter = new StormSubmitter();
 43         stormSubmitter.submitTopology(WordCountApp.class.getSimpleName(), conf, topologyBuilder.createTopology());
 44     }
 45 }
 46
 47 class MySpout extends BaseRichSpout{
 48     private static final long serialVersionUID = 1L;
 49
 50     SpoutOutputCollector collector = null;
 51
 52     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
 53         this.collector = collector;
 54     }
 55
 56     //最最重要的方法,处理数据的。简单认为是死循环的,监听文件内容的变化
 57     public void nextTuple() {
 58         try {
 59             final List<String> readLines = FileUtils.readLines(new File("/root/Downloads/hello"));
 60             for (String line : readLines) {
 61                 //把每一行看作一个tuple
 62                 final Values tuple = new Values(line);
 63                 //collector把tuple送出去,交给bolt处理
 64                 collector.emit(tuple);
 65             }
 66             Thread.sleep(2000L);
 67         } catch (Exception e) {
 68             e.printStackTrace();
 69         }
 70     }
 71
 72
 73     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 74         final Fields fields = new Fields("line");
 75         declarer.declare(fields);
 76     }
 77
 78 }
 79
 80 class SplitLineBolt extends BaseRichBolt{
 81
 82     OutputCollector collector = null;
 83
 84     public void execute(Tuple tuple) {
 85         final String line = tuple.getString(0);
 86         final String[] splited = line.split("\t");
 87         for (String word : splited) {
 88             this.collector.emit(new Values(word));
 89         }
 90         try {
 91             Thread.sleep(2000L);
 92         } catch (InterruptedException e) {
 93             e.printStackTrace();
 94         }
 95     }
 96
 97     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
 98         this.collector = collector;
 99     }
100
101     public void declareOutputFields(OutputFieldsDeclarer declarer) {
102         declarer.declare(new Fields("word"));
103     }
104 }
105
106
107 class WordCountBolt extends BaseRichBolt{
108
109     OutputCollector collector = null;
110
111     Map<String,Integer> map = new HashMap<String,Integer>();
112
113     public void execute(Tuple tuple) {
114         final String word = tuple.getString(0);
115         final Integer value = map.get(word);
116         if(value==null) {
117             map.put(word, 1);
118         }else {
119             map.put(word, value+1);
120         }
121
122         try {
123             Thread.sleep(2000L);
124         } catch (InterruptedException e) {
125             e.printStackTrace();
126         }
127     }
128
129     @Override
130     public void cleanup() {
131         for (Entry<String, Integer> entry : map.entrySet()) {
132             System.err.println(entry.getKey()+":"+entry.getValue());
133         }
134     }
135
136     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
137         this.collector = collector;
138     }
139
140     public void declareOutputFields(OutputFieldsDeclarer declarer) {
141         declarer.declare(new Fields("word"));
142     }
143
144
145 }

本地运行

final HashMap conf = new HashMap();

final LocalCluster localCluster = new LocalCluster();

localCluster.submitTopology(WordCountApp.class.getSimpleName(), conf, topologyBuilder.createTopology());

时间: 2024-11-06 21:57:20

Strom之WordCount的相关文章

老男孩教育大数据课程体系2折起超值优惠开班

老男孩教育大数据课程体系2折起超值优惠开班 大数据与云计算的未来发展趋势和前景已经极其广阔,未来的互联网就是大数据和云计算的天下,大数据和云计算将成为每一个IT人员必须会的技术了.老男孩教育也顺势而为重金聘请一流核心骨干讲师,打造互联网大数据课程,绝对让同学们心悦诚服,绝对物超所值,让同学在技术道路上捷足先登,做IT技术达人,成为人生赢家.必须要要学大数据的里有见本文结尾. 虽然价格极低,但课程含金量坚决不低于18000的价值! 一.开课信息 1.开课时间:2016年2月23日(如果满30人,1

strom:实时的WordCount

集采单词 package wordcount; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import backtype.storm.spout.SpoutOutputCollector; import backty

Apache Strom和Kafka的简单笔记 (零) - 开端

一.什么是实时计算系统?(流式计算)1.离线计算和实时计算 离线计算 实时计算(流式计算) 典型代表 mapReduce Apache Strom,Spark Streaming 和JStream 数据 HDFS上 实时数据 采集数据 Sqoop(批量导入) Flume进行采集 保存结果 HDFS Redis上 (HDFS,HBase,Hive,JDBC[oracle,mysql]) 2.举例 : 自来水场处理自来水3,strom体系结构 (*) 主节点 : nimbus 从节点 : super

JStorm第一个程序WordCount详解

一.Strom基本知识(回顾) 1,首先明确Storm各个组件的作用,包括Nimbus,Supervisor,Spout,Bolt,Task,Worker,Tuple nimbus是整个storm任务的管理者,并不实际进行工作.负责在集群中分发代码,对节点分配任务,并监视主机故障. supervisor是实际进行工作的节点,负责监听工作节点上已经分配的主机作业,启动和停止Nimbus已经分配的工作进程. Worker是具体处理Spout/Bolt逻辑的进程,worker数量由拓扑中的conf.s

Strom优化指南

摘要:本文主要讲了笔者使用Strom中的一些优化建议 1.使用rebalance命令动态调整并发度 Storm计算以topology为单位,topology提交到Storm集群中运行后,通过storm rebalance 命令可对topology进行动态调整.比如增加Topology的worker数,修改Bolt,Spout的并行执行数量 parallelism等,从而实现topology的动态调整,达到弹性计算的目的.(当然调整时要配合监控模块) 基本上主要有两种用法:1) storm reb

年薪40万的大数据工程师是如何安装Strom

Strom集群的安装配置 主机规划 机器名 域名 IP地址 storm-01 192.168.33.31 Storm(minbus).zookeeper storm-02 192.168.33.32 Storm(supervisor).zookeeper storm-03 192.168.33.33 storm(supervisor).zookeeper Storm-04 192.168.33.34 storm(supervisor) 一.准备服务器 l  关闭防火墙 chkconfig ipt

Strom流式计算

序言 主要学习方向 Kafka 分布式消息系统 Redis 缓存数据库 Storm 流式计算 1.Storm 的基本概念 2.Storm 的应用场景 3.Storm 和Hadoop的对比 4.Storm 集群的安装的linux环境准备 5.zookeeper集群搭建 6.Storm 集群搭建 7.Storm 配置文件配置项讲解 8.集群搭建常见问题解决 9.Storm 常用组件和编程 API:Topology. Spout.Bolt 10.Storm分组策略(stream groupings)

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

彻底解密WordCount运行原理(DT大数据梦工厂)

主要内容: 数据流动视角解密WordCount RDD依赖关系视角解密WordCount DAG与Lineage的思考 ==========数据流动视角============ 新建文件,里面输入 Hello Spark Hello Scala Hello Hadoop Hello Flink Spark is awesome 修改代码: package com.dt.spark.SparkApps.cores; import java.util.Arrays; import java.util