大数据学习之Storm实时统计网站访问量案例35

案例一:统计网站访问量(实时统计)

 

实时流式计算框架:storm

1)spout

数据源,接入数据源

本地文件如下

编写spout程序:

package pvcount;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * @author Dawn
 * @date 2019年6月7日10:19:39
 * @version 1.0
 * 编写spout。接入本地数据源
 */
public class PvCountSpout implements IRichSpout{

	private SpoutOutputCollector collector;
	private BufferedReader br;
	private String line;

	@Override
	public void nextTuple() {
		//发送读取的数据的每一行
		try {
			while((line=br.readLine()) != null) {
				//发送数据到splitbolt
				collector.emit(new Values(line));
				//设置延迟
				Thread.sleep(500);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		this.collector=collector;

		//读取文件
		try {
			br=new BufferedReader(new InputStreamReader(new FileInputStream("f:/temp/storm实时统计访问量/weblog.log")));
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//别关流!!!!
//		finally {
//			if(br!=null) {
//				try {
//					br.close();
//				} catch (IOException e) {
//					// TODO Auto-generated catch block
//					e.printStackTrace();
//				}
//			}
//		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		//声明
		declarer.declare(new Fields("logs"));
	}

	//处理tuple成功 回调的方法。就像kafka的那个callback回调函数,还有zookeeper中的回调函数 process
	@Override
	public void ack(Object arg0) {
		// TODO Auto-generated method stub

	}

	//如果spout在失效的模式中 调用此方法来激活,和在Linux中那个命令 storm activate [拓扑名称] 一样的效果
	@Override
	public void activate() {
		// TODO Auto-generated method stub

	}

	//在spout程序关闭前执行 不能保证一定被执行 kill -9 是不执行 storm kill 是不执行
	@Override
	public void close() {
		// TODO Auto-generated method stub

	}

	//在spout失效期间,nextTuple不会被调用 和在Linux中那个命令 storm deactivate [拓扑名称] 一样的效果
	@Override
	public void deactivate() {
		// TODO Auto-generated method stub

	}

	//处理tuple失败回调的方法
	@Override
	public void fail(Object arg0) {
		// TODO Auto-generated method stub

	}

	//配置
	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}

  

2)splitbolt

业务逻辑处理

切分数据

拿到网址

package pvcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @author Dawn
 * @date 2019年6月7日10:30:38
 * @version 1.0
 * 切分数据,拿到网址
 */
public class PvCountSplitBolt implements IRichBolt{

	private  OutputCollector collector;
	private int pvnum = 0;

	//业务逻辑  分布式 集群 并发度 线程(接收tuple然后进行处理)
	@Override
	public void execute(Tuple input) {
		//1.获取数据
		String line = input.getStringByField("logs");

		//2.切分数据
		String[] fields = line.split("\t");
		String session_id=fields[1];

		//3.局部累加
		if(session_id != null) {
			pvnum++;
			//输出
			collector.emit(new Values(Thread.currentThread().getId(),pvnum));
		}
	}

	//初始化调用
	@Override
	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
		this.collector=collector;
	}

	//声明
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		//声明输出
		declarer.declare(new Fields("threadid","pvnum"));
	}

	//一个bolt即将关闭时调用 不能保证一定被调用 资源清理
	@Override
	public void cleanup() {
		// TODO Auto-generated method stub

	}

	//配置
	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}

  

3)bolt

累加次数求和

package pvcount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * @author Dawn
 * @date 2019年6月7日10:39:52
 * @version 1.0
 * 累加次数求和
 */
public class PvCountSumBolt implements IRichBolt{

	private OutputCollector collector;
	private HashMap<Long, Integer> hashmap=new HashMap<>();

	@Override
	public void cleanup() {

	}

	@Override
	public void execute(Tuple input) {
		//1.获取数据
		Long threadId = input.getLongByField("threadid");
		Integer pvnum = input.getIntegerByField("pvnum");

		//2.创建集合 存储 (threadid,pvnum)
		hashmap.put(threadId, pvnum);

		//3.累加求和(拿到集合中所有value值)
		Iterator<Integer> iterator = hashmap.values().iterator();

		//4.清空之前的数据
		int sum=0;
		while(iterator.hasNext()) {
			sum+=iterator.next();
		}

		System.err.println(Thread.currentThread().getName() + "总访问量为->" + sum);
	}

	@Override
	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}

  

4)Driver

使用字段分组

package pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author Dawn
 * @date 2019年6月7日10:45:53
 * @version 1.0 统计网站访问量(实时统计)
 */
public class PvCountDriver {
	public static void main(String[] args) {
		// 1.创建拓扑
		TopologyBuilder builder = new TopologyBuilder();

		// 2.指定设置
		builder.setSpout("pvcountspout", new PvCountSpout(), 1);
		builder.setBolt("pvsplitbolt", new PvCountSplitBolt(), 6).setNumTasks(4).fieldsGrouping("pvcountspout",
				new Fields("logs"));
		builder.setBolt("pvcountbolt", new PvCountSumBolt(), 1).fieldsGrouping("pvsplitbolt",
				new Fields("threadid", "pvnum"));

		// 3.创建配置信息
		Config conf = new Config();
		conf.setNumWorkers(2);

		// 4.提交任务
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
	}

}

  

运行结果如下:

总共190条数据。统计完成之后再进行添加数据。程序会继续统计

原文地址:https://www.cnblogs.com/hidamowang/p/10987864.html

时间: 2024-08-12 04:33:49

大数据学习之Storm实时统计网站访问量案例35的相关文章

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

大数据学习之Storm实时计算概述及安装部署33

一:Storm概述 网址:http://storm.apache.org/ ApacheStorm是一个免费的开源分布式实时计算系统.Storm可以轻松可靠地处理无限数据流,实现Hadoop对批处理所做的实时处理.Storm非常简单,可以与任何编程语言一起使用,并且使用起来很有趣! Storm有许多用例:实时分析,在线机器学习,连续计算,分布式RPC,ETL等.风暴很快:一个基准测试表示每个节点每秒处理超过一百万个元组.它具有可扩展性,容错性,可确保您的数据得到处理,并且易于设置和操作. Sto

Spark 2.x企业级大数据项目实战(实时统计、离线分析和实时ETL)

Spark 2.x企业级大数据项目实战(实时统计.离线分析和实时ETL)全套课程下载:https://pan.baidu.com/s/1mje6bAoLLPrxUIrM-C2VMg 提取码: 9n1x 本门课程来源于一线生产项目, 所有代码都是在现网大数据集群上稳定运行, 拒绝Demo.课程涵盖了离线分析.实时分析绝大部分的场景,通过三个实际生产项目教授如何优雅地集成Hadoop.Spark.HBase.Kafka.Redis.MySQL等相关大数据技术,并实际落地 . 本门课程全程实操,不用担

大数据学习——SparkStreaming整合Kafka完成网站点击流实时统计

1.安装并配置zk 2.安装并配置Kafka 3.启动zk 4.启动Kafka 5.创建topic [[email protected] kafka]# bin/kafka-console-producer.sh --broker-list mini1:9092 --topic cyf-test 程序代码 package org.apache.spark import java.net.InetSocketAddress import org.apache.spark.HashPartition

大数据学习方向,从入门到精通

推荐一个大数据学习群 119599574晚上20:10都有一节[免费的]大数据直播课程,专注大数据分析方法,大数据编程,大数据仓库,大数据案例,人工智能,数据挖掘都是纯干货分享,你愿意来学习吗 很多初学者在萌生向大数据方向发展的想法之后,不免产生一些疑问,应该怎样入门?应该学习哪些技术?学习路线又是什么? 所有萌生入行的想法与想要学习Java的同学的初衷是一样的.岗位非常火,就业薪资比较高,,前景非常可观.基本都是这个原因而向往大数据,但是对大数据却不甚了解. 如果你想学习,那么首先你需要学会编

大数据学习路线图 让你精准掌握大数据技术学习?

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图,学好大数据就还得靠专业的工具. 大数据学习QQ群:119599574 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java

大数据学习之小白如何学大数据?(详细篇)

大数据这个话题热度一直高居不下,不仅是国家政策的扶持,也是科技顺应时代的发展.想要学习大数据,我们该怎么做呢?大数据学习路线是什么?先带大家了解一下大数据的特征以及发展方向. 大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘. 先说一下大数据的4V特征: 数据量大,TB->PB 数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等; 商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来; 处理时效性高,海量数据的处

大数据学习路线指导,告诉你如何学习大数据

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图, ? ? 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java字符串.Java数组与类和对象.数字处理类与核心技术.I/O

大数据学习路线 让你精准掌握大数据技术学习

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图,学好大数据就还得靠专业的工具. 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java字符串.Java数组与类和对象.数字处