kafka-->storm-->mongodb

目的:

通过Spout发射kafka的数据,到bolt统计每一个单词的个数,将这些记录更新到mongodb中。

Spout的nextTuple方法会一直处于一个while循环这中,每一条数据发送给bolt后,bolt都会调用一次execute方法。

spout用于发射数据,bolt用于对数据进行处理。

MongoUtil:mongo工具类

package storm;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class MongoUtil {
private MongoUtil(){}
private static MongoClient mongo;
private static DB db;
private static DBCollection collection;
static{
mongo = new MongoClient("192.168.170.185",27017);
db = mongo.getDB("mySpout");
collection = db.getCollection("myBolt");
}
public static Long getCount(){
return collection.count(new BasicDBObject("_id",1L));
}
public static void insert(String substring){
DBObject obj = new BasicDBObject();
obj.put("_id", 1);
obj.put("bolt", substring);
collection.insert(obj);
}
public static void update(String substring){
DBObject obj = new BasicDBObject();
obj.put("_id", 1);
DBObject obj2 = collection.findOne(obj);
obj2.put("bolt", substring);
collection.update(obj, obj2);
}

}

SentenceSpout:发射数据的spout,从kafka读取数据。

package storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.utils.Utils;
import org.apache.storm.Constants;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import kafka.KafkaConsumer;
import kafka.KafkaProducer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class SentenceSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
private int index = 0;
private ConsumerConnector consumer;
private Map conf;
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {//尽量将初始化写在open方法中,否则可能会报错。
this.conf = map;
this.collector = collector;
Properties props = new Properties();

// zookeeper 配置 
props.put("zookeeper.connect", "192.168.170.185:2181");

// 消费者所在组 
props.put("group.id", "testgroup");

// zk连接超时 
props.put("zookeeper.session.timeout.ms", "4000"); 
props.put("zookeeper.sync.time.ms", "200"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("auto.offset.reset", "smallest");

// 序列化类 
props.put("serializer.class", "kafka.serializer.StringEncoder");

ConsumerConfig config = new ConsumerConfig(props);

this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); 
}
@Override
public void nextTuple() {

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put("helloworld", new Integer(1));

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); 
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); 
Map<String, List<KafkaStream<String, String>>> consumerMap = 
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); 
KafkaStream<String, String> stream = consumerMap.get("helloworld").get(0); 
ConsumerIterator<String, String> it = stream.iterator();

int messageCount = 0; 
while (it.hasNext()){ 
this.collector.emit(new Values(it.next().message().toString()));

// index = (index+1>=sentences.length)?0:index+1;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}

}
SplitSentenceBolt:切割单词bolt

package storm;

import java.util.Map;

import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt{
private OutputCollector collector;
private Map stormConf; 
@Override
public void prepare(Map map, TopologyContext context, OutputCollector collector) {
this.stormConf = map;
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String str = tuple.getStringByField("sentence");
String[] split = str.split(" ");
for(String word : split){
this.collector.emit(new Values(word));
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

}

WordCountBolt:计数的bolt

package storm;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt{
private Map boltconf;
private OutputCollector collector;
private HashMap<String,Long> counts = null;
@Override
public void prepare(Map map, TopologyContext context, OutputCollector collector) {
this.boltconf = map;
this.collector=collector;
this.counts = new HashMap<String,Long>();
}

@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
this.counts.put(word, this.counts.containsKey(word)?this.counts.get(word)+1:1);
this.collector.emit(new Values(word,counts.get(word)));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}

}

ReportBolt:打印记录结果,并将结果插入mongodb中bolt

package storm;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class ReportBolt extends BaseRichBolt{
private HashMap<String,Long> counts = null;
private Map boltconf;
private StringBuffer buf = null;
@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
this.boltconf = arg0;
this.counts=new HashMap<String,Long>();
this.buf = new StringBuffer();
}

@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long counts = tuple.getLongByField("count");
this.counts.put(word, counts);
System.out.println("------统计结果------");
List<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());

buf.append("{");
for(String key : keys){

buf.append(key+":"+this.counts.get(key)).append(",");
System.out.println(key + " : " +this.counts.get(key));
}
System.out.println("------------------");
buf.append("}");
String substring = buf.delete(buf.length()-2, buf.length()-1).toString();

long count = MongoUtil.getCount();
if(count<=0){
MongoUtil.insert(substring);
}else{
MongoUtil.update(substring);
}
buf = buf.delete(0, buf.length());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
/* @Override
public Map<String, Object> getComponentConfiguration() {
HashMap<String, Object> hashMap = new HashMap<String, Object>();
hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return hashMap;
}*/
}

WordCountTopology: topology,storm零件的组装

package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";

public static void main(String[] args) throws Exception {

//--实例化Spout和Bolt
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
//--创建TopologyBuilder类实例
TopologyBuilder builder = new TopologyBuilder();

//--注册SentenceSpout
builder.setSpout(SENTENCE_SPOUT_ID, spout);
//--注册SplitSentenceBolt,订阅SentenceSpout发送的tuple
//此处使用了shuffleGrouping方法,此方法指定所有的tuple随机均匀的分发给SplitSentenceBolt的实例。
builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
//--注册WordCountBolt,,订阅SplitSentenceBolt发送的tuple
//此处使用了filedsGrouping方法,此方法可以将指定名称的tuple路由到同一个WordCountBolt实例中
builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
//--注册ReprotBolt,订阅WordCountBolt发送的tuple
//此处使用了globalGrouping方法,表示所有的tuple都路由到唯一的ReprotBolt实例中
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

//--创建配置对象
Config conf = new Config();

//--创建代表集群的对象,LocalCluster表示在本地开发环境来模拟一个完整的Storm集群
//本地模式是开发和测试的简单方式,省去了在分布式集群中反复部署的开销
//另外可以执行断点调试非常的便捷
LocalCluster cluster = new LocalCluster();

//--提交Topology给集群运行
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

//--运行10秒钟后杀死Topology关闭集群
Thread.sleep(300000000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}

时间: 2024-10-12 15:02:26

kafka-->storm-->mongodb的相关文章

Flume-ng+Kafka+storm的学习笔记

Flume-ng Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume的文档可以看http://flume.apache.org/FlumeUserGuide.html 官方的英文文档 介绍的比较全面. 不过这里写写自己的见解 这个是flume的架构图 从上图可以看到几个名词: Agent: 一个Agent包含Source.Channel.Sink和其他的组件.Flume就是一个或多个Agent构成的. Source:数据源.简单的说就是agent获取数据的入口

flume+kafka+storm+mysql架构设计

前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考. 这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql (项目是maven项目,需要改动mysql配置,提供两种topology:读取本地文件(用来本地测试):读取服务器日志文件.) (是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱) 实时日志分析系统架构简介 系统主要分为四部分:                         负责从各节点上

kafka+storm+hbase

kafka+storm+hbase实现计算WordCount. (1)表名:wc (2)列族:result (3)RowKey:word (4)Field:count 1.解决: (1)第一步:首先准备kafka.storm和hbase相关jar包.依赖如下: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance&qu

Flume+Kafka+Storm+Redis实时分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型.当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMy

新版flume+kafka+storm安装部署

安装步骤: 1.版本介绍: zookeeper3.4.6 flume-ng1.6 kafka2.10-0.8.2 storm0.9.5 2.安装zookeeper 1.下载最新release版zookeeper http://zookeeper.apache.org/releases.html#download 2.修改zookeeper配置文件 $zookeeper_home/conf $ cp zoo_sample.cfg zoo_sample.cfg.bak $ mv zoo_sample

kafka+storm初探

由于项目需要,最近对storm进行了预研,安装与使用方式网上有很多示例,在此记录一下,备忘. 一.storm简介 Storm的术语包括Stream.Spout.Bolt.Task.Worker.Stream Grouping和Topology.Stream是被处理的数据.Sprout是数据源.Bolt处理数据.Task是运行于Spout或Bolt中的 线程.Worker是运行这些线程的进程.Stream Grouping规定了Bolt接收什么东西作为输入数据.数据可以随机分配(术语为Shuffl

Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了.实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接

[转载] Kafka+Storm+HDFS整合实践

转载自http://www.tuicool.com/articles/NzyqAn 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了.实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析

[转]flume-ng+Kafka+Storm+HDFS 实时系统搭建

http://blog.csdn.net/weijonathan/article/details/18301321 一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正:内容应该说绝大部分引用罗宝的文章的,这里要谢谢罗宝兄弟,还有写这篇文章@晨色星空J2EE也给了我很大帮助,这里也谢谢@晨色星空J2EE 之前在弄这个