PV(page views): count (session_id)
多线程下,注意线程安全问题
一、PV统计
方案分析
如下是否可行?
1、定义 static long pv,用 synchronized 控制累加操作
不可行:synchronized 和 Lock 只在单个 JVM 内有效,而 Storm 的 task 可能分布在多个 JVM(worker 进程)中,跨 JVM 时无效
可行的两个方案:
1、shuffleGrouping下,pv * Executor并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
线程安全:多线程处理的结果和单线程一致
汇总型方案:
1、shuffleGrouping下,pv(单线程结果) * Executor并发数
一个Executor默认一个task,如果设置Task数大于1,公式应该是:
pv(单线程结果) * Task 数
同一个Executor下task的线程ID相同,taskId不同
优点:简单、计算量小
缺点:稍有误差,但绝大多数场景能接受
优化:
案例PVBolt中每个Task都会输出一个汇总值,而实际只需要一个Task输出汇总值,
因此利用ZooKeeper锁保证只有一个Task输出汇总值,并且每5秒输出一次
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
优点:1、绝对准确; 2、如果用fieldGrouping可以得到中间值,如单个user的访问PV(访问深度,也是有用指标)
缺点:计算量稍大,且多一个Bolt
Spout:
package base; import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class SourceSpout implements IRichSpout{ /** * 数据源Spout */ private static final long serialVersionUID = 1L; Queue<String> queue = new ConcurrentLinkedQueue<String>(); SpoutOutputCollector collector = null; String str = null; @Override public void nextTuple() { if (queue.size() >= 0) { collector.emit(new Values(queue.poll())); } try { Thread.sleep(500) ; } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int i = 0; i < 100; i++) { queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } catch (Exception e) { e.printStackTrace(); } } @Override public void close() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("log")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void ack(Object msgId) { // TODO Auto-generated method stub 
System.out.println("spout ack:"+msgId.toString()); } @Override public void activate() { // TODO Auto-generated method stub } @Override public void deactivate() { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); } }
Bolt:
package com.storm.visits; import java.net.InetAddress; import java.util.Map; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; /** * shuffleGrouping下,pv(单线程结果) * Executer并发数 一个Executer默认一个task,如果设置Task数大于1,公式应该是: pv(单线程结果) * Task 数 , 一个execute下可以有多个task 同一个Executer下task的线程ID相同,taskId不同 利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次 * */ public class PVBolt implements IRichBolt{ private static final long serialVersionUID = 1L; private OutputCollector collector; public static final String zk_path = "/lock/storm/pv"; ZooKeeper keeper = null; String lockData = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try { keeper =new ZooKeeper("hadoop:2181",3000,new Watcher(){ @Override public void process(WatchedEvent event) { System.err.println("event : "+event.getType()); } }); //判断zookeeper是否连接上,如果没有连接成功一直等待,保证Zookeeper能连接上 while (keeper.getState() != ZooKeeper.States.CONNECTED) { Thread.sleep(1000); } InetAddress address = InetAddress.getLocalHost(); lockData = address.getHostAddress()+":"+context.getThisTaskId(); //其他线程发现该目录已经存在,就保证了唯一 if (keeper.exists(zk_path, false) == null) { //创建临时目录 keeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } catch (Exception e) { try { keeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } String logString =null; String session_id = null; long pv =0; long beginTime = System.currentTimeMillis(); long endTime = 0; @Override public void execute(Tuple input) { try { logString = input.getString(0); endTime = 
System.currentTimeMillis(); if(logString != null){ session_id = logString.split("\t")[1]; if(session_id != null){ pv++; } } //利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次 if (endTime - beginTime >= 5*1000 ) { //判断是否相等,保证只有一个task能匹配 if (lockData.equals(keeper.getData(zk_path, false, null))) { //shuffleGrouping下,pv * Executer并发数 System.err.println(Thread.currentThread().getName()+ " pv = "+pv*4); } beginTime = System.currentTimeMillis(); } collector.ack(input); } catch (Exception e) { collector.fail(input); e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("")); } @Override public void cleanup() {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
package com.storm.visits; import java.util.HashMap; import java.util.Map; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import base.SourceSpout; /** * * TopologyBuilder * */ public class PvToPo { public static void main(String [] args) throws Exception{ TopologyBuilder builder =new TopologyBuilder(); //消息队列发射的tuple不会重复 builder.setSpout("spout", new SourceSpout(),1); builder.setBolt("bolt", new PVBolt(),4).shuffleGrouping("spout"); //设置参数 Map conf = new HashMap(); if (args.length > 0) { //分布式提交 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }else{ //本地模式提交 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
Kill 作业:
storm kill PvTopo
提交topo:
storm jar ./starter.jar com.storm.visits.PvToPo PvTopo
时间: 2024-10-07 06:32:27