redis实现队列queue

参考:《Redis入门指南》第4章进阶

http://book.51cto.com/art/201305/395461.htm

4.4.2 使用Redis实现任务队列

说到队列很自然就能想到Redis的列表类型,3.4.2节介绍了使用LPUSH和RPOP命令实现队列的概念。如果要实现任务队列,只需要让生产者将任务使用LPUSH命令加入到某个键中,另一边让消费者不断地使用RPOP命令从该键中取出任务即可。

在小白的例子中,完成发邮件的任务需要知道收件地址、邮件主题和邮件正文。所以生产者需要将这三个信息组成对象并序列化成字符串,然后将其加入到任务队列中。而消费者则循环从队列中拉取任务,就像如下伪代码:

  1. # 无限循环读取任务队列中的内容
  2. loop
  3. $task = RPOR queue
  4. if $task
  5. # 如果任务队列中有任务则执行它
  6. execute($task)
  7. else
  8. # 如果没有则等待1秒以免过于频繁地请求数据
  9. wait 1 second

到此一个使用Redis实现的简单的任务队列就写好了。不过还有一点不完美的地方:当任务队列中没有任务时消费者每秒都会调用一次RPOP命令查看是否有新任务。如果可以实现一旦有新任务加入任务队列就通知消费者就好了。其实借助BRPOP命令就可以实现这样的需求。

BRPOP命令和RPOP命令相似,唯一的区别是当列表中没有元素时BRPOP命令会一直阻塞住连接,直到有新元素加入。如上段代码可改写为:

  1. loop
  2. # 如果任务队列中没有新任务,BRPOP命令会一直阻塞,不会执行execute()。
  3. $task = BRPOP queue, 0
  4. # 返回值是一个数组(见下介绍),数组第二个元素是我们需要的任务。
  5. execute($task[1])

BRPOP命令接收两个参数,第一个是键名,第二个是超时时间,单位是秒。当超过了此时间仍然没有获得新元素的话就会返回nil。上例中超时时间为"0",表示不限制等待的时间,即如果没有新元素加入列表就会永远阻塞下去。

当获得一个元素后BRPOP命令返回两个值,分别是键名和元素值。为了测试BRPOP命令,我们可以打开两个redis-cli实例,在实例A中:

  1. redis A> BRPOP queue 0

brpop:

返回值:

假如在指定时间内没有任何元素被弹出,则返回一个 nil 和等待时长。

反之,返回一个含有两个元素的列表,第一个元素是被弹出元素所属的 key ,第二个元素是被弹出元素的值。

键入回车后实例1会处于阻塞状态,这时在实例B中向queue中加入一个元素:

  1. redis B> LPUSH queue task
  2. (integer) 1

在LPUSH命令执行后实例A马上就返回了结果:

  1. 1) "queue"
  2. 2) "task"

同时会发现queue中的元素已经被取走:

  1. redis> LLEN queue
  2. (integer) 0

除了BRPOP命令外,Redis还提供了BLPOP,和BRPOP的区别在与从队列取元素时BLPOP会从队列左边取。具体可以参照LPOP理解,这里不再赘述。

4.4.3 优先级队列

前面说到了小白博客需要在发布文章的时候向每个订阅者发送邮件,这一步骤同样可以使用任务队列实现。由于要执行的任务和发送确认邮件一样,所以二者可以共用一个消费者。然而设想这样的情况:假设订阅小白博客的用户有1000人,那么当发布一篇新文章后博客就会向任务队列中添加1000个发送通知邮件的任务。如果每发一封邮件需要10秒,全部完成这1000个任务就需要近3个小时。问题来了,假如这期间有新的用户想要订阅小白博客,当他提交完自己的邮箱并看到网页提示他查收确认邮件时,他并不知道向自己发送确认邮件的任务被加入到了已经有1000个任务的队列中。要收到确认邮件,他不得不等待近3个小时。多么糟糕的用户体验!而另一方面发布新文章后通知订阅用户的任务并不是很紧急,大多数用户并不要求有新文章后马上就能收到通知邮件,甚至延迟一天的时间在很多情况下也是可以接受的。

