Storm之网站PV统计利用Zookeeper锁控制线程操作

PV(page views): count (session_id)

多线程下,注意线程安全问题

一、PV统计

方案分析

如下是否可行?

1、定义static long pv, Synchronized 控制累计操作

Synchronized 和 Lock在单JVM下有效,但在多JVM下无效

可行的两个方案:

1、shuffleGrouping下,pv * Executer并发数

2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总

线程安全:多线程处理的结果和单线程一致

汇总型方案:

1、shuffleGrouping下,pv(单线程结果) * Executer并发数

一个Executer默认一个task,如果设置Task数大于1,公式应该是:

pv(单线程结果) * Task 数 ,

同一个Executer下task的线程ID相同,taskId不同

优点:简单、计算量小

缺点:稍有误差,但绝大多数场景能接受

优化:

案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值,

利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次

2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总

优点:1、绝对准确; 2、如果用fieldGrouping可以得到中间值,如单个user的访问PV(访问深度,也是有用指标)

缺点:计算量稍大,且多一个Bolt

Spout:

package base;

import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SourceSpout implements IRichSpout{

    /**
     * 数据源Spout
     */
    private static final long serialVersionUID = 1L;

    Queue<String> queue = new ConcurrentLinkedQueue<String>();

    SpoutOutputCollector collector = null;

    String str = null;

    @Override
    public void nextTuple() {
        if (queue.size() >= 0) {
            collector.emit(new Values(queue.poll()));
        }
        try {
            Thread.sleep(500) ;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.collector = collector;

            Random random = new Random();
            String[] hosts = { "www.taobao.com" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                    "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                    "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

            for (int i = 0; i < 100; i++) {
                queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout ack:"+msgId.toString());
    }

    @Override
    public void activate() {
        // TODO Auto-generated method stub

    }

    @Override
    public void deactivate() {
        // TODO Auto-generated method stub

    }

    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout fail:"+msgId.toString());
    }

}

Bolt:

package com.storm.visits;

import java.net.InetAddress;
import java.util.Map;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * shuffleGrouping下,pv(单线程结果) * Executer并发数
    一个Executer默认一个task,如果设置Task数大于1,公式应该是:
    pv(单线程结果) * Task 数 ,

    一个execute下可以有多个task
    同一个Executer下task的线程ID相同,taskId不同

    利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次
 *
 */
public class PVBolt  implements IRichBolt{

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public static final String zk_path = "/lock/storm/pv";
    ZooKeeper keeper  = null;
    String lockData = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;

        try {
             keeper =new ZooKeeper("hadoop:2181",3000,new Watcher(){
                @Override
                public void process(WatchedEvent event) {
                    System.err.println("event : "+event.getType());
                }
            });

             //判断zookeeper是否连接上,如果没有连接成功一直等待,保证Zookeeper能连接上
            while (keeper.getState() != ZooKeeper.States.CONNECTED) {
                Thread.sleep(1000);
            }

            InetAddress address = InetAddress.getLocalHost();
            lockData = address.getHostAddress()+":"+context.getThisTaskId();

            //其他线程发现该目录已经存在,就保证了唯一
            if (keeper.exists(zk_path, false) == null) {
                //创建临时目录
                keeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            }

        } catch (Exception e) {
            try {

                keeper.close();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    String logString =null;
    String session_id = null;
    long pv =0;

    long beginTime = System.currentTimeMillis();
    long endTime  = 0;
    @Override
    public void execute(Tuple input) {

        try {
            logString  = input.getString(0);
            endTime = System.currentTimeMillis();

            if(logString != null){
                session_id = logString.split("\t")[1];
                 if(session_id != null){
                     pv++;
                 }
            }

             //利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次
             if (endTime - beginTime >= 5*1000 ) {

                 //判断是否相等,保证只有一个task能匹配
                 if (lockData.equals(keeper.getData(zk_path, false, null))) {

                     //shuffleGrouping下,pv * Executer并发数
                     System.err.println(Thread.currentThread().getName()+ " pv = "+pv*4);
                 }
                 beginTime = System.currentTimeMillis();
             }

             collector.ack(input);

        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(""));
    }

    @Override
    public void cleanup() {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
package com.storm.visits;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import base.SourceSpout;
/**
 *
 * TopologyBuilder
 *
 */
public class PvToPo {

	public static void main(String [] args) throws Exception{

		TopologyBuilder builder =new TopologyBuilder();

		//消息队列发射的tuple不会重复
		builder.setSpout("spout", new SourceSpout(),1);
		builder.setBolt("bolt", new PVBolt(),4).shuffleGrouping("spout");

		//设置参数
		Map conf = new HashMap();

		if (args.length > 0) {
			//分布式提交
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}else{
			//本地模式提交
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}

	}
}

  

Kill 作业:

storm kill PvTopo

提交topo:

storm jar ./starter.jar visits.PvTopo PvTopo

时间: 2024-10-07 06:32:27

Storm之网站PV统计利用Zookeeper锁控制线程操作的相关文章

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

网站流量统计之PV和UV

转自:http://blog.csdn.NET/webdesman/article/details/4062069 如果您是一个站长,或是一个SEO,您一定对于网站统计系统不会陌生,对于SEO新手来说,统计系统中的一些概念不是很清楚,今天讲讲什么是PV和UV! 网站流量统计之UV(Unique Visitor):独立访客,将每个独立上网电脑(以cookie为依据)视为一位访客,一天之内(00:00-24:00),访问您网站的访客数量.一天之内相同cookie的访问只被计算1次. 网站流量统计之P

网站日志统计案例分析与实现

1.概要 到这一步,若是按照前面到文章一步走来,不出意外,我想hadoop平台环境应该搭建OK了.下面我以自己工作中实际的案例来梳理一下整个流程.同时参考一些其他的文章来分析,由于很多网站的日志KPI都大同小异,故有些指标直接在文中赘述了. 2.流程 背景 前言 目录 日志分析概述 需求分析 源码 2.1 背景 从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘.开源界及厂商,所有数据软件,无一不向Hadoop靠拢.Hadoop也从小规模的

Hadoop.2.x_网站PV示例

一.网站基本指标(即针对于网站用户行为而产生的日志中进行统计分析) 1. PV:网页浏览量(Page View页面浏览次数,只要进入该网页就产生一条记录,不限IP,统计点每天(较多)/每周/每月/..) 2. UV:独立访客数(Unique Vistor,以Cookie为依据,同一天内一个用户多次访问,只记为一个) 3. VV:访客的访问次数(Visit View,以Session为依据,访客访问网站到关掉该网站所有页面即记为一次访问) 4. IP:独立IP数(即记录不同IP,同一IP访问多次算

开源网站访问统计系统Piwik

http://www.piwik.cn/ http://www.piwik.org/ Piwik 是一套基于 Php+MySQL 技术构建,能够与 Google Analytics 相媲美的开源网站访问统计系统.Piwik 可以给你详细的统计信息,比如网页浏览人数, 访问最多的页面, 搜索引擎关键词等等,并且采用了大量的AJAX/Flash技术,使得在操作上更加便易. Piwik 可以安装在你的服务器上面,数据就保存在你自己的服务器上 面.你可以非常容易的插入统计图表到你的博客或是网站后台的控制

javascript实现的网站访问量统计代码实例

javascript实现的网站访问量统计代码实例: 作为一个网站的管理员或者说站长,都希望知道到底有多少人访问了网站,这个时候就需要有一个统计功能来满足需要,当然功能比较单一和简单,如果想要强大的统计效果,那最好还是使用现在比较成熟的功统计工具,比如站长统计或者腾讯统计等等. 代码如下: <script type="text/javascript"> var caution=false function setCookie(name,value,expires,path,d

网站流量统计(免费的)

我们在网站上经常看到这个图标如下: 这个就是统计当前网站的访问量的,可以查看访问的IP,访问的次数 , 1.CNZZ数据专家网站首页 全球最大的网站统计分析平台 http://www.cnzz.com/ 2.免费注册 3.获取统计代码 4.将统计代码直接粘贴到开发的代码中 一把在底部显示,这段代码放到JSP页面上就可以了 5.查看密码 在网站上,点击统计的图标,进入查看密码页面,需要输入密码,就可以看到本网站的统计量. 设置查看密码: 上述第二步中,有个查看密码,点击进行设置,就可以了. 网站流

刷流量,免费手机在线刷网站流量,刷网站PV,刷博客流量,刷博客访问量

刷流量,免费手机在线刷网站流量,刷网站PV,刷博客(淘宝)流量,刷博客(淘宝)访问量,用手机浏览器或者微信扫以下二维码: 有图有真相:还怕网站每天流量极低的站长们,还有网店的店主们,动动你们的手指,打开手机浏览器或微信扫扫二维码:你会惊讶的看到,手机也能刷网站(网店)流量,网站PV哦!    网站来源:http://www.learnphp.cn

javascript实现的网站访问量统计代码

javascript实现的网站访问量统计代码:网站一般都有访问量统计工具,比较高效实用的工具多种多样,并且非常的精确.实用javascript也可以简单的实现此功能,尽管没有网络上常用的精确,不过的确也实现了一定的功能,下面就是一段代码实例,感兴趣的朋友可以参考一下: <script type="text/javascript"> /** * vlstat 浏览器统计脚本 */ var statIdName = "vlstatId"; var xmlHt