Strom之网站UV,PV统计

SPOUT:

package base;

import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SourceSpout implements IRichSpout{

    /**
     * 数据源Spout
     */
    private static final long serialVersionUID = 1L;

    Queue<String> queue = new ConcurrentLinkedQueue<String>();

    SpoutOutputCollector collector = null;

    String str = null;

    @Override
    public void nextTuple() {
        if (queue.size() >= 0) {
            collector.emit(new Values(queue.poll()));
        }
        try {
            Thread.sleep(500) ;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.collector = collector;

            Random random = new Random();
            String[] hosts = { "www.taobao.com" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                    "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                    "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

            for (int i = 0; i < 100; i++) {
                queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout ack:"+msgId.toString());
    }

    @Override
    public void activate() {
        // TODO Auto-generated method stub

    }

    @Override
    public void deactivate() {
        // TODO Auto-generated method stub

    }

    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout fail:"+msgId.toString());
    }

}

BOLT:

格式化数据源

package com.storm.user_visit;

import java.util.Map;

import tools.DateFmt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class FmtLogBolt implements IBasicBolt{

    private static final long serialVersionUID = 1L;

    String logString = null;
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {

        logString = input.getString(0);
        if (logString !=null && logString.length() > 0) {

            String[] split = logString.split("\t");
            collector.emit(new Values(DateFmt.getCountDate(split[2], DateFmt.date_short),split[1]));//日期 sesion_id
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date","session_id"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void cleanup() {
    }

}

统计每个用户的深度

package com.storm.user_visit;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DeepVisitBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    Map<String,Long> counts = new HashMap<String, Long>();

    String session_id = null;
    String datestr = null;
    long pv =0;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {

        String datestr = input.getString(0);
        String session_id = input.getString(1);

        Long count =  counts.get(datestr+"_"+session_id);

        if(count == null){
            count = 0L;
        }

        count++;
        counts.put(datestr+"_"+session_id, count);
        collector.emit(new Values(datestr+"_"+session_id, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_session","count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }

    @Override
    public void cleanup() {
    }
}

使用Zookeeper线程锁,每5秒输出一次,同时统计PV和UV

package com.storm.user_visit;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

import tools.DateFmt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class UVSumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    Map<String, Integer> counts = new HashMap<String, Integer>();

    String cur_date = null;
    long beginTime = System.currentTimeMillis();

    long endTime  = 0;
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {

        try {

            endTime = System.currentTimeMillis();

            long PV=0L;//总数个数
            long UV =0L;//去重总数个数

            String date_sessionid = input.getString(0);
            Integer count = input.getInteger(1);

            //判断数据是否是当天的数据,如果时间是今天的数据或者是明天的
            if (! date_sessionid.startsWith(cur_date) && DateFmt.parseDate(date_sessionid.split("_")[0]).after(DateFmt.parseDate(cur_date))) {
                //第二天的数据,跨天,有新的数据了
                cur_date = date_sessionid.split("_")[0];
                counts.clear();
            }

            //可能会出现旧的数据
            counts.put(date_sessionid, count);

             if (endTime - beginTime >= 5*1000 ) {

                //获取word去重的个数,遍历counts的keySet,取count
                Iterator<String> i2 = counts.keySet().iterator();
                while (i2.hasNext()) {

                    String key =i2.next();

                    if(key != null){

                        //只计算今天的数据
                        if (key.startsWith(cur_date)) {
                            UV ++;
                            PV += counts.get(key);
                        }
                    }
                }

                //判断是否相等,保证只有一个task能匹配
                 if (lockData.equals(new String(keeper.getData(zk_path, false, null)))) {

                     System.err.println("  PV  =  "+PV +" UV   = "+UV);
                 }
             }
             beginTime = System.currentTimeMillis();

        } catch (Exception e) {
            throw new FailedException("split fail!");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public static final String zk_path = "/lock/storm/pv";
    ZooKeeper keeper  = null;
    String lockData = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short);

        try {
                keeper =new ZooKeeper("hadoop:2181", 3000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.err.println("even = "+event.getType());
                    }
                });

             //判断zookeeper是否连接上,如果没有连接成功一直等待,保证Zookeeper能连接上
            while (keeper.getState() != ZooKeeper.States.CONNECTED) {
                System.out.println("connect failed");
                Thread.sleep(1000);
            }

            InetAddress address = InetAddress.getLocalHost();
            lockData = address.getHostAddress()+":"+context.getThisTaskId();

            //其他线程发现该目录已经存在,就保证了唯一
            if (keeper.exists(zk_path, false) == null) {
                //创建临时目录
                keeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }

        } catch (Exception e) {

            try {
                keeper.close();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }

    }
    @Override
    public void cleanup() {

    }

}

ToPo:

package com.storm.user_visit;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import base.SourceSpout;

public class UVToPo {

    public static void main(String [] args) throws Exception{

        TopologyBuilder builder =new TopologyBuilder();

        builder.setSpout("spout", new SourceSpout(),1);
        builder.setBolt("Fmtbolt", new FmtLogBolt(),4).shuffleGrouping("spout");
        builder.setBolt("deepbolt", new DeepVisitBolt(),4).fieldsGrouping("Fmtbolt", new Fields("date","session_id"));
        builder.setBolt("UvSum", new UVSumBolt(),1).shuffleGrouping("deepbolt");

        //设置参数
        Config conf = new Config();

        if (args.length > 0) {
            //分布式提交
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }else{
            //本地模式提交
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }

    }
}
时间: 2024-12-05 12:26:53

Strom之网站UV,PV统计的相关文章

NGINX: 统计网站的PV、UV、独立IP

Nginx: PV.UV.独立IP 做网站的都知道,平常经常要查询下网站PV.UV等网站的访问数据,当然如果网站做了CDN的话,nginx本地的日志就没什么意义了,下面就对nginx网站的日志访问数据做下统计: 概念: UV(Unique Visitor):独立访客,将每个独立上网电脑(以cookie为依据)视为一位访客,一天之内(00:00-24:00),访问您网站的访客数量.一天之内相同cookie的访问只被计算1次 PV(Page View):访问量,即页面浏览量或者点击量,用户每次对网站

网站流量PV,UV,IP的含义

文章来源:http://lzj0470.iteye.com/blog/647453 1.什么是pv PV(page view),即页面浏览量,或点击量;通常是衡量一个网络新闻频道或网站甚至一条网络新闻的主要指标. 高手对pv的解释是,一个访问者在24小时(0点到24点)内到底看了你网站几个页面.这里需要强调:同一个人浏览你网站同一个页面,不重复计算pv量,点100次也算1次.说白了,pv就是一个访问者打开了你的几个页面. PV之于网站,就像收视率之于电视,从某种程度上已成为投资者衡量商业网站表现

网站UV,与IP、PV

什么是网站UV,与IP.PV在概念上的区别? UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客.00:00-24:00内相同的客户端只被计算一次. PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次. IP(独立IP):指独立IP数.00:00-24:00内相同IP地址只被计算一次.

Storm之网站PV统计利用Zookeeper锁控制线程操作

PV(page views): count (session_id) 多线程下,注意线程安全问题 一.PV统计 方案分析 如下是否可行? 1.定义static long pv, Synchronized 控制累计操作 Synchronized 和 Lock在单JVM下有效,但在多JVM下无效 可行的两个方案: 1.shuffleGrouping下,pv * Executer并发数 2.bolt1进行多并发局部汇总,bolt2单线程进行全局汇总 线程安全:多线程处理的结果和单线程一致 汇总型方案:

如何实现ASP.NET中网站访问量的统计

如何实现ASP.NET中网站访问量的统计 2009-07-30 15:50 佚名 网翼教程网 字号:T | T 本文介绍了如何在asp.net中进行网站访问量的统计. AD:51CTO 网+ 第十二期沙龙:大话数据之美_如何用数据驱动用户体验 下面介绍如何进行ASP.NET中网站访问量的统计. 一.建立一个数据表IPStat用于存放用户信息 我在IPStat表中存放的用户信息只包括登录用户的IP(IP_Address),IP来源(IP_Src)和登录时间(IP_DateTime),些表的信息本人

网站访问量实时统计

一.需求:统计网站访问量(实时统计) 技术选型:特点(数据量大.做计算.实时) 实时流式计算框架:storm 1)spout 数据源,接入数据源 本地文件 2)splitbolt 业务逻辑处理 切分数据 拿到网址 3)bolt 累加次数求和 1.PvCountSpout类 package com.demo.pvcount; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFo

HyperLoglog算法在Uv实时统计中的应用

1 传统的Uv实时统计方法以及其缺点 给定时间段条件下,实时统计Uv就是统计不重复的访客数. 最简单的方法就是把用户唯一id存储到集合中,每次有新访客,就把向集合新增元素. 但是当数据量千万级别的时候,无论是内存中,还是redis等外部系统中,集合新增元素的效率都很低. 2 HyperLoglog 在不追求绝对准确的情况下,使用概率算法算是一个不错的解决方案. 概率算法不直接存储数据集合本身,通过一定的概率统计方法预估基数值,这种方法可以大大节省内存. 怎么理解HyperLoglog算法呢, 下

Zabbix企业级监控之监控网站的PV和UV

1.PV.UV是什么? UV:独立访客,每个独立上网电脑视为一位访客,一天之内网站的访客数量 PV:访问量,页面浏览量或者点击量,用户每访问一次记录一次 2.根据nginx的访问日志统计PV和UV UV根据访问IP去重得到 awk '{print $1}' access.log | sort | uniq -c | wc -l PV根据访问的URL来统计 awk '{print $7}' access.log|wc -l 3.在shell脚本中获取PV和UV vim /etc/zabbix/sh

网站计数PV UV IP Session 的简介

PV(Page View)访问量, 即页面浏览量或点击量,衡量网站用户访问的网页数量:在一定统计周期内用户每打开或刷新一个页面就记录1次,多次打开或刷新同一页面则浏览量累计.  UV(Unique Visitor)独立访客,统计1天内访问某站点的用户数(以cookie为依据);访问网站的一台电脑客户端为一个访客.可以理解成访问某网站的电脑的数量.网站判断来访电脑的身份是通过来访电脑的cookies实现的.如果更换了IP后但不清除cookies,再访问相同网站,该网站的统计中UV数是不变的.如果用