Storm in Action: WordCount



I. Environment Setup

  Create a project in Eclipse and import the Storm jar dependencies.
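If you build with Maven instead of importing jars by hand, the dependency can be declared in pom.xml. The version below is an assumption; match it to the Storm version running on your cluster:

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
    <!-- "provided" scope: the cluster already ships storm-core, so don't bundle it -->
    <scope>provided</scope>
</dependency>
```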

II. Writing the Code

  1. The spout component, which emits the source sentences.

package com;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class RandomSentenceSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1L;

	// Collects the tuples emitted by this spout
	private SpoutOutputCollector collector;

	@Override
	public void nextTuple() {
		String[] datas = {"hello zhangsan nice to meet you zhangsan hello lisi welcome to bj"};
		// Emit the sentence as a one-field tuple
		collector.emit(new Values(datas[0]));
		try {
			// Throttle emission to one sentence per second
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	// Initialization; runs once when the spout starts
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	// Declare the field name for the emitted tuple
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("spout"));
	}
}

  2. The bolt component, which splits each sentence into words.

package com;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	private OutputCollector collector;

	// Initialization; runs once when the bolt starts
	@Override
	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		// Read the sentence emitted by the spout (field "spout")
		String sentence = tuple.getStringByField("spout");
		// Split on spaces and emit each word as its own tuple
		for (String word : sentence.split(" ")) {
			collector.emit(new Values(word));
		}
		collector.ack(tuple);
	}

	// Downstream bolts read this tuple by the field name "word"
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}
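The splitting step is plain Java string handling; a minimal standalone sketch of the same logic, with no Storm dependency, looks like this:

```java
public class SplitSketch {
	public static void main(String[] args) {
		String sentence = "hello zhangsan nice to meet you zhangsan hello lisi welcome to bj";
		// Each whitespace-separated token becomes one "word" tuple in the bolt
		for (String word : sentence.split(" ")) {
			System.out.println(word);
		}
	}
}
```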

  3. The bolt component, which counts the occurrences of each word.

package com;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class WordCount extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	private OutputCollector collector;
	// Running count of each word seen so far
	private Map<String, Integer> map = new HashMap<String, Integer>();

	@Override
	public void execute(Tuple tuple) {
		String word = tuple.getStringByField("word");
		if (map.containsKey(word)) {
			map.put(word, map.get(word) + 1);
		} else {
			map.put(word, 1);
		}
		System.out.println(map);
		collector.ack(tuple);
	}

	@Override
	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	// Terminal bolt: it emits nothing downstream
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}
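The counting branch in execute() can be exercised on its own with plain Java collections. This sketch (class and variable names are illustrative, not part of the topology) runs the same increment-or-initialize logic over a whole sentence:

```java
import java.util.HashMap;
import java.util.Map;

public class CountSketch {
	public static void main(String[] args) {
		Map<String, Integer> counts = new HashMap<String, Integer>();
		String sentence = "hello zhangsan nice to meet you zhangsan hello lisi welcome to bj";
		for (String word : sentence.split(" ")) {
			// Same branch the bolt uses: increment if present, else start at 1
			if (counts.containsKey(word)) {
				counts.put(word, counts.get(word) + 1);
			} else {
				counts.put(word, 1);
			}
		}
		// "hello", "zhangsan", and "to" each appear twice; every other word once
		System.out.println(counts);
	}
}
```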

  4. The submission class, which wires the components together and submits the topology.

package com;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class Main {

	public static void main(String[] args) {
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout("spout", new RandomSentenceSpout());
		topologyBuilder.setBolt("wordBolt", new WordBolt()).shuffleGrouping("spout");
		topologyBuilder.setBolt("wordCount", new WordCount()).fieldsGrouping("wordBolt", new Fields("word"));
		Config config = new Config();
		if (args == null || args.length == 0) {
			// Local mode: run an in-process cluster for development
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("wordCount", config, topologyBuilder.createTopology());
		} else {
			// Cluster mode: submit to a running Storm cluster
			config.setNumWorkers(1);
			try {
				StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
			} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
				e.printStackTrace();
			}
		}
	}
}

  5. Package the project as a jar and upload it to the server to run. Note: include only your own compiled classes in the jar; do not bundle the project's dependency jars, or the job will fail on the cluster.
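Assuming the packaged jar is named wordcount.jar and the main class is com.Main (both placeholders; adjust to your actual jar and class name), submission on the server would look like:

```shell
# storm jar <jar path> <main class> <topology name>
storm jar wordcount.jar com.Main wordCountTopology
```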




Original article: https://www.cnblogs.com/songweideboke/p/9901083.html

