PHP 队列的实现

队列,很简单的一个东西,但往往就是有那么多的麻烦。
  比如PHP发送邮件的时候,如果在用户注册,你是注册的时候发送邮件呢,还是注册成功之后发送呢,很显然,大多数时候都是在注册完成之后发送邮件,除非特殊情况,但是怎么让注册之后直接返回结果而不管是否发送了邮件呢。
  这里就需要这样一个东西,单独处理一个队列,一般情况有两种方式来实现,定时执行网页,还有就是使用PHP的cli模式。

  首先讨论队列的实现。使用数据库,这点很重要。比如这里我建了这样一个表。

CREATE  TABLE IF NOT EXISTS `pitus`.`queue` (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT ‘队列唯一ID‘ ,
  `time` DATETIME NOT NULL COMMENT ‘队列创建时间‘ ,
  `up_time` DATETIME NOT NULL COMMENT ‘队列更新时间‘ ,
  `status` INT NOT NULL DEFAULT 0 COMMENT ‘该队列的状态‘ ,
  `callback` LONGTEXT NOT NULL COMMENT ‘队列的回调函数‘ ,
  `param` LONGTEXT NOT NULL COMMENT ‘回调函数接受的参数‘ ,
  `library` LONGTEXT NOT NULL COMMENT ‘回调函数需要的类库‘ ,
  `message` VARCHAR(1024) NULL COMMENT ‘队列执行信息‘ ,
  PRIMARY KEY (`id`) )
ENGINE = InnoDB

  队列如何来处理呢,简单的可以分为几个方法,还是直接贴代码比较现实,分为一个队列的处理类,和一个接口。每个回调对象必须实现此接口,否则无法调用。

/**
 * 队列处理
 * Class Queue
 * @package ULib
 */
class Queue{
    /**
     * 开始执行队列
     */
    public function run(){
        if(substr(php_sapi_name(), 0, 3) === "cli"){
            //命令行模式循环等待执行
            while(true){
                $this->run_list($this->get_list());
                sleep(5);
            }
        } else{
            $this->run_list($this->get_list());
        }
    }

/**
     * 执行指定的队列列表
     * @param array $list
     */
    public function run_list($list){
        for($i = 0, $l = count($list); $i < $l; $i++){
            $v = $list[$i];
            $status = intval($v[‘status‘]);
            $message = NULL;
            try{
                //执行回调函数,如果没有返回异常则为成功执行
                $this->exec($v[‘callback‘], $v[‘param‘], $v[‘library‘]);
                $status = 1;
            } catch(\Exception $ex){
                --$status;
                $message = $ex->getMessage();
                echo "ERROR:", $message;
            }
            //更新队列信息
            db()->update("queue", [
                ‘status‘ => $status,
                ‘up_time‘ => date("Y-m-d H:i:s"),
                ‘message‘ => $message
            ], [‘id‘ => $v[‘id‘]]);
            $list[$i] = NULL;
        }

}

/**
     * 获取队列
     * @return array
     */
    private function get_list(){
        return db()->select("queue", [
            ‘id‘,
            ‘callback‘,
            ‘param‘,
            ‘library‘,
            ‘status‘
        ], [
            ‘status[<]‘ => 1,
            ‘ORDER‘ => ‘up_time DESC‘
        ]);
    }

/**
     * @param QueueCallback $call  回调类
     * @param mixed         $param 参数
     * @param string        $lib   Lib名称
     */
    public function add($call, $param, $lib){
        $time = date("Y-m-d H:i:s");
        //新将对应的数据序列化后存储到数据库中
        if(db()->insert("queue", [
                ‘time‘ => $time,
                ‘up_time‘ => $time,
                ‘callback‘ => serialize($call),
                ‘param‘ => serialize($param),
                ‘library‘ => serialize($lib)
            ]) < 0
        ){
            //添加错误记录
            $this->record_error("Add queue error on sql." . debug("SQL error:" . implode(",", db()->error()[‘write‘])));
        }
    }

/**
     * 记录错误信息
     * @param $err
     */
    private function record_error($err){

}

/**
     * 执行回调
     * @param string $callback
     * @param string $param
     * @param string $library
     * @throws \Exception
     */
    public function exec($callback, $param, $library){
        $lib = @unserialize($library);
        //首先加载反序列化所需的类库
        if(isset($lib[‘lib‘]) && is_array($lib[‘lib‘])){
            call_user_func_array([
                lib(),
                ‘load‘
            ], $lib[‘lib‘]);
        }
        if(isset($lib[‘c_lib‘]) && is_array($lib[‘c_lib‘])){
            call_user_func_array([
                c_lib(),
                ‘load‘
            ], $lib[‘c_lib‘]);
        }
        /**
         * 对回调函数反序列化
         * @var QueueCallback $call
         */
        $call = @unserialize($callback);
        if(!is_object($call)){
            //初步判断是否为对象
            throw new \Exception("unserialize error");
        }
        $ref = new \ReflectionClass($call);
        if(!in_array("ULib\\QueueCallback", $ref->getInterfaceNames())){
            //检测是否为正确的实现了接口
            throw new \Exception("callback class error.");
        }
        //最后执行,并使用对应的参数
        @$call->run(@unserialize($param));
    }
}

