Redis 实现队列http://igeekbar.com/igeekbar/post/436.htm

场景说明:

·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时

·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求

·抢购场景,先入先出的模式

命令:

rpush + blpop 或 lpush + brpop
rpush : 往列表右侧推入数据 blpop : 客户端阻塞直到队列有值输出

简单队列:

simple.php$stmt = $pdo->prepare(‘select id, cid, name from zc_goods limit 200000‘);$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {    $redis->rPush(‘goods:task‘, json_encode($row));} $redis->close();

获取20000万个商品,并把json化后的数据推入goods:task队列

queueBlpop.php
// 出队while (true) {    // 阻塞设置超时时间为3秒    $task = $redis->blPop(array(‘goods:task‘), 3);    if ($task) {        $redis->rPush(‘goods:success:task‘, $task[1]);        $task = json_decode($task[1], true);        echo $task[‘id‘] . ‘:‘ . $task[‘cid‘] . ‘:‘ . ‘handle success‘;        echo PHP_EOL;    } else {        echo ‘nothing‘ . PHP_EOL;        sleep(5);    }}

设置blpop阻塞时间为3秒,当有数据出队时保存到goods:success:task表示执行成功,当队列没有数据时,程序睡眠10秒重新检查goods:task是否有数据出队

cli 模式执行命令:
php simple.phpphp queueBlpop.php

优先级队列

思路:

blpop 有多个键时,blpop会从左至右遍历键,一旦一个键能弹出元素,客户端立即返回。例如:

blpop key1 key2 key3 key4

从key1到key4遍历,如果哪个key有值,则弹出这个值,若多个key同时有值时,优先弹出排在左边的key。

priority.php
// 设置优先级队列$high = ‘goods:high:task‘;
$mid = ‘goods:mid:task‘;$low = ‘goods:low:task‘;
 $stmt = $pdo->prepare(‘select id, cid, name from zc_goods limit 200000‘);
$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) 
{    // cid 小于100放在低级队列    if ($row[‘cid‘] < 100) 
{        $redis->rPush($low, json_encode($row));    
}    // cid 100到600之间放在中级队列    elseif ($row[‘cid‘] > 100 && $row[‘cid‘] < 600) 
{        $redis->rPush($mid, json_encode($row));    }   
 // cid 大于600放在高级队列     else {        $redis->rPush($high, json_encode($row));    }
}$redis->close();
priorityBlop.php
// 优先级队列$high = ‘goods:high:task‘;$mid = ‘goods:mid:task‘;$low = ‘goods:low:task‘;// 出队while(true){    // 优先级高的队列放在左侧    $task = $redis->blPop(array($high, $mid, $low), 3);    if ($task) {        $task = json_decode($task[1], true);        echo $task[‘id‘] . ‘:‘ . $task[‘cid‘] . ‘:‘ . ‘handle success‘;        echo PHP_EOL;    } else {        echo ‘nothing‘ . PHP_EOL;        sleep(5);    }}

优先级高的队列放在blpop命令左侧,依次排序,blpop命令会依次弹出high, mid, low队列的值

cli 模式执行命令:
php priority.phpphp priorityBlpop.php

延迟队列

思路:

可以用一个有序集合来保存延迟任务,member保存任务内容,score保存(当前时间 + 延时时间)。用时间作为score。程序只要用有序集合的第一条任务的score和当前时间做比较,如果当前时间比score小,说明有序集合的所有任务还没到执行时间。

delay.php$stmt = $pdo->prepare(‘select id, cid, name from zc_goods limit 200000‘);$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {    $redis->zAdd(‘goods:delay:task‘, time() + rand(1, 300), json_encode($row));}

将20万条任务导入有序集合goods:delay:task,所有任务延迟到之后的1秒到300秒内执行

delayHandle.php

while (true) {// 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间    // 相对说明集合的其他任务未到执行时间

$rs = $redis->zRange(‘goods:delay:task‘, 0, 0, true);

// 集合没有任务,睡眠时间设置为5秒

    if (empty($rs)) {                        echo ‘no tasks , sleep 5 seconds‘ . PHP_EOL;sleep(5);continue;}     $taskJson = key($rs);                  $delay = $rs[$taskJson];                  $task = json_decode($taskJson, true);          $now = time();// 到时间执行延时任务        if ($delay <= $now) {       

// 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改

        if (!($identifier = acquireLock($task[‘id‘]))) {                continue;}        

// 移动延时任务到任务队列

$redis->zRem(‘goods:delay:task‘, $taskJson);        $redis->rPush(‘goods:task‘, $taskJson);       echo $task[‘id‘] . ‘ run ‘ . PHP_EOL;        

// 释放锁

releaseLock($task[‘id‘], $identifier);    }     else {       

// 延时任务未到执行时间

 $sleep = $delay - $now;       

// 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理

    $sleep = $sleep > 2 ? 2 :$sleep;           echo ‘wait ‘ . $sleep . ‘ seconds ‘ . PHP_EOL;        sleep($sleep);        }}

这个文件对有序集合内的延迟任务做处理,如果延迟任务到了执行时间,则把延迟任务移动到任务队列中

queueBlpop.php// 出队while (true) {   // 阻塞设置超时时间为3秒    $task = $redis->blPop(array(‘goods:task‘), 3);   if ($task) {        $redis->rPush(‘goods:success:task‘, $task[1]);        $task = json_decode($task[1], true);       echo $task[‘id‘] . ‘:‘ . $task[‘cid‘] . ‘:‘ . ‘handle success‘;       echo PHP_EOL;    } else {       echo ‘nothing‘ . PHP_EOL;sleep(5);        }}

处理任务队列中的任务

cli模式下执行命令:
php delay.phpphp delayHanlde.phpphp queueBlpop.php
时间: 2024-09-30 05:37:17

Redis 实现队列http://igeekbar.com/igeekbar/post/436.htm的相关文章

java redis使用之利用jedis实现redis消息队列

应用场景 对于数据库查询的IO连接数高.连接频繁的情况,可以考虑使用缓存实现. 从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现. 序列化 这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象: 主要是用到了ByteArrayOutputStream和ByteArrayInputStream: 需要注意的是每个自定义的需要序列化的对象都要实现Seria

redis缓存队列+MySQL +php任务脚本定时批量入库

原文地址:http://blog.jobbole.com/99567/ 需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试.解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接

分布式日志2 用redis的队列写日志

using System; using System.Collections.Generic; using System.Linq; using System.Web; using System.Web.Mvc; using ServiceStack.Redis; namespace 分布式日志 { public class MyExceptionFilter : HandleErrorAttribute { #region 用c#的队列 //public static Queue<Except

(转)java redis使用之利用jedis实现redis消息队列

应用场景 最近在公司做项目,需要对聊天内容进行存储,考虑到数据库查询的IO连接数高.连接频繁的因素,决定利用缓存做. 从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现. 序列化 这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象: 主要是用到了ByteArrayOutputStream和ByteArrayInputStream: 需要注意的是每个自定

GuozhongCrawler实现的基于redis的队列

GuozhongCrawler的分布式爬虫还在开发当中.作者首先爆出GuozhongCrawler实现的基于redis的队列,提供大家写其他分布式爬虫的参考. package com.guozhong.queue; import com.guozhong.request.BasicRequest; /** * 线程安全的可阻塞式队列接口 * @author 郭钟 * @QQ群 202568714 * */ public interface BlockingRequestQueue { /** *

Redis分布式队列解决文件并发的问题

1.首先将捕获的异常写到Redis的队列中 1 public class MyExceptionAttribute : HandleErrorAttribute 2 { 3 public static IRedisClientsManager clientManager = new PooledRedisClientManager(new string[] { "127.0.0.1:6379", "192.168.1.2:6379" }); 4 public sta

RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. Redis 是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库

redis消息队列简单应用

参考 https://blog.yxccan.cn/blog/detail/3 一.什么是消息队列 是一个消息的链表,是一个异步处理的数据处理引擎. PS:可以理解为在redis的list列表中存放消息数据,然后按照排队方式先进先出(左进右出:右进左出) 二.可以使用的应用场景 主要应用一些延迟或异步操作的场景比如:发送邮件.发送短信.视频转码.图片转码.日志存储.导入数据等在发送邮件或者短信,我们不希望程序一直停留,等待发送成功才相应,而是异步进行处理,即:将待发送的邮件数据添加到消息队列中,

利用redis List队列简单实现秒杀 PHP代码实现

利用redis List队列简单实现秒杀 PHP代码实现 2018年05月28日 11:37:46 m_nanle_xiaobudiu 阅读数 35674更多 分类专栏: Redis 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/m_nanle_xiaobudiu/article/details/80479666 一 生产者producer部分 ---------------------

Redis 消息队列的实现

概述 Redis实现消息队列有两种形式: 广播订阅模式:基于Redis的 Pub/Sub 机制,一旦有客户端往某个key里面 publish一个消息,所有subscribe的客户端都会触发事件 集群订阅模式:基于Redis List双向+ 原子性 + BRPOP Redis消息队列时,当Redis宕机后,消息可能会丢失(也要看持久化的策略).如果收消息方未有重发和验证机制,Redis内的数据会出现丢失.所以,使用Redis的作为消息队列,通常是对于消息的准确性并非特别高的场景. 原理 基于Red