利用redis实现带优先级的消息队列

前言

以前一直有使用celery的优先级机制(基于redis的任务队列),一直很好奇它的实现机制,在查阅了部分资料后,决定写这篇文章,作为总结。

1. 利用Sorted Set 实现

使用Sorted Set 做优先级队列最大的优点是直观明了。

ZADD key score member [[score member] [score member] ...]

score 作为优先级,member 作为相应的任务

在Sorted Set 中,score 小的,位于优先级队列的头部,即优先级较高

由于score 就是menber的优先级,因此非常直观

可以使用

MULTI
ZRANGE key 0 0 WITHSCORES
ZREMRANGEBYRANK task_list 0 0
EXEC

来获取任务队列中优先级最高的元素

ZRANGE 用于获取任务,ZREMRANGEBYRANK 用于从消息队列中移除

注意:由于Sorted Set本身是一个set,因此消息队列中的消息不能重复,否则新加入的消息会覆盖以前加入的消息

注意:对于score 相同的消息,Sorted Set 会按照字典序进行排序

2. 利用List实现

应该一下就能想到,list 是作为消息队列的最理想的选择,但这里使用list 实现带优先级的消息队列也可以有好几种不同的实现方式。

2.1 准备

首先,如果我们假定消息队列中的消息,从消息队列的右侧推入(RPUSH),从左侧取出(LPOP)

那么单个list 很容易构造成一个FIFO 队列。但是如果优先级只有两级,高和低,那么我们可以把高优先级的消息,使用LPUSH 推入队列左侧,把低优先级的消息,使用RPUSH推入到队列右侧, 这样单个list就可以实现2级的带优先级的消息队列。

2.2 使用BLPOP

redis 提供了列表的阻塞式(blocking)弹出原语。

BLPOP key [key ...] timeout

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

这样我们可以创建三个队列,high,normal, low ,分别代表高优先级,普通优先级,低优先级

BLPOP high normal low

2.3 基于多个key 的LPOP

有时候我们并不想要阻塞式的原语,那么在业务层,我们可以在多个队列中遍历,查找来获取消息

queue_list = ["high", "normal", "low"]
def get_msg():
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None

翻阅rq 的源码时,我发现rq的带优先级的任务队列正是这样实现的

2.4 扩展

如果我们需要10个优先级的消息队列,可以想到我们需要至少5个队列(参考2.1)

这时候我们的消息队列的命名可能就需要采取某种规则

比如,原打算命名的消息队列的名称为 msg_queue

那么这5个消息队列就可以被命名为

msg_queue-0

msg_queue-1

msg_queue-2

msg_queue-3

msg_queue-4

如果再结合

KEYS pattern

我们就可以得到对任意多个优先级支持的消息队列

# priority 1 ~ 10
# push message into list
def push_message(queue, priority, message):
    num = (priority - 1) / 2
    target_queue = queue + "-" + str(num)
    # direct
    if priority % 2 == 1:
        redis_db.lpush(target_queue, message)
    else:
        redis_db.rpush(target_queue, message)
# fetch  a message
def fetch_message(queue):
    queue_list = redis_db.keys(queue + "-?")
    queue_list = sorted(queue_list)
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None

注意:采用这种做法,同一优先级的消息,并不满足FIFO

时间: 2025-01-02 15:21:54

利用redis实现带优先级的消息队列的相关文章

用redis实现支持优先级的消息队列

用redis实现支持优先级的消息队列 为什么需要消息队列 系统中引入消息队列机制是对系统一个非常大的改善.例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中.你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验. 有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作.例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟.这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步

redis中list模拟案例-消息队列

redis 数据类型:字符串string.list.set.zset.hash 主要的是list消息队列 消息队列的概念:先进先出 <?php//echo phpinfo();ini_set('display_errors','On');error_reporting(E_ALL);//连接本地的 Redis 服务$redis = new Redis();$redis->connect('127.0.0.1', 6379);print_r($redis);echo "<br/&

【redis】spring boot利用redis的Keyspace Notifications实现消息通知

前言 需求:当redis中的某个key失效的时候,把失效时的value写入数据库. github: https://github.com/vergilyn/RedisSamples 1.修改redis.conf 安装的redis服务默认是: notify-keyspace-events "",修改成 notify-keyspace-events Ex; 位置:redis安装目下的redis.windows-service.conf 或 redis.windows.conf.(具体看re

用STL设计消息队列、优先级消息队列、资源分配管理器

STL库老早已经成为C++的一部分,在使用C++开发项目的过程中,很多人还在犹豫要不要使用STL库,觉得STL库很难,其实不然.我工作的项目中现在大量使用STL库,STL使用调试简单,高效,可以减少很多重复的代码. 本文的主要目的是使用STL的queue 和 priority queue来阐述下项目中经常使用的消息队列以及资源分配模式.本文的例子主要如下: 消息队列 带优先级的消息队列 资源分配管理器 STL容器 我们将使用下面的容器来实现本文的例子: queue 队列容器支持添加一个元素,并且

消息队列介绍、RabbitMQ&amp;Redis的重点介绍与简单应用

消息队列介绍.RabbitMQ.Redis 一.什么是消息队列 这个概念我们百度Google能查到一大堆文章,所以我就通俗的讲下消息队列的基本思路. 还记得原来写过Queue的文章,不管是线程queue还是进程queue他都是一种消息队列.他都是基于生产者消费者模型来处理消息. Python中的进程queue,是用于父进程与子进程,或者同属于一个父进程下的多个子进程之间进行信息交互.注意这种queue只能在同一个python程序下才能用,如果两个python程序,或者Python和别的什么程序,

[转载] 基于Redis实现分布式消息队列

转载自http://www.linuxidc.com/Linux/2015-05/117661.htm 1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.

基于Redis实现分布式消息队列(3)

1.Redis是什么鬼? Redis是一个简单的,高效的,分布式的,基于内存的缓存工具. 假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务. 简单,是Redis突出的特色. 简单可以保证核心功能的稳定和优异. 2.性能 性能方面:Redis是足够高效的. 和Memecached对比,在数据量较小大情况下,Redis性能更优秀. 数据量大到一定程度的时候,Memecached性能稍好. 简单结论:但总体上讲Redis性能已经足够好. // Ref: Redis性能测试

jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换

redis支持发布/订阅的消息队列机制,jedis提供了java访问redis的客户端,本文将描述如何用jedis实现简单的消息队列,并传输对象. redis支持发布.订阅的功能,基本的命令有publish.subscribe等.在jedis中,有对应的java方法,并且只能发布字符串消息.为了传输对象,需要将对象进行序列化,并封装成字符串进行处理.将对象序列化后,只能成为字节流,如何封装成字符串是一个难点,具体可参考下面的代码. 实现三个类,一个对应publish.一个对应subscribe.

消息队列

1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜快,厨师做菜慢. 再举个例子,到