/**
 * 队列的接口
 * Interface QueueCallback
 * @package ULib
 */
interface QueueCallback{
    /**
     * 执行回调函数
     * @param $param
     * @return mixed
     */
    public function run($param);
}

这样使用回调对象的一个好处就是可以使队列处理的内容扩大,而不仅仅限于邮件的处理,还比如一些其他耗时的操作,当然这里也可以更改为多线程处理队列,如果你需要的话。
  最后就是如何实现队列的处理,必须有一个前提就是,同一个队列不能同时有多个线程去处理。这里需要用到一个其他的东西,文件锁,这个实现起来相对容易,而已跨平台性好。如果使用信号那么windows下就不行,代码如下。某些时候一个比较特殊的操作可能也遇得到吧。

<?php
$lock_file = __DIR__ . "/config/queue.lock";
$fp = fopen($lock_file, ‘w‘);//写模式打开,文件不存在直接创建
if(!flock($fp, LOCK_EX | LOCK_NB)){
    //如果当前文件无法锁定,表示被其他进程锁定,所以结束执行
    //LOCK_EX为独享锁,LOCK_NB为非阻塞
    fclose($fp);
    die("Queue must be a single run.\n");
} else{
    echo "LOCK\n";
}
set_time_limit(0);
require_once("sys/config.php");
cfg()->load(‘config/all.php‘); //加载其他配置文件
lib()->load(‘Queue‘, ‘Hook‘);
$hook = new \ULib\Hook();
if(db()->status()){
    $queue = new \ULib\Queue();
    $queue->run();
} else{
    echo("Cannot connect to the database.");
}
flock($fp, LOCK_UN);
fclose($fp);
?>

PHP 队列的实现,布布扣,bubuko.com

时间: 2024-08-05 12:47:41

PHP 队列的实现的相关文章

redis 学习 四 队列

<?php /** * redis实战 * * 利用列表list实现简单队列 * * @example php cache.php */ header('content-type:text/html;chaeset=utf-8'); $redis = new \Redis(); $redis->connect('127.0.0.1', 6379); // 进队列 $userId = mt_rand(000000, 999999); $redis->rpush('QUEUE_NAME',j

构建队列,数组版本

队列作为基本的数据结构,是每个coder所必须掌握的. 队列在逻辑上就像是一条打饭的长队,排在前面的先打到饭(先进先出). 这里用一个数组用以构造一个队列,并设置两个指向,head指向队首,tail指向队尾,初始状态是head与tail指向同一位置(队列为空) 队列有两个操作:入队与出队. 1.入队:对比打饭排队的场景,新来的人排在后面,这是队尾tail需向后移一位. 2.出队:已经打好饭的人就可以出去了,这时队头也需向后移一位,让后面的人成为队头. 注意: 当head与tail都移到数组末端,

