网站访问量实时统计

一、需求:统计网站访问量(实时统计)

技术选型:特点(数据量大、做计算、实时)

实时流式计算框架:storm

1)spout
数据源,接入数据源
本地文件

2)splitbolt
业务逻辑处理
切分数据
拿到网址

3)bolt
累加次数求和

1、PvCountSpout类

package com.demo.pvcount;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class PvCountSpout implements IRichSpout{

    private SpoutOutputCollector collector;
    private BufferedReader br;
    private String line;

    @Override
    public void nextTuple() {
        //发送读取的数据的每一行
        try {
            while((line = br.readLine())!= null) {
                //发送数据到splitbolt
                collector.emit(new Values(line));
                //设置延迟
                Thread.sleep(500);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;

        //读取文件
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream("e:/weblog.log")));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //声明
        declarer.declare(new Fields("logs"));
    }

    //处理tuple成功 回调的方法
    @Override
    public void ack(Object arg0) {
    }

    //如果spout在失效的模式中 调用此方法来激活
    @Override
    public void activate() {
    }

    //在spout程序关闭前执行 不能保证一定被执行 kill -9 是不执行 storm kill 是不执行
    @Override
    public void close() {
    }

    //在spout失效期间,nextTuple不会被调用
    @Override
    public void deactivate() {
    }

    //处理tuple失败回调的方法
    @Override
    public void fail(Object arg0) {
    }

    //配置
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

2、PvCountSplitBolt类

package com.demo.pvcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class PvCountSplitBolt implements IRichBolt{

    private OutputCollector collector;

    //一个bolt即将关闭时调用 不能保证一定被调用 资源清理
    @Override
    public void cleanup() {
    }

    private int pvnum = 0;
    //业务逻辑 分布式 集群 并发度 线程 (接收tuple然后进行处理)
    @Override
    public void execute(Tuple input) {
        //1.获取数据
        String line = input.getStringByField("logs");

        //2.切分数据
        String[] fields = line.split("\t");
        String session_id = fields[1];

        //3.局部累加
        if (session_id != null) {
            //累加
            pvnum++;
            //输出
            collector.emit(new Values(Thread.currentThread().getId(),pvnum));
        }
    }

    //初始化调用
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    //声明
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //声明输出
        declarer.declare(new Fields("threadid","pvnum"));
    }

    //配置
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

3、PvCountSumBolt类

package com.demo.pvcount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class PvCountSumBolt implements IRichBolt{

    private HashMap<Long, Integer> hashMap = new HashMap<>();

    @Override
    public void cleanup() {
    }

    //全局累加求和 业务逻辑
    @Override
    public void execute(Tuple input) {
        //1.获取数据
        Long threadid = input.getLongByField("threadid");
        Integer pvnum = input.getIntegerByField("pvnum");

        //2.创建集合 存储(threadid,pvnum) 15 20
        hashMap.put(threadid, pvnum);

        //3.累加求和(拿到集合中所有value值)
        Iterator<Integer> iterator = hashMap.values().iterator();

        //4.清空之前的数据
        int sumnum = 0;
        while (iterator.hasNext()) {
            sumnum += iterator.next();
        }

        System.err.println(Thread.currentThread().getName() + "总访问量为->" + sumnum);
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

4、PvCountDriver类

package com.demo.pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class PvCountDriver {
    public static void main(String[] args) {
        // 1.hadoop->Job storm->topology 创建拓扑
        TopologyBuilder builder = new TopologyBuilder();

        // 2.指定设置
        builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
        builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4)
                .fieldsGrouping("PvCountSpout", new Fields("logs"));
        builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1).fieldsGrouping("PvCountSplitBolt", new Fields("pvnum"));

        // 3.创建配置信息
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 4.提交任务
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
    }
}

5、PvCountDriver_Shuffle类

package com.demo.pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class PvCountDriver_Shuffle {
    public static void main(String[] args) {
        // 1.hadoop->Job storm->topology 创建拓扑
        TopologyBuilder builder = new TopologyBuilder();

        // 2.指定设置
        builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
        builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4)
                .shuffleGrouping("PvCountSpout");
        builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 2).shuffleGrouping("PvCountSplitBolt");

        // 3.创建配置信息
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 4.提交任务
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
    }
}

6、weblog.log文件

storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 10:40:49
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 08:40:50
storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 10:40:49
storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 09:40:49
storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 10:40:49
storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 12:40:49
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 08:40:52
storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 08:40:50
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 09:40:49.........
storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 08:40:53
storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 12:40:49
storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 08:40:51
storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 10:40:49
storm.apache.org    HUNTERH6YCGFJYERTT834R52FDXV9U34    2018-08-07 08:40:53
storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 08:40:50
storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 08:40:53
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 10:40:49

