[原创]Laravel 基于redis队列的解析

目录

  • 参考链接
  • 本文环境
  • 为什么使用队列
  • Laravel 中的队列
    • 分发任务
    • 任务队列 Worker

Last-Modified: 2019年5月10日11:44:18

参考链接

本文环境

  • Laravel 5.5
  • 队列 Redis

为什么使用队列

使用队列的目的一般是:

  1. 异步执行
  2. 出错重试

解释一下:

异步执行: 部分代码执行很耗时, 为了提高响应速度及避免占用过多连接资源, 可以将这部分代码放到队列中异步执行.

Eg. 网站新用户注册后, 需要发送欢迎的邮件, 涉及到网络IO无法控制耗时的这一类就很适合放到队列中来执行.

出错重试: 为了保证一些任务的正常执行, 可以将任务放到队列中执行, 若执行出错则可以延迟一段时间后重试, 直到任务处理成功或出错超过N次后取消执行.

Eg. 用户需要绑定手机号, 此时发送短信的接口是依赖第三方, 一个是不确定耗时, 一个是不确定调用的成功, 为了保证调用成功, 必然需要在出错后重试

Laravel 中的队列

以下分析默认使用的队列及其配置如下

  • 默认队列引擎: redis

通过在 redis-cli 中使用 monitor 命令查看具体执行的命令语句

  • 默认队列名: default

分发任务

此处以分发 异步通知(class XxxNotification implement ShouldQueue)为例.

在Laravel中发起异步通知时, Laravel 会往redis中的任务队列添加一条新任务

redis 执行语句

redis> RPUSH queues:default

{
    "displayName": "App\\Listeners\\RebateEventListener",
    "job": "Illuminate\\Queue\\[email protected]",
    "maxTries": null,
    "timeout": null,
    "timeoutAt": null,
    "data": {
        "commandName": "Illuminate\\Events\\CallQueuedListener",
        "command": "O:36:\"Illuminate\\Events\\CallQueuedListener\":7:{s:5:\"class\";s:33:\"App\\Listeners\\RebateEventListener\";s:6:\"method\";s:15:\"onRebateCreated\";s:4:\"data\";a:1:{i:0;O:29:\"App\\Events\\RebateCreatedEvent\":4:{s:11:\"\u0000*\u0000tbkOrder\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":3:{s:5:\"class\";s:19:\"App\\Models\\TbkOrder\";s:2:\"id\";i:416;s:10:\"connection\";s:5:\"mysql\";}s:15:\"\u0000*\u0000notifyAdmins\";b:1;s:13:\"\u0000*\u0000manualBind\";b:0;s:6:\"socket\";N;}}s:5:\"tries\";N;s:9:\"timeoutAt\";N;s:7:\"timeout\";N;s:6:\"\u0000*\u0000job\";N;}"
    },
    "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",
    "attempts": 0
}

上面的redis语句是将任务信息(json格式) rpush 到 redis 队列 queues:default 的尾部.

任务队列 Worker

Laravel 处理任务队列的进程开启方式: php artisan queue:work, 为了更好的观察, 这里使用 --once 选项来指定队列中的单一任务进行处理, 具体的更多参数请自行参考文档

php artisan queue:work --once --delay=1 --tries=3

上述执行语句参数含义:

  1. --once 仅执行一次任务, 默认是常驻进程一直执行
  2. --tries=3 任务出错最多重试3次, 默认是无限制重试
  3. --delay=1 任务出错后, 每次延迟1秒后再次执行, 默认是延迟0秒

当 Worker 启动时, 它依次执行如下步骤:

此处仍以默认队列 default 为例讲解, 且只讲解redis的相关操作

  1. queues:default:delayed 有序集合中获取可以处理的 "延迟任务", 并 rpushqueue:default队列的尾部

    具体的执行语句:

    redis> eval "Lua脚本" 2 queues:default:delayed queues:default 当前时间戳

    Lua 脚本内容如下:

    -- Get all of the jobs with an expired \"score\"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
    
    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto thedestination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
        redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
    
        for i = 1, #val, 100 do
            redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
        end
    end
    
    return val 
  2. queue:default:reserved有序集合中获取已过期的 "reserved 任务", 并 rpushqueue:default队列的尾部

    具体的执行语句:

    redis> eval "Lua脚本" 2 queues:default:reserved queues:default 当前时间戳

    使用的Lua脚本同步骤 1

  3. queue:default 队列中获取(lpop)一个任务, 增加其 attempts 次数, 并将该任务保存到 queu:default:reserved 有序集合中, 该任务的 score 值为 当前时间 + 90(任务执行超时时间)

    具体的执行语句:

    redis> eval “Lua脚本” 2 queues:default queues:default:reserved 任务超时时间戳

    Lua脚本

    -- Pop the first job off of the queue...
    local job = redis.call('lpop', KEYS[1])
    local reserved = false
    
    if(job ~= false) then
        -- Increment the attempt count and place job on the reserved queue...
        reserved = cjson.decode(job)
        reserved['attempts'] = reserved['attempts'] + 1
        reserved = cjson.encode(reserved)
        redis.call('zadd', KEYS[2], ARGV[1], reserved)
    end
    
    return {job, reserved}

    这里的 90 是根据配置而定: config(‘queue.connections.redis.retry_after‘)

    若预计任务耗时过久, 则应增加该数值, 防止任务还在执行时就被重置

  4. 在成功执行上面获取的任务后, 就将该任务从 queues:default:reserved 队列中移除掉

    具体执行语句: ZREM queues:default:reserved "具体任务"

  5. 如果执行任务失败, 此时分为2种情况:
    1. 任务失败次数未达到指定的重试次数阀值

      将该任务从 queues:default:reserved 中移除, 并将该任务添加到 queue:default:delayed 有序集合中, score 为该任务下一次执行的时间戳

      执行语句:

      redis> EVAL "Lua脚本" 2 queues:default:delayed queues:default:reserved "失败的任务" 任务延迟执行的时间戳

      Lua脚本

      -- Remove the job from the current queue...
      redis.call('zrem', KEYS[2], ARGV[1])
      
      -- Add the job onto the \"delayed\" queue...
      redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
      
      return true
    2. 如果任务失败次数超过指定的重试阀值

      将该任务从 queue:default:reserved 中移除

      执行语句:

      redis> ZREM queue:default:reserved

注意, 上述使用 Lua 脚本的目的在于操作的原子性, Redis 是单进程单线程模式, 以Lua脚本形式执行命令时可以确保执行脚本的原子性, 而不会有并发问题.

原文地址:https://www.cnblogs.com/youjiaxing/p/10843674.html

时间: 2024-10-07 23:30:07

[原创]Laravel 基于redis队列的解析的相关文章

laravel中redis队列的使用

一.配置文件 首先我们需要在配置文件中配置默认队列驱动为Redis,队列配置文件是config/queue.php: return [ 'default' => env('QUEUE_DRIVER', 'sync'), 'connections' => [ 'sync' => [ 'driver' => 'sync', ], 'database' => [ 'driver' => 'database', 'table' => 'jobs', 'queue' =&g

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

项目分布式部署那些事(1):ONS消息队列、基于Redis的Session共享,开源共享

因业务发展需要现在的系统不足以支撑现在的用户量,于是我们在一周之前着手项目的性能优化与分布式部署的相关动作. 概况 现在的系统是基于RabbitHub(一套开源的开发时框架)和Rabbit.WeiXin(开源的微信开发SDK)开发的一款微信应用类系统,主要业务是围绕当下流行的微信元素,如:微官网.微商城.微分销.营销活动.会员卡等. 关于RabbitHub详情请戳: .NET 平台下的插件化开发内核(Rabbit Kernel) RabbitHub开源情况及计划 关于Rabbit.WeiXin详

GuozhongCrawler实现的基于redis的队列

GuozhongCrawler的分布式爬虫还在开发当中.作者首先爆出GuozhongCrawler实现的基于redis的队列,提供大家写其他分布式爬虫的参考. package com.guozhong.queue; import com.guozhong.request.BasicRequest; /** * 线程安全的可阻塞式队列接口 * @author 郭钟 * @QQ群 202568714 * */ public interface BlockingRequestQueue { /** *

[转载] 基于Redis实现分布式消息队列

转载自http://www.linuxidc.com/Linux/2015-05/117661.htm 1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.

灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. Redisson Delayed Queue 如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单. 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能.该功能可以用来实现消息传送延迟按几何增长或几何衰减的发

Java-No.14 基于redis分布式缓存队列实现抢红包功能

package com.shma.util; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class ObjectUtil {      /**对象转byte[]      * @par

一个基于redis和disque实现的轻量级异步任务执行器

简介 horae是一个基于redis和disque实现的轻量级.高性能的异步任务执行器,它的核心是disque提供的任务队列,而队列有先进先出的时序关系,顾得名:horae. horae : 时序女神,希腊神话中司掌季节时间和人间秩序的三女神,又译"荷莱". horae的关注点不是队列服务的实现本身(已经有不少队列服务的实现了),而是希望借助于redis与disque提供的纯内存的高性能的队列机制,实现一个异步任务执行器.它可以自由配置任务来自哪种队列服务,它不关注任务执行的最终状态(