链队列代码及应用

链队列代码 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <time.h> #define OK 1 #define ERROR 0 #define OVERFLOW -2 #define TRUE 1 #define FALSE 0 typedef int Status; typedef int ElemType; typedef struct Qnode{ int

caffe数据读取的双阻塞队列说明

caffe的datareader类中 class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); BlockingQueue<T*> free_; BlockingQueue<T*> full_; DISABLE_COPY_AND_ASSIGN(QueuePair); }; 这个就是双阻塞队列,先将free队列填充到最大长度,然后按照如下规则: 1,每当生产者push时,先将full队列pop,如果fu

线性结构的常见应用之一 队列

定义:一种可以实现"先进先出"的存储结构 分类 链式队列 --  用链表实现 静态队列 --  用数组实现 静态队列通常都必须是循环队列 循环队列的讲解: 1.静态队列为什么必须是循环队列 2.循环队列需要几个参数来确定   需要两个参数来进行确定:front   rear 3.循环队列各个参数的含义 2个参数在不同的场合有不同的含义 建议初学者先记住,后面再想 1).队列初始化 front 和 rear 的值都是零 2).队列非空 front 代表的是队列的第一个元素 rear 代表

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

MQ队列管理器搭建(一)

多应用单MQ使用场景 如上图所示,MQ独立安装,或者与其中一个应用同处一机.Application1与Application2要进行通信,但因为跨系统,所以引入中间件来实现需求. Application1需要连接MQ,并将消息放入队列Queue中,Application2同样连接MQ,监听在Queue队列上,一旦发现有消息进入则取出该消息进行处理. 下面将给出创建队列管理器和队列的示例: 定义队列管理器名称为Qm1,本地队列名称为Queue,服务器连接通道CHAN_SERVER_CON,监听端口

PHP电商订单自动确认收货redis队列

一.场景 之前做的电商平台,用户在收到货之后,大部分都不会主动的点击确认收货,导致给商家结款的时候,商家各种投诉,于是就根据需求,要做一个订单在发货之后的x天自动确认收货.所谓的订单自动确认收货,就是在在特定的时间,执行一条update语句,改变订单的状态. 二.思路 最笨重的做法,通过linux后台定时任务,查询符合条件的订单,然后update.最理想情况下,如果每分钟都有需要update的订单,这种方式也还行.奈何平台太小,以及卖家发货时间大部分也是密集的,不会分散在24小时的每分钟.那么,

快速入门系列--WCF--06并发限流、可靠会话和队列服务

这部分将介绍一些相对深入的知识点,包括通过并发限流来保证服务的可用性,通过可靠会话机制保证会话信息的可靠性,通过队列服务来解耦客户端和服务端,提高系统的可服务数量并可以起到削峰的作用,最后还会对之前的事务知识做一定补充. 对于WCF服务来说,其寄宿在一个资源有限的环境中,为了实现服务性能最大化,需要提高其吞吐量即服务的并发性.然而在不进行流量控制的情况下,并发量过多,会使整个服务由于资源耗尽而崩溃.因此为相对平衡的并发数和系统可用性,需要设计一个闸门(Throttling)控制并发的数量. 由于

Python3-queue模块-同步队列

Python3中的queue模块实现多生产者,多消费者队列,特别适用于多个线程间的信息的安全交换,主要有三个类 queue.Queue(maxsize=0) 构造一个FIFO(先进先出)的队列 queue.LifoQueue(maxsize=0) 构造一个LIFO(后进先出)的队列 queue.PriorityQueue(maxsize=0) 构造一个具有优先级的队列,存储的是一个元组(n, value),n为数字代表优先级,数字越小,级别越高 这个模块定义了两个异常 queue.Empty 如