7、运行(4)中的main方法,控制台显示如下图:

此时在weblog.log文件中增加几条数据,则总访问量相应增加几条。

至此,简单实现了网站访问量实时统计。

原文地址:https://www.cnblogs.com/areyouready/p/10188300.html

时间: 2024-08-28 11:58:40

网站访问量实时统计的相关文章

如何实现ASP.NET中网站访问量的统计

如何实现ASP.NET中网站访问量的统计 2009-07-30 15:50 佚名 网翼教程网 字号:T | T 本文介绍了如何在asp.net中进行网站访问量的统计. AD:51CTO 网+ 第十二期沙龙:大话数据之美_如何用数据驱动用户体验 下面介绍如何进行ASP.NET中网站访问量的统计. 一.建立一个数据表IPStat用于存放用户信息 我在IPStat表中存放的用户信息只包括登录用户的IP(IP_Address),IP来源(IP_Src)和登录时间(IP_DateTime),些表的信息本人

切分Nginx日志,完成网站访问量的自动统计

如果你的网站通过 Nginx 代理,那么本文将为你提供一个自动统计网站访问量的方案. 方案在实现步骤上,一个分为三步: 1. 运行 shell 脚本,移动 Nginx 日志到指定文件夹,并运行 Python 脚本: 2. 执行 Python 脚本,统计有效的 IP 访问量 3. 设置 crontab 定时任务. 一.shell 脚本 通过 Nginx 配置文件,查看监听端口的日志文件,并移动到指定的路径. 然后运行 Python 脚本,执行处理 Nginx 日志文件的 Python 脚本. sh

大数据学习之Storm实时统计网站访问量案例35

案例一:统计网站访问量(实时统计)   实时流式计算框架:storm 1)spout 数据源,接入数据源 本地文件如下 编写spout程序: package pvcount; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader;

网站访问量统计功能的实现

实现方法:拦截器+session存储 拦截器初始化时,即在@PostConstruct注解的initMethod方法中读取数据库的isystem对象,该对象记录了网站访问量的信息. 拦截器销毁时,即在@PreDestroy注解的destroyMethod方法中向数据库更新isystem对象. 拦截器的初始化和销毁都只有在应用启动和关闭的时候才被调用,因此减少了对数据库的访问. SpringMVC中,每一次请求控制器,都会先执行拦截器的preHandle方法,在该方法内先查看session中的一个

javascript实现的网站访问量统计代码实例

javascript实现的网站访问量统计代码实例: 作为一个网站的管理员或者说站长,都希望知道到底有多少人访问了网站,这个时候就需要有一个统计功能来满足需要,当然功能比较单一和简单,如果想要强大的统计效果,那最好还是使用现在比较成熟的功统计工具,比如站长统计或者腾讯统计等等. 代码如下: <script type="text/javascript"> var caution=false function setCookie(name,value,expires,path,d

javascript实现的网站访问量统计代码

javascript实现的网站访问量统计代码:网站一般都有访问量统计工具,比较高效实用的工具多种多样,并且非常的精确.实用javascript也可以简单的实现此功能,尽管没有网络上常用的精确,不过的确也实现了一定的功能,下面就是一段代码实例,感兴趣的朋友可以参考一下: <script type="text/javascript"> /** * vlstat 浏览器统计脚本 */ var statIdName = "vlstatId"; var xmlHt

ASP.net中网站访问量统计方法代码(在线人数,本月访问,本日访问,访问流量,累计访问)

一.建立一个数据表IPStat用于存放用户信息 我在IPStat表中存放的用户信息只包括登录用户的IP(IP_Address),IP来源(IP_Src)和登录时间 (IP_DateTime),些表的信息本人只保存一天的信息,如果要统计每个月的信息则要保存一个月.因为我不太懂对数据日志的操作,所以创建此表,所 以说我笨吧,哈哈. 二.在Global.asax中获取用户信息 在Global.asax的Session_Start即新会话启用时获取有关的信息,同时在这里实现在线人数.访问总人数的增量统计

github+hexo搭建自己的博客网站(四)主题之外的一些基本配置(统计配置,网站访问量显示)

1.百度.谷歌统计配置 百度统计配置 申请账号:https://tongji.baidu.com/web/welcome/login 在代码获取的地方只要填入key即可 注册的时候,填的域名和url,我选的都是https://saucxs.github.io/ 在yilia主题下文件里themes\yilia文件夹下的_config.yml)找到这个baidu_analytics # Miscellaneous baidu_analytics: 'ace6dXXXXXXXXXXfbc' goog

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1