所以可以得出结论当发送确认邮件和发送通知邮件两种任务同时存在时,应该优先执行前者。为了实现这一目的,我们需要实现一个优先级队列。

BRPOP命令可以同时接收多个键,其完整的命令格式为BRPOP key [key …] timeout,

按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素

如BRPOP queue:1 queue:2 0。意义是同时检测多个键,如果所有键都没有元素则阻塞,如果其中有一个键有元素则会从该键中弹出元素。例如,打开两个redis-cli实例,在实例A中:

  1. redis A> BRPOP queue:1 queue:2 queue:3 0

在实例B中:

  1. redis B> LPUSH queue:2 task
  2. (integer) 1

则实例A中会返回:

  1. 1) "queue:2"
  2. 2) "task"

如果多个键都有元素则按照从左到右的顺序取第一个键中的一个元素。我们先在queue:2和queue:3中各加入一个元素:

  1. redis> LPUSH queue:2 task1
  2. 1) (integer) 1
  3. redis> LPUSH queue:3 task2
  4. 2) (integer) 1

然后执行BRPOP命令:

  1. redis> BRPOP queue:1 queue:2 queue:3 0
  2. 1) "queue:2"
  3. 2) "task1"

借此特性可以实现区分优先级的任务队列。我们分别使用queue:confirmation. email和queue:notification.email两个键存储发送确认邮件和发送通知邮件两种任务,然后将消费者的代码改为:

  1. loop
  2. $task =
  3. BRPOP queue:confirmation.email,
  4. queue:notification.email,
  5. 0
  6. execute($task[1])

这时一旦发送确认邮件的任务被加入到queue:confirmation.email队列中,无论queue: notification.email还有多少任务,消费者都会优先完成发送确认邮件的任务。

参考:http://book.51cto.com/art/201305/395463.htm

官网说法:

RPOPLPUSH source destination

命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:

  • 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。
  • 将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。

模式: 安全的队列

Redis的列表经常被用作队列(queue),用于在不同程序之间有序地交换消息(message)。一个客户端通过 LPUSH 命令将消息放入队列中,而另一个客户端通过 RPOP 或者 BRPOP 命令取出队列中等待时间最长的消息。

不幸的是,上面的队列方法是『不安全』的,因为在这个过程中,一个客户端可能在取出一个消息之后崩溃,而未处理完的消息也就因此丢失。

使用 RPOPLPUSH 命令(或者它的阻塞版本 BRPOPLPUSH )可以解决这个问题:因为它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用 LREM 命令将这个消息从备份表删除。

最后,还可以添加一个客户端专门用于监视备份表,它自动地将超过一定处理时限的消息重新放入队列中去(负责处理该消息的客户端可能已经崩溃),这样就不会丢失任何消息了。

模式:循环列表  Circular list

通过使用相同的 key 作为 RPOPLPUSH 命令的两个参数,客户端可以用一个接一个地获取列表元素的方式,取得列表的所有元素,而不必像 LRANGE 命令那样一下子将所有列表元素都从服务器传送到客户端中(两种方式的总复杂度都是 O(N))。

以上的模式甚至在以下的两个情况下也能正常工作:

  • 有多个客户端同时对同一个列表进行旋转(rotating),它们获取不同的元素,直到所有元素都被读取完,之后又从头开始。
  • 有客户端在向列表尾部(右边)添加新元素。

这个模式使得我们可以很容易实现这样一类系统:有 N 个客户端,需要连续不断地对一些元素进行处理,而且处理的过程必须尽可能地快。一个典型的例子就是服务器的监控程序:它们需要在尽可能短的时间内,并行地检查一组网站,确保它们的可访问性。

