用redis实现消息队列

为什么需要消息队列

系统中引入消息队列机制是对系统一个非常大的改善。例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中。你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验。

有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作。例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟。这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此反馈用户与否。

另外适用消息队列的情况是那些系统处理能力有限的情况下,先使用队列机制把任务暂时存放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的情况下也能稳定的处理掉高并发的任务。

消息队列可以用来做排队机制,只要系统需要用到排队机制的地方就可以使用消息队列来作。

使用redis怎么做消息队列

首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让他有做消息队列的能力。

redis能做消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口。他们都是阻塞版的,所以可以用来做消息队列。

Redis实现先进先出队列

Redis实现FIFO很容易,只需要一个List对象从头取数据,从尾部塞数据即可实现。例如lpush存数据,brpop取数据。

Redis实现优先级队列

首先brpop和blpop是支持多list读取的,比如brpop lista listb 0 命令就可以实现先从lista读取数据,读取完lista的数据再去读取listb的数据。

那么我们就可以通过如下方式实现了:

127.0.0.1:6379> lpush a 1
(integer) 1
127.0.0.1:6379> lpush a 2
(integer) 2
127.0.0.1:6379> lpush a 3
(integer) 3
127.0.0.1:6379> lpush b 1
(integer) 1
127.0.0.1:6379> lpush b 2
(integer) 2
127.0.0.1:6379> lpush b 3
(integer) 3
127.0.0.1:6379> brpop a b 0
1) "a"
2) "1"
127.0.0.1:6379> brpop a b 0
1) "a"
2) "2"
127.0.0.1:6379> brpop a b 0
1) "a"
2) "3"
127.0.0.1:6379> brpop a b 0
1) "b"
2) "1"
127.0.0.1:6379> brpop a b 0
1) "b"
2) "2"
127.0.0.1:6379> brpop a b 0
1) "b"
2) "3"
127.0.0.1:6379> brpop a b 0

这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可以。

多优先级问题解决

如果优先级级别很多的情况,假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似0-99999这么多级别。那么我们第三种方案将不太合适了。

虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。

有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。

例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。

因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。

把上面的方案结合起来就会很大程度上减少开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置 10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据 就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已。

redis实现定时消息队列

由于Redis排序集合(Sorted Sets)没有实现阻塞功能,所以只能通过程序自己实现。score字段存入时间戳,由于时间戳较长我们用三位数字代替。

127.0.0.1:6379> zadd seta 100 a
(integer) 1
127.0.0.1:6379> zadd seta 200 b
(integer) 1
127.0.0.1:6379> zadd seta 300 c
(integer) 1
127.0.0.1:6379> zadd seta 300 d
(integer) 1

首先我们插入4条数据。

然后我们获取0到当前时间的数据。比如当前时间戳为200,那么我们执行如下命令

127.0.0.1:6379> zrangebyscore seta 0 200 limit 0 1
1) "a"
127.0.0.1:6379> zrem seta a
(integer) 1
127.0.0.1:6379> zrangebyscore seta 0 201 limit 0 1
1) "b"
127.0.0.1:6379> zrem seta b
(integer) 1
127.0.0.1:6379> zrangebyscore seta 0 202 limit 0 1
(empty list or set)

如果取到空数据,阻塞一段时间,然后继续取数据,循环执行即可。

这里我们为什么没有采用zremrangebyscore命令而是采用zrangebyscore和zrem组合,因为zremrangebyscore没有limit参数,可能取到多行数据(例如两个数据socore一样等),由于并发问题可能导致zrem返回0,这样也没事,我们继续取即可。

java代码片段:

public String getData() throws Exception {
    Jedis jedis = getResource();
    while (true) {
        Set<String> seta = jedis.zrangeByScore("seta", 0, System.currentTimeMillis(), 0, 1);
        if (seta != null && seta.size() > 0) {
            String data = seta.toArray(new String[] {})[0];
            Long res = jedis.zrem("seta", data);
            if (res > 0) {
                return data;
            }
        }
        Thread.sleep(1000L);
    }
}

来自个人博客:http://www.jflyfox.com/mtg/front/article/997.html

时间: 2024-12-22 21:59:01

用redis实现消息队列的相关文章

NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明

php+redis实现消息队列

原文地址:http://www.cnblogs.com/lisqiong/p/6039460.htmlphp+redis实现消息队列 ? 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 ?redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 1 2 3 4 5 6 7 8 9 10 11 12 13

Redis 做消息队列

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式.利用redis这两种场景的消息队列都能够实现.定义: 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息:即对于每个消息只能被最多一个消费者拥有. 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息:即正常情况下每个消费者收到的消息应该都是一样的. 那么如此多的MQ产品,为什么要使用redis作消息队列呢?以下附上一份总结了别人的一些r

为什么学习Redis作为消息队列服务器

使用Redis作为消息队列服务场景 " 消息 "是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中," 消息队列 "是在消息的传输过程中保存消息的 容器 . 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件

PHP + Redis 实现消息队列

Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理.生产者只管向队列中插入数据,不管消费者是否成功消费. 2.消费者挂掉消息不会丢失,但是需要重新触发一下消费者,才能够继续消费消息. 代码如下: lib.php 是工具文件,里面有数据库的连接.Redis的连接: <?php /** * 获取数据库连接 * *

Java利用Redis实现消息队列

应用场景 为什么要用redis?二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: 1 package Utils; 2 import java.io.*; 3 /** 4 * Created

使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,"消息队列"是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时

jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换

redis支持发布/订阅的消息队列机制,jedis提供了java访问redis的客户端,本文将描述如何用jedis实现简单的消息队列,并传输对象. redis支持发布.订阅的功能,基本的命令有publish.subscribe等.在jedis中,有对应的java方法,并且只能发布字符串消息.为了传输对象,需要将对象进行序列化,并封装成字符串进行处理.将对象序列化后,只能成为字节流,如何封装成字符串是一个难点,具体可参考下面的代码. 实现三个类,一个对应publish.一个对应subscribe.

【转】NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明