Storm[TOPN -排序] - RollingCountBolt

阅读背景:

1 : 您需要对滑动窗口要初步了解

2  :   您需要了解滑动窗口在滑动的过程之中,滑动chunk的计算过程,尤其是每发射一次,就需要清空一次。

package com.cc.storm.bolt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 1 在这里我们需要去实现一个滑动窗口,请注意,在我们实现滑动窗口的过程之中清空的是当前滑动窗口的下一个
 * 
 * 
 * 
 * @author Yin Shuai
 * 
 */
public class RollingCountBolt implements IRichBolt {

	private static final long serialVersionUID = 1765379339552134320L;

	private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
	private int _numBuckets;
	private transient Thread cleaner;
	private OutputCollector _collector;

	/**
	 * _trackMinute
	 * 是我们整个滑动窗口的大小,滑动窗口的大小,本质上决定了我们的时间区间,也就是说,假设我们目前滑动窗口的总体大小为15分钟。
	 * 那我们的商品点击的实时排序的指标值,好比商品浏览量的计算值,也就是15分钟
	 * 
	 * 而单个窗口的大小也就是我,我们这个三十分钟在随着时间不断的在推移
	 * 
	 * 举例说明:在最初的构造过程之中,如果我们的桶的数目为10,那么单个窗口的时间长度为3.
	 * 
	 * [0,30],[3,33],[6,36],[9,39],[12,42] 统计的数值处在不断的变化之中
	 * 
	 */
	private int _trackMinutes;

	public RollingCountBolt(int numBuckets, int trackMinutes) {
		this._numBuckets = numBuckets;
		this._trackMinutes = trackMinutes;
	}

	public long totalObjects(Object obj) {
		long[] curr = _objectCounts.get(obj);
		long total = 0;
		for (long l : curr) {
			total += l;
		}
		return total;
	}

	public int currentBucket(int buckets) {
		return currentSecond() / secondsPerBucket(buckets) % buckets;
	}

	public int currentSecond() {
		return (int) (System.currentTimeMillis() / 1000);
	}

	/**
	 * 
	 * @param buckets
	 *            你设定的桶的数量
	 * @return 依据我们默认的_trackMinutes / buckets 得到每一个桶的数量
	 */
	public int secondsPerBucket(int buckets) {
		return _trackMinutes * 60 / buckets;
	}

	public long millisPerBucket(int buckets) {
		return (long) 1000 * secondsPerBucket(buckets);
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		// TODO Auto-generated method stub
		_collector = collector;
		cleaner = new Thread(new Runnable() {

			@SuppressWarnings("unchecked")
			@Override
			public void run() {
				// TODO Auto-generated method stub
				int lastBucket = currentBucket(_numBuckets);

				while (true) {

					int currBucket = currentBucket(_numBuckets);
					p("线程while循环: 当前的桶为:" + currBucket);

					if (currBucket != lastBucket) {
						p("线程while循环:之前的桶数为:" + lastBucket);

						int bucketToWipe = (currBucket + 1) % _numBuckets;
						p("线程while循环:要擦除掉的桶为:" + bucketToWipe);

						synchronized (_objectCounts) {
							Set objs = new HashSet(_objectCounts.keySet());

							for (Object obj : objs) {

								long[] counts = _objectCounts.get(obj);
								long currBucketVal = counts[bucketToWipe];
								p("线程while循环:擦除掉的值为:" + currBucketVal);
								counts[bucketToWipe] = 0;
								long total = totalObjects(obj);
								if (currBucketVal != 0) {
									p("线程while循环:擦除掉的值为不为0:那就发射数据:obj total"
											+ obj + ":" + total);
									_collector.emit(new Values(obj, total));

								}
								if (total == 0) {

									p("线程while循环: 总数为0以后,将obj对象删除");
									_objectCounts.remove(obj);

								}
							}
						}
						lastBucket = currBucket;
					}

					long delta = millisPerBucket(_numBuckets)
							- (System.currentTimeMillis() % millisPerBucket(_numBuckets));
					Utils.sleep(delta);

					p("\n");
				}
			}
		});
		cleaner.start();
	}

	@Override
	public void execute(Tuple input) {

		Object obj1 = input.getValue(0);
		Object obj = input.getValue(1);

		int currentBucket = currentBucket(_numBuckets);

		p("execute方法:当前桶:bucket: " + currentBucket);

		synchronized (_objectCounts) {

			long[] curr = _objectCounts.get(obj);

			if (curr == null) {
				curr = new long[_numBuckets];
				_objectCounts.put(obj, curr);
			}

			curr[currentBucket]++;

			System.err
					.print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long数组:"));

			for (long number : curr) {
				System.err.print(number + ":");
			}

			p("execute方法:发射的数据: " + obj + ":" + totalObjects(obj));

			/**
			 * 我们不断的发射的也就是我们某一个商品id,在当前滑动窗口,也就是我们的时间周期内的指标计算值
			 * 要注意,在排序的过程之中,我们只针对key, 也就是我们的商品id,由此发射给后续的排序bolt依据包含了时间区间的信息
			 */

			// 每来一条数据,就会发射一次
			_collector.emit(new Values(obj, totalObjects(obj)));
			_collector.ack(input);
		}

		p("\n");
	}

	@Override
	public void cleanup() {
		// TODO Auto-generated method stub

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("merchandiseID", "count"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void p(Object o) {
		System.err.println(o.toString());
	}
}

在这里,最需要我们关注的地方是,滑动窗口每滑动一次,将情况一组数据。 而发射数据的过程之中将统计这一组数

据。

时间: 2025-01-03 08:26:21

Storm[TOPN -排序] - RollingCountBolt的相关文章

Storm 实现滑动窗口计数和TopN排序

计算top N words的topology, 用于比如trending topics or trending images on Twitter. 实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码 Topology 这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping  String spoutId = "wordGenerator"; String counterId = &

storm RollingTopWords 实时top-N计算任务窗口设计

转发请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/6381037.html 流式计算中我们经常会遇到需要将数据根据时间窗口进行批量统计的场景,窗口性质一般由两个参数规定:1 Window length: 可以用时间或者数量来定义窗口大小:2 Sliding interval: 窗口滑动的间隔 .通过这两个参数一般把window分成滚动窗口和滑动窗口. Sliding Window(滑动窗口) Tuples are grouped in window

流式数据处理的计算模型 转

分类: 大数据 接触这块将近3个月左右,期间给自己的定位也是业务层开发.对平台级的产品没有太深入的理解和研究,所以也不能大谈特谈什么storm架构之类的了. 说说业务中碰到流式计算问题吧: 1.还是要介绍下简要的架构(原谅我不会画图) 流式数据接入层------------------->流式数据处理层------------------->结果数据归档层 || || || V 中间数据存储层 所有的数据通过接入层源源不断地进入到这个系统, 在数据处理层得到相应的计算存储, 最后将结果写入到归

三:Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比 二:Storm的wordCount的方案实例设计 三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中 <dependency>            <groupId>com.github.aloomaio</groupId>            

Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比 二:Storm的wordCount的方案实例设计 三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中 <dependency>            <groupId>com.github.aloomaio</groupId>            

使用dataframe解决spark TopN问题:分组、排序、取TopN

package com.profile.mainimport org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functions._ import com.profile.tools.{DateTools, JdbcTools, LogTools, SparkTools}import com.dhd.comment.Constantimport com.profile.comment.Comments /**

3.算子+PV&amp;UV+submit提交参数+资源调度和任务调度源码分析+二次排序+分组topN+SparkShell

1.补充算子 transformations ?  mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值. ?  repartition 增加或减少分区.会产生shuffle.(多个分区分到一个分区不会产生shuffle) 多用于增多分区. 底层调用的是coalesce ?  coalesce(合并) coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle. true为产生shuffle,false不产生shuff

Spark高级排序与TopN问题揭密

[TOC] 引入 前面进行过wordcount的单词统计例子,关键是,如何对统计的单词按照单词个数来进行排序? 如下: scala> val retRDD = sc.textFile("hdfs://ns1/hello").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) scala> val retSortRDD = retRDD.map(pair => (pair._2, pair._1)).

sicily 1046. Plane Spotting(排序求topN)

DescriptionCraig is fond of planes. Making photographs of planes forms a major part of his daily life. Since he tries to stimulate his social life, and since it’s quite a drive from his home to the airport, Craig tries to be very efficient by investi