注意,使用这个模式的客户端是易于扩展(scala)且安全(reliable)的,因为就算接收到元素的客户端失败,元素还是保存在列表里面,不会丢失,等到下个迭代来临的时候,别的客户端又可以继续处理这些元素了。

更多:

http://redis.io/commands/blpop

\http://redis.io/commands/rpoplpush

时间: 2024-11-07 20:43:52

redis实现队列queue的相关文章

分布式日志2 用redis的队列写日志

using System; using System.Collections.Generic; using System.Linq; using System.Web; using System.Web.Mvc; using ServiceStack.Redis; namespace 分布式日志 { public class MyExceptionFilter : HandleErrorAttribute { #region 用c#的队列 //public static Queue<Except

GuozhongCrawler实现的基于redis的队列

GuozhongCrawler的分布式爬虫还在开发当中.作者首先爆出GuozhongCrawler实现的基于redis的队列,提供大家写其他分布式爬虫的参考. package com.guozhong.queue; import com.guozhong.request.BasicRequest; /** * 线程安全的可阻塞式队列接口 * @author 郭钟 * @QQ群 202568714 * */ public interface BlockingRequestQueue { /** *

Redis的队列和消息队列的区别【concept】

Redis 队列 用redis作为队列效率高,而且简单易用 使用场景 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网触发执行发送,程序会出现耗时 高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台去处理这些请求 抢购场景,先入先出的模式 Redis 消息队列 主要应用在网络中实现异步任务,Reids可以充当消息队列实现两种模式:生产者 ->消费者,发布者->订阅者 第一种方式是一对一,后者是一对多 生产者/消费者模型 生产者模型需要存在生产者和消费者两方,而在redis中

java redis使用之利用jedis实现redis消息队列

应用场景 对于数据库查询的IO连接数高.连接频繁的情况,可以考虑使用缓存实现. 从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现. 序列化 这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象: 主要是用到了ByteArrayOutputStream和ByteArrayInputStream: 需要注意的是每个自定义的需要序列化的对象都要实现Seria

【C++】容器适配器实现队列Queue的各种功能(入队、出队、判空、大小、访问所有元素等)

适配器: 将一个通用的容器转换为另外的容器,所谓的容器,指的是存放数据的器具,像我们知道的顺序表和链表都是容器Container.举个例子解释一下吧,我们的电压都是220v,而像充电线就起到转换到合适的电压的作用.而这里,我们的主角就是将通用的链表结构转换为来实现队列Queue这一数据结构,(意思就是,链表还可以去实现其他的数据结构). 在线性表中,分为链表和顺序表,我们知道其中的差别: 链表:节点灵活,使得插入删除元素方便灵活,但是对于单链表若有节点指针_head._tail,查找元素较为麻烦

redis缓存队列+MySQL +php任务脚本定时批量入库

原文地址:http://blog.jobbole.com/99567/ 需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试.解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接

(转)java redis使用之利用jedis实现redis消息队列

应用场景 最近在公司做项目,需要对聊天内容进行存储,考虑到数据库查询的IO连接数高.连接频繁的因素,决定利用缓存做. 从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现. 序列化 这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象: 主要是用到了ByteArrayOutputStream和ByteArrayInputStream: 需要注意的是每个自定

使用C#的泛型队列Queue实现生产消费模式

本篇体验使用C#的泛型队列Queue<T>实现生产消费模式. 如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列. 现在,要写一个体现生产消费模式的泛型帮助类,比如叫ProducerConsumer<T>. 该类肯定会维护一个有关生产.物料的Queue<T>类型的字段,还存在一个有关消费.Action<T>类型的字段. 在ProducerConsumer类的构造函数中,为Action<T&

Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程

队列(queue) 队列只在多线程里有意义,是一种线程安全的数据结构. get与put方法 ''' 创建一个"队列"对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果maxsize小于1就表示队列长度无限. 将一个值放入队列中: q.put() 调用队列对象的put()方法在队尾插入一个项目.put()