storm准实时应用

文章出处:http://blog.csdn.net/lili72/article/details/42246671

1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上。

2 设计架构:

1) 客户端产生数据---

2) kafka-生产者实时采集数据(保留7天)-----

3) storm实时消费数据,处理数据

4)把实时数据统计结果缓存到memcached 中

5) 把数据保存到mysql

3 组件之间的通信.

3.1  客户端发送数据---Nginx接收 分布式放在多台服务器上。

3.2  (flume读取接 收集文件信息传给kafka)-kafka生产者直接收集文件信息。

3.3  kafka与storm 通过插件storm-kafka 通信

3.4  storm 与缓存 memcached   java程序 读取mysql的结果缓存到 memcached

3.5   zookeeper 用工具 curator-client,锁同步机制。

(对应的插件可以在github上找到 https://github.com/)

4  场景在现:即席查询用户注册数,用户登录数,当前在线人数

4.1   Storm 处理:

4.1.1 数据清理阶段:

Storm从kafka得到对应的topic数据,然后对数据进行清洗。Storm获取实时JSON数据,然后通过解析JSON数据,格式化之后利用storm-hdfs把数据传到HDFS上。或者实时统计数据存放到关系型数据库中。

package com.ks.topology;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

import com.google.common.collect.ImmutableList;
import com.ks.bolt.ConJsonToData;
import com.ks.bolt.CounterBolt;

/**
 * @author root
 *
 */
public class CountUserLogin {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try{
			String kafkaZookeeper = "192.168.119.131:2181,192.168.119.132:2181,192.168.119.133:2181";
			BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
			SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "userlogin", "/userlogin", "id");
	        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
	        kafkaConfig.zkServers =  ImmutableList.of("192.168.119.131","192.168.119.132","192.168.119.133");
	        kafkaConfig.zkPort = 2181;

	        //kafkaConfig.forceFromStart = true;

	        TopologyBuilder builder = new TopologyBuilder();
	        builder.setSpout("spout", new KafkaSpout(kafkaConfig), 2);
	        builder.setBolt("counter", new CounterBolt(),1).shuffleGrouping("spout");
	       builder.setBolt("ConJsonToData", new ConJsonToData(),1).shuffleGrouping("counter");

	        Config config = new Config();
	        config.setDebug(true);

	        if(args!=null && args.length > 0) {
	            config.setNumWorkers(2);

	            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
	        } else {
	            config.setMaxTaskParallelism(3);

	            LocalCluster cluster = new LocalCluster();
	            cluster.submitTopology("CountUserLogin-topology", config, builder.createTopology());

	            Thread.sleep(500000);

	            cluster.shutdown();
	        }
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

}
package com.ks.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CounterBolt extends BaseBasicBolt {

	/**
	 *
	 */
	private static final long serialVersionUID = -5508421065181891596L;

	private static long counter = 0;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		System.out.println("msg = "+tuple.getString(0)+" -------------counter = "+(counter++));
		collector.emit(new Values(tuple));

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("userloginStr"));

	}

}
package com.ks.bolt;

import java.io.IOException;

import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import com.ks.model.UserModel;

public class ConJsonToData  extends BaseBasicBolt{

	private static final ObjectMapper mapper = new ObjectMapper();
	private static final long serialVersionUID = 5596476183440049414L;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String  str =tuple .getString(0);
		System.out.println("str------------"  +str+"  str------------");
		UserModel bean =null;
        if(str!=null){
       	 try {
			bean = mapper.readValue(str, UserModel.class);
			 System.out.println(bean.toString());
		} catch (JsonParseException e) {
			e.printStackTrace();
		} catch (JsonMappingException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}

        }

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {

	}

}

model

package com.ks.model;

public class UserLog {

//{"serverid":"1001","time":"2014-12-11 00:00:51","userid":12345678,"appid":8888,"client_ip":"192.136.20.210"}

	private String   serverid="";

	private String  time="";

	private  String  userid="";

	private  Integer  appid=0;

	private  String  client_ip="";

	public UserLog(){

	}

	public UserLog(String serverid, String time, String userid, Integer appid,
			String client_ip) {
		this.serverid = serverid;
		this.time = time;
		this.userid = userid;
		this.appid = appid;
		this.client_ip = client_ip;
	}

	public String getServerid() {
		return serverid;
	}

	public void setServerid(String serverid) {
		this.serverid = serverid;
	}

	public String getTime() {
		return time;
	}

	public void setTime(String time) {
		this.time = time;
	}

	public String getUserid() {
		return userid;
	}

	public void setUserid(String userid) {
		this.userid = userid;
	}

	public Integer getAppid() {
		return appid;
	}

	public void setAppid(Integer appid) {
		this.appid = appid;
	}

	public String getClient_ip() {
		return client_ip;
	}

	public void setClient_ip(String client_ip) {
		this.client_ip = client_ip;
	}

	@Override
		public String toString() {
			return  serverid+"|" + userid+"|" +appid+"|"+time+"|"+client_ip;
		}

}
package com.ks.model;

public class UserModel {

	private  UserLog  data;

	private String  type="" ;

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public  UserModel(){
	}

	public UserLog getData() {
		return data;
	}

	public void setData(UserLog data) {
		this.data = data;
	}

	@Override
	public String toString() {

		return data.toString()+"|"+type;
	}

}
时间: 2024-10-17 05:37:46

storm准实时应用的相关文章

基于OGG的Oracle与Hadoop集群准实时同步介绍

Oracle里存储的结构化数据导出到Hadoop体系做离线计算是一种常见数据处置手段.近期有场景需要做Oracle到Hadoop体系的实时导入,这里以此案例做以介绍.Oracle作为商业化的数据库解决方案,自发性的获取数据库事务日志等比较困难,故选择官方提供的同步工具OGG(Oracle GoldenGate)来解决. 安装与基本配置 环境说明 软件配置 角色 数据存储服务及版本 OGG版本 IP 源服务器 OracleRelease11.2.0.1 Oracle GoldenGate 11.2

使用Storm实现实时大数据分析

摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战.Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析.CSDN在此编译.整理. 简单和明了,Storm让大数据分析变得轻松加愉快. 当今世界,公司的日常运营经常会生成TB级别的数据.数据来源囊括了互联网装置可以捕获的任何类型数据,网站.社交媒体.交易型商业数据以及其它商业环境中创建的数据.考虑到数据的生成量,实时处理成为了许多机

利用Flume将MySQL表数据准实时抽取到HDFS

转自:http://blog.csdn.net/wzy0623/article/details/73650053 一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问.这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性.Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题.就像实验中所做的,每天定

binlog异步准实时刷出 平民架构

binlog异步准实时刷出 平民架构 效果很好,binlog异步准实时刷出,性能和默认的10相比没有下降,Binlog数据保护等级提高到准实时级别,这下彻底搞定数据保护了.//@平民架构: 继续修改代码,现在第二种情况压到2000了,只有在binlog切换时会有较大的下降.//@平民架构: 第二种情况,可以从768提升到1000的样子. ◆◆ @平民架构 增加事务是否等待Binlog刷盘的信息,只有双一和LGWR补丁达到了客户端事务等待Binlog刷盘后才继续. 继续修改代码,现在第二种情况压到

Storm分布式实时流计算框架相关技术总结

Storm分布式实时流计算框架相关技术总结 Storm作为一个开源的分布式实时流计算框架,其内部实现使用了一些常用的技术,这里是对这些技术及其在Storm中作用的概括介绍.以此为基础,后续再深入了解Storm的内部实现细节. 1. Zookeeper集群 Zookeeper是一个针对大型分布式系统的可靠协调服务系统,其采用类似Unix文件系统树形层次结构的数据模型(如:/zoo/a,/zoo/b),节点内可存储少量数据(<1M,当节点存储大数据量时,实际应用中可能出现同步问题). Zookeep

基于Storm构建实时热力分布项目实战

详情请交流  QQ  709639943 01.基于Storm构建实时热力分布项目实战 02.以慕课网日志分析为例 进入大数据 Spark SQL 的世界 03.Spring Cloud微服务实战视频课程 04.漫谈spring cloud 与 spring boot 基础架构 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.漫谈spring cloud分布式服务架构 09.Java Spring Se

PL2121-基于Storm构建实时热力分布项目实战

新年伊始,学习要趁早,点滴记录,学习就是进步! 不要到处找了,抓紧提升自己. 对于学习有困难不知道如何提升自己可以加扣:1225462853 获取资料. 下载地址:https://pan.baidu.com/s/1o9rZpj0 基于Storm构建实时热力分布项目实战 Storm是实时流处理领域的一柄利器,本课程采用最新的Storm版本1.1.0,从0开始由浅入深系统讲解,深入Storm内部机制,掌握Storm整合周边大数据框架的使用,从容应对大数据实时流处理! 谢谢大家的支持,我会努力给大家分

大数据Storm开发实时数据分析平台视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

狼厂项目实践:通用检索框架准实时流的设计与实现

背景 检索对实时性的要求很高,不仅是对索引建立.结果召回.策略干扰等核心部分,也包括数据录入的部分.检索的数据流主要包括全量数据与增量数据,其中全量数据是在运行前就已经生成好的,在检索进程运行开始时就直接解析加载了,后面不会再产生,所以不会对录入有高实时性的需求:而增量数据理论上在整个检索进程运行过程中随时都可能新增,新增了就需要录入.所以,提高增量数据录入的实时性,对提升整个检索的性能有重要作用. 设计思路与折衷 目前搜索引擎实现增量更新的方案主要有这几种:(1)提供写接口,(2)使用文件,(