探索c#之storm的TimeCacheMap

阅读目录:

  1. 概述
  2. 算法介绍
  3. 清理线程
  4. 获取、插入、删除
  5. 总结

概述

最近在看storm,发现其中的TimeCacheMap算法设计颇为高效,就简单分享介绍下。
思考一下如果需要一个带过期淘汰的缓存容器,我们通常会使用定时器或线程去扫描容器,以便判断是否过期从而删除。但这样性能并不友好,在数据量较大时O(n)检查是一笔不小的开销,并且在大量过期数据删除时需要频繁对容器加锁,这会多少会影响到正常的数据读写删除。
Storm设计了一种比较高效的时间缓存容器TimeCacheMap,它的算法可以在某个时间周期内将数据批量删除,一次批量删除只需要加一次锁即可,并且其读写删除复杂度均为O(1)。

算法介绍

TimeCacheMap把要缓存的数据分拆存储到多个小容器内,这里称为桶。另外有个线程专门在一定时间内去扫描这些桶,一旦发现过期后就把整个桶的数据给删除掉。 其中第二步比较关键,它并不是传统意义上的去定时扫描,而是根据过期时间来触发,比如如果一个桶过期时间10s,那么这个线程就10秒触发一次把整个桶删除即可,当然多个桶的触发策略会有所不同,但思路是同一个。   
为了更详细的描述,用代码和例子介绍如下:

    private LinkedList<Dictionary<K, V>> buckets;
    private readonly object Obj = new object();
    private static readonly int NumBuckets = 3;
    private Thread cleaner;

上面使用了k、v的形式作为缓存数据结构,每个Dictionary是一个桶,然后使用链表把多个桶存储起来。Obj是要锁的对象,NumBuckets是桶的数量,cleaner是清理线程。
在缓存初始化的时候,会实例三个空桶加入到buckets,清理线程开始启动循环检查,假设过期时间时30秒,桶的数量为3,当有新数据进来时,会全部加入到第一个桶中。

为了删除性能,清理线程会定期把整个桶给删除掉,一般我们会每次把链表中最后一个桶给清理掉,然后再加入一个新桶到链表头部。
这种情况下就不能按照缓存过期时间去触发线程清理了,因为有三个桶,如果每30秒触发线程清理掉最后一个桶,那么第三个桶要等到第90秒才开始清理,很明显这样是不合理的。 正确的应该是第30秒开始清理,这时就需要调整线程触发时间,比如调整成10秒,继续模拟下:

  1. 触发前1秒插入新数据到第一个桶,如果调整成10秒触发,等到触发删除这个桶时才过了20秒,跟缓存过期时间30秒不一致同样不合理,不管是1秒还是9秒都会导致提前删除数据,需要继续调整触发时间。
  2. 如上缓存提前删除不能允许的,但延迟删除一般是可以接受的,因此可以加入一些冗余时间来保证不会提前删除。 这里调整到15秒触发,触发前1秒插入的缓存桶正好在30秒后触发删除,达到不会提前删除的目的。
  3. 如上在触发前14秒插入数据,那就需要过了30秒+14秒才能删除。

根据上面的模拟,调整到15秒触发是一个比较合理的值,因此推出缓存最长过期时间的公式为:

expirationSecs * (1 + 1 / (numBuckets-1))

如果过期时间是30秒,其最长删除时间是:

30*(1+1/(3-1))=30*(1+0.5)=45  

因此其过期时间范围即为expirationSecs到expirationSecs * (1 + 1 / (numBuckets-1))之间。

清理线程

如上算法的介绍,我们在类型的构造函数中,实例化并启动清理线程:

 public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallBack ex)
    {
        if (numBuckets < 2)
            throw new ArgumentException("numBuckets must be >=2");
        this.buckets = new LinkedList<Dictionary<K, V>>();
        for (int i = 0; i < numBuckets; i++)
            buckets.AddFirst(new Dictionary<K, V>());
        var expirationMillis = expirationSecs * 1000;
        var sleepTime = expirationMillis / (numBuckets - 1);
        cleaner = new Thread(() =>
        {
            while (true)
            {
                Dictionary<K, V> dead = null;
                Thread.Sleep(sleepTime);
                lock (Obj)
                {
                    dead = buckets.Last();
                    buckets.RemoveLast();
                    buckets.AddFirst(new Dictionary<K, V>());
                }
                if (ex != null)
                    ex(dead);
            }
        });
        cleaner.IsBackground = true;
        cleaner.Start();
    }

代码执行步骤:

  1. 初始化桶加入到链表
  2. 计算缓存数据最长过期时间,并作为线程休眠的时间。
  3. 线程触发时删除最后一个桶并加入新的桶
  4. 不断循环休眠触发触发
  5. 启动线程

整个桶的数据删除只需要加一次锁即可,保证其高效。

获取、插入、删除

遍历整个链表,查询到第一个满足key的立即返回,这需要保证不会有重复key。

   public V Get(K key)
        {
            lock (Obj)
            {
                foreach (var item in buckets)
                {
                    if (item.ContainsKey(key))
                        return item[key];
                }
                return default(V);
            }
        }

在插入时删除对应的key,保证不会有重复的key出现。

 public void Put(K key, V value)
    {
        lock (Obj)
        {
            foreach (var item in buckets)
            {
                item.Remove(key);
            }
            buckets.First().Add(key, value);
        }
    }

删除对应的key

    public void Remove(K key)
    {
        lock (Obj)
        {
            foreach (var item in buckets)
            {
                if (item.ContainsKey(key))
                    item.Remove(key);
            }
        }
    }

总结

那些年我们一起追过的缓存写法(三)中有介绍过关于惰性删除及高效LRU算法优化缓存容器的过期,有兴趣的童鞋可以看看。
完整代码中有容器Size、ContainsKey的实现,github-TimeCacheMap.c#
在storm中,spout发射的消息和acker的消息即保存在各自的TimeCacheMap里,如果消息超时后会自动通知spout的fail方法。 在storm0.8后TimeCacheMap被弃用了,使用的是新的RotatingMap,但设计和实现基本没变,github-TimeCacheMap.javagithub-RotatingMap.java

时间: 2024-11-06 15:14:28

探索c#之storm的TimeCacheMap的相关文章

Storm入门(九)Storm常见模式之流聚合

流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的:而流聚合的语义是不明确的并且输入流是无限的. 数据流的聚合类型跟具体的应用有关.一些应用把两个流发出的所有的tuple都聚合起来--不管多长时间:而另外一些应用则只会聚合一些特定的tuple.而另外一些应用的聚合逻辑又可

Storm源码分析--Nimbus-data

nimbus-datastorm-core/backtype/storm/nimbus.clj (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf :inimbus inimbus ; INimbus实现类, standalone-nimbus的返回值 :submitted-count (atom 0) ; 已经提交的计算拓扑的数量, 初始值为原子值0

Storm深度分析及其正式版本思考

Storm发展到现在已经有了5个年头,从刚开始惊艳四方,到现在逐渐被新兴框架(Flink.Spark Streaming)挑战.Storm本身也在不断的发展,Twitter对其不断的探索,且深一步的开发了Heron框架.社区也在憋了5年后发布了第一个正式版本. Storm内部机制及探索 内部机制 Storm写了一层调度系统,Nimbus作为调度的Master(类似ResourceManager),Supervisor作为工作机器上的监控进程(类似NodeMonitor),Worker作为真正的工

由提交storm项目jar包引发对jar的原理的探索

序:在开发storm项目时,提交项目jar包当把依赖的第三方jar包都打进去提交storm集群启动时报了发现多个同名的文件错误由此开始了一段对jar包的深刻理解之路. java.lang.RuntimeException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/hadoop/app/storm/l

Storm与Spark:谁才是我们的实时处理利器

实时商务智能这一构想早已算不得什么新生事物(早在2006年维基百科中就出现了关于这一概念的页面).然而尽管人们多年来一直在对此类方案进行探讨,我却发现很多企业实际上尚未就此规划出明确发展思路.甚至没能真正意识到其中蕴含的巨大效益. 为什么会这样?一大原因在于目前市场上的实时商务智能与分析工具仍然非常有限.传统数据仓库环境针对的主要是批量处理流程,这类方案要么延迟极高.要么成本惊人——当然,也可能二者兼具. 然而已经有多款强大而且易于使用的开源平台开始兴起,欲彻底扭转目前的不利局面.其中最值得关注

storm 学习教程

转自:http://blog.csdn.net/hrn1216/article/details/51538962 翻译太累了,再也不想去翻译了,真的太累了: 在这个教程中, 你将学到如何创建一个Storm topologies以及怎样把它部署到storm集群上.本教程中,Java将作为主要使用的语言,但在一小部分示例中将会使用Python来阐述storm处理多语言的能力. 预备工作 本教程使用的例子来自于 storm-starter 项目. 我们建议你拷贝该项目并跟随这个例子来进行学习. 请阅读

storm启动nimbus源码分析-nimbus.clj

nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus.bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数: nimbus函数 def nimbus(klass="backtype.storm.daemon.nimbus"):    """Syntax: [s

Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景)

对这个课程有兴趣的可以加我qq2059055336和我联系 Storm是什么? 为什么学习Storm? Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop. 随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计.推荐系统.预警系统.金融系统(高频交易.股票)等等, 大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流. 按照storm作者的说法,Storm对于实

Storm流计算之项目篇(Storm+Kafka+HBase+Highcharts+JQuery,含3个完整实际项目)

1.1.课程的背景 Storm是什么? 为什么学习Storm? Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop. 随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计.推荐系统.预警系统.金融系统(高频交易.股票)等等, 大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流. 按照storm作者的说法,Storm对于实时计算的意义类似于Hadoop对于批处理