thinkphp5 消息队列thinkphp-queue扩展

1.简介

thinkphp-queue是thinkphp的一个第三方扩展, 内置了 Redis,Database,Topthink ,Sync这四种驱动,推荐使用redis

2. 下载 和安装

composer require topthink/think-queue

配置目录在: application/extra/queue.php

return [
    ‘connector‘  => ‘Redis‘,            // Redis 驱动
    ‘expire‘     => 60,                // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    ‘default‘    => ‘default‘,        // 默认的队列名称
    ‘host‘       => ‘127.0.0.1‘,        // redis 主机ip
    ‘port‘       => 6379,            // redis 端口
    ‘password‘   => ‘‘,                // redis 密码
    ‘select‘     => 0,                // 使用哪一个 db,默认为 db0
    ‘timeout‘    => 0,                // redis连接的超时时间
    ‘persistent‘ => false,            // 是否是长连接

    //    ‘connector‘ => ‘Database‘,   // 数据库驱动
    //    ‘expire‘    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    //    ‘default‘   => ‘default‘,    // 默认的队列名称
    //    ‘table‘     => ‘jobs‘,       // 存储消息的表名,不带前缀
    //    ‘dsn‘       => [],

    //    ‘connector‘   => ‘Topthink‘,    // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
    //    ‘token‘       => ‘‘,
    //    ‘project_id‘  => ‘‘,
    //    ‘protocol‘    => ‘https‘,
    //    ‘host‘        => ‘qns.topthink.com‘,
    //    ‘port‘        => 443,
    //    ‘api_version‘ => 1,
    //    ‘max_retries‘ => 3,
    //    ‘default‘     => ‘default‘,

//        ‘connector‘   => ‘Sync‘,        // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
];

3.入队,创建队列的代码

/**
* 文件路径: \application\index\controller\JobTest.php
* 该控制器的业务代码中借助了thinkphp-queue 库,将一个消息推送到消息队列
*/
namespace application\index\controller;
  use think\Exception;

  use think\Queue;

  class JobTest {
  /**
   * 一个使用了队列的 action
   */
  public function actionWithHelloJob(){

      // 1.当前任务将由哪个类来负责处理。
      //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
      $jobHandlerClassName  = ‘application\index\job\Hello‘;
      // 2.当前任务归属的队列名称,如果为新队列,会自动创建
      $jobQueueName        = "helloJobQueue";
      // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
      //   ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
      $jobData             = [ ‘ts‘ => time(), ‘bizId‘ => uniqid() , ‘a‘ => 1 ] ;
      // 4.将该任务推送到消息队列,等待对应的消费者去执行
      $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
      // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
      if( $isPushed !== false ){
          echo date(‘Y-m-d H:i:s‘) . " a new Hello Job is Pushed to the MQ"."<br>";
      }else{
          echo ‘Oops, something went wrong.‘;
      }
  }
 }

如果是多个任务:如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\[email protected]app\lib\job\[email protected]

4.消费队列的代码:

<?php

namespace app\test\job;

use think\queue\Job;

class Hello {

    /**
     * fire方法是消息队列默认调用的方法
     * @param Job            $job      当前的任务对象
     * @param array|mixed    $data     发布任务时自定义的数据
     */
    public function fire(Job $job,$data){
        // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
            print("<info>Hello Job has been done and deleted"."</info>\n");
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
                $job->delete();
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function doHelloJob($data) {
        // 根据消息中的数据进行实际的业务处理...
        var_dump($data);
//        print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
//        print("<info>Hello Job is Fired at " . date(‘Y-m-d H:i:s‘) ."</info> \n");
//        print("<info>Hello Job is Done!"."</info> \n");

        return true;
    }
}

执行代码:

php think queue:work --queue helloJobQueue

5.总结:

5.1 命名:

  • queue:subscribe 命令 [截至2017-02-15,作者暂未实现该模式,略过]
  • queue:work 命令

    work 命令: 该命令将启动一个 work 进程来处理消息队列。

    php think queue:work --queue helloJobQueue
  • queue:listen 命令

    listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:listen --queue helloJobQueue

    queue:restart 重新开启

    参考资料:https://blog.csdn.net/will5451/article/details/80434174

    https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

    https://github.com/top-think/think-queue

原文地址:https://www.cnblogs.com/myvic/p/9373710.html

时间: 2024-10-13 09:16:37

thinkphp5 消息队列thinkphp-queue扩展的相关文章

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

消息队列 神马是消息队列,看看某度的原话“在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量”. 其实消息队列还可以用于解耦,在多层项目模型或中型项目以上,都会用到消息队列,减少层与层之间的耦合:还可以做跨进程间的通讯(传输率显然比不上RPC). 上一节说道最终需要采用消息队列来进行分离前级和后级,并且采用异步方式,用于提高业务服务器的吞吐率,不过,虽然分离了,如果后级服务器的处理能力达不到请求数或接近平

php消息队列之 think queue消息队列初体验

使用thinkphp 5的  消息队列 think queue ● php think queue:listen --queue queuename ● php think queue:work --daemon --queue xwyqueue 使用这两个命令进行消息队列的监控,在整个Linux操作界面关闭以后,发现就无法运行了. 原因就是这个进程没有常驻在系统后台.那么就需要用到liunx操作系统的 supervisor 来保证进程常驻 在百度搜索 supervisor 的安装 使用 然后配

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

原文:[c#]队列(Queue)和MSMQ(消息队列)的基础使用 首先我们知道队列是先进先出的机制,所以在处理并发是个不错的选择.然后就写两个队列的简单应用. Queue 命名空间 命名空间:System.Collections,不在这里做过多的理论解释,这个东西非常的好理解. 可以看下官方文档:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2 示例代码 我这里

消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka). 四.JMS消息服务 讲消息队列就不得不提JMS .JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 在EJB架构中,有消息bean可以无缝的与JM消息服务集成.在J2EE架构模

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异

大型网站架构系列:消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

大型网站架构系列:消息队列(二) (转)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

大型网站架构之分布式消息队列

以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统). 本次分享大纲 消息队列概述 消息队列应用场景 消息中间件示例 JMS消息服务 常用消息队列 参考(推荐)资料 本次分享总结 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,K

初识中间件之消息队列

初识中间件之消息队列 测试那点事儿 测试那点事儿 初识中间件之消息队列 1 消息队列概述 消息队列是分布式系统中的重要组件,主要解决应用耦合,异步消息,流量削锋等问题,以实现高性能,高可用,可伸缩和最终一致性架构,是大型分布式系统中不可缺少的中间件. 目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,比如我之前用过的RabbitMQ以及kafka. 2 消息队列应用场景 在实际应用中,消息队列常用于异步处理.应用解耦.流量削锋