正如监控应用程序一样,随着时间的推移,获取系统信息变得越来越重要。代码变更(能够影响站点响应速度以及服务的网页数量),新的广告活动,新的用户都能从根本上改变从一个站点上加载的网页数量。随后很多其他性能指标都会被改变。但是如果我们不记录任何指标,就不可能去知道他们改变了或者无法知道我们是做得更好或者更坏了。
在试图开始收集指标用来观看和分析时,我们构建一个带名称的计数器工具(像站点点击计数器,销售计数器,数据库查询计数器可能至关重要)。这些计数器将存储最近的120个各种时间精度(像1秒,5秒,1分钟等到)的样本,样本数量和时间精度能够被定制是必要的。保持计数器的第一步实际上是存储计数器本身。
更新一个计数器
为了更新一个计数器,我们必须存储计数器信息,对每个计数器以及它的精度,像站点点击计数器和5秒精度,我们使用一个HASH来存储5秒内发生的点击数量。用开始时间作为key,点击数量作为value。图5.1挑选了一些数据
当我们开始使用计数器,我们需要去记录哪些计数器已经被写入,这样我们就可以清理旧的数据。为此,我们需要一个有序序列让我们能过一项一项迭代它并且不允许重复,我们可以使用一个LIST结合一个SET,但是这样会花费额外的代码并往返Redis。替代的,我们将使用一个ZSET,它的成员是精度和名字的组合,得分(Score)都为0。通过设置Score都为0,Redis会试图按照score排序并发现他们都相等,然后将会按照name排序,这样我们将会有一个固定的顺序,便于我们更容易的浏览他们,如图5.2所示
图5.1
图5.2
现在已经知道的计数器的结构了,那如何做到这一点呢?每个时间精度和名称(由时间精度和名称组成的key),我们将添加一个引用到一个known ZSET,并且在适当的时间、正确的HASH上面增加计数。以下是代码实现
#代码来源 https://github.com/huangz1990/riacn-code/blob/master/ch05_listing_source.py#L96 PRECISION = [1, 5, 60, 300, 3600, 18000, 86400] #A def update_counter(conn, name, count=1, now=None): # 通过取得当前时间来判断应该对哪个时间片执行自增操作。 now = now or time.time() # 为了保证之后的清理工作可以正确地执行,这里需要创建一个事务型流水线。 pipe = conn.pipeline() # 为我们记录的每种精度都创建一个计数器。 for prec in PRECISION: # 取得当前时间片的开始时间。 pnow = int(now / prec) * prec # 创建负责存储计数信息的散列。 hash = ‘%s:%s‘%(prec, name) # 将计数器的引用信息添加到有序集合里面, # 并将其分值设置为0,以便在之后执行清理操作。 pipe.zadd(‘known:‘, hash, 0) # 对给定名字和精度的计数器进行更新。 pipe.hincrby(‘count:‘ + hash, pnow, count) pipe.execute()
更新计数器信息不是太麻烦,仅仅对每一个时间精度进行一个一个ZADD和HINCRBY操作,获取一个计数器信息同样非常简单,我们使用HGETALL获取整个HASH的数据,转换成我们的时间精度并且将计数转换成一个数字(返回的是字符串),通过时间排序,最后返回这个值。下面是代码
#https://github.com/huangz1990/riacn-code/blob/master/ch05_listing_source.py#L123def get_counter(conn, name, precision): # 取得存储着计数器数据的键的名字。 hash = ‘%s:%s‘%(precision, name) # 从Redis里面取出计数器数据。 data = conn.hgetall(‘count:‘ + hash) # 将计数器数据转换成指定的格式。 to_return = [] for key, value in data.iteritems(): to_return.append((int(key), int(value))) # 对数据进行排序,把旧的数据样本排在前面。 to_return.sort() return to_return
现在已经准确的完成了我们计划完成的事情,我们获取了数据,通过时间排好了序,并转换回了整数。接下来看看如何防止计数器保存太多的数据。
清除旧的计数器
现在我们能过简单的写入和读取计数器数据,但是当我们更新我们的计数器的时候,如果我们不执行清理,总有一天会内存不足。因为我们的提前考虑,已经将已知的计数器记录到了known ZSET.为了清理计数器,我们需要迭代这列表并且清理旧的数据。
为什么不使用EXPIRE?EXPIRE命令的一个限制是只能申请整个keys,我们不能清理一部分keys。由于我们选择的数据结构,计数器X和时间精度Y在任何时间都是一个单一的key,我们需要定期清理。如果你有雄心,你可能想去尝试重新定义数据结构为了使用REDIS标准的到期处理。
当我们清理计数器时,有一些重要事项需要注意。下面列出了一些需要注意的比较重要的事项:
- 可以随时添加新的计数器
- 同一时间可以发生多个清理程序
- 每一分钟都在清理日常计数器是白费力气
- 如果一个计数器已经没有数据了,我们将不再尝试清理
对应我们考虑的事项,我们将构建一个类似于第二章中写过的守护进程。像之前一样,我们将重复循环直到系统说要退出。为了最小化清理过程中的系统负载,我们尝试每分钟清理一次,并且也会清理那些正在创建的,除了那些频率高于一分钟的计数器。如果一个计数器的时间区间是5分钟,我们将尝试没5分钟清理一次。那些更频繁的计数器(我们例子中的1秒和5秒),我们每分钟都会清理。
为了迭代计数器,我们通过将通过ZRANG一个一个的获取已知的计数器。为了清理计数器,我们获取给定计数器的所有开始时间,计算出那些条目是在截止时间以前,并且移除他们。如果给定的计数器没有数据,我们将从known ZSET里移除这个计数器的引用。解释起来很简单,但是代码详情里面展示了几个极端情况。检查这个清单去看一下清理程序的详细细节
#代码https://github.com/huangz1990/riacn-code/blob/master/ch05_listing_source.py#L138def clean_counters(conn): pipe = conn.pipeline(True) # 为了平等地处理更新频率各不相同的多个计数器,程序需要记录清理操作执行的次数。 passes = 0 # 持续地对计数器进行清理,直到退出为止。 while not QUIT: # 记录清理操作开始执行的时间,用于计算清理操作执行的时长。 start = time.time() # 渐进地遍历所有已知的计数器。 index = 0 while index < conn.zcard(‘known:‘): # 取得被检查计数器的数据。 hash = conn.zrange(‘known:‘, index, index) index += 1 if not hash: break hash = hash[0] # 取得计数器的精度。 prec = int(hash.partition(‘:‘)[0]) # 因为清理程序每60秒钟就会循环一次, # 所以这里需要根据计数器的更新频率来判断是否真的有必要对计数器进行清理。 bprec = int(prec // 60) or 1 # 如果这个计数器在这次循环里不需要进行清理, # 那么检查下一个计数器。 # (举个例子,如果清理程序只循环了三次,而计数器的更新频率为每5分钟一次, # 那么程序暂时还不需要对这个计数器进行清理。) if passes % bprec: continue hkey = ‘count:‘ + hash # 根据给定的精度以及需要保留的样本数量, # 计算出我们需要保留什么时间之前的样本。 cutoff = time.time() - SAMPLE_COUNT * prec # 获取样本的开始时间,并将其从字符串转换为整数。 samples = map(int, conn.hkeys(hkey)) # 计算出需要移除的样本数量。 samples.sort() remove = bisect.bisect_right(samples, cutoff) # 按需移除计数样本。 if remove: conn.hdel(hkey, *samples[:remove]) # 这个散列可能已经被清空。 if remove == len(samples): try: # 在尝试修改计数器散列之前,对其进行监视。 pipe.watch(hkey) # 验证计数器散列是否为空,如果是的话, # 那么从记录已知计数器的有序集合里面移除它。 if not pipe.hlen(hkey): pipe.multi() pipe.zrem(‘known:‘, hash) pipe.execute() # 在删除了一个计数器的情况下, # 下次循环可以使用与本次循环相同的索引。 index -= 1 else: # 计数器散列并不为空, # 继续让它留在记录已有计数器的有序集合里面。 pipe.unwatch() # 有其他程序向这个计算器散列添加了新的数据, # 它已经不再是空的了,继续让它留在记录已知计数器的有序集合里面。 except redis.exceptions.WatchError: pass # 为了让清理操作的执行频率与计数器更新的频率保持一致, # 对记录循环次数的变量以及记录执行时长的变量进行更新。 passes += 1 duration = min(int(time.time() - start) + 1, 60) # 如果这次循环未耗尽60秒钟,那么在余下的时间内进行休眠; # 如果60秒钟已经耗尽,那么休眠一秒钟以便稍作休息。 time.sleep(max(60 - duration, 1))
正如之前描述的一样,我们一个一个的迭代了保存计数器的ZSET去查找需要清理的条目。在每一轮中,我们只清理需要清理的数据,所以我们一开始就执行了检查。我们获取了计数器数据并决定哪些需要被清理。在我们清理完旧数据以后,核实没有遗留数据以后从ZSET内移除该计数器。最后,当迭代完所有计数器之后,我们计算出每一轮迭代的时间,并且程序sleep我们执行整个清理还剩下的时间,直到下轮清理
UPDATING A COUNTER
UPDATING A COUNTER