队列扩展, 支持多个队列

<?php

/**
 * @author 魔芋红茶
 *
 */
class MsgQuery {
    // TODO - Insert your code here
    private static $KEY_CACHE_PREFIX = ‘mass.query.cache‘; // 消息缓冲key前缀
    private static $KEY_QUERY_PREFIX = ‘mass.query.lv‘; // 消息key
    private static $KEY_CACHE_DEAL_PREFIX = ‘mass.query.deal‘; // 已处理缓冲key前缀
    const SCORE_NUM = 5; // 优先级划分数目
    const MIN_SCORE = 1; // 最小优先级
    private static $MAX_SCORE;
    private static $instance = null;
    private $redis;
    private $curQuery;
    const QUERY_DEFAULT           = ‘ms.q.dft‘;
    const QUERY_ITEM_TASK            = ‘ms.q.itm‘;
    const QUERY_ITEM_SHELVES_ADD  = ‘ms.q.shelves_add‘;  // 上架
    const QUERY_ITEM_SHELVES_MOVE = ‘ms.q.shelves_move‘; // 下架
    const QUERY_ITEM_IMPORT          = ‘ms.q.import‘;          // 导入
    const QUERY_ITEM_PRICE           = ‘ms.q.price‘;          // 修改售价
    const QUERY_ITEM_INV_QTY      = ‘ms.q.inv_qty‘;      // 修改库存
    const QUERY_ITEM_POINT            = ‘ms.q.sell_point‘;      // 修改卖点
    const QUERY_ITEM_DEL           = ‘ms.q.itm.delete‘;      // 商品删除
    const QUERY_ITEM_RESTORE      = ‘ms.q.itm.restore‘;  // 商品恢复
    const QUERY_ITEM_EDIT           = ‘ms.q.itm.edit‘;      // 商品修改
    const QUERY_ITEM_ADD           = ‘ms.q.itm.add‘;      // 商品添加

    private static $querys = array (
        self::QUERY_DEFAULT,
        self::QUERY_ITEM_TASK,
        self::QUERY_ITEM_SHELVES_ADD,
        self::QUERY_ITEM_SHELVES_MOVE,
        self::QUERY_ITEM_IMPORT,
        self::QUERY_ITEM_PRICE,
        self::QUERY_ITEM_INV_QTY,
        self::QUERY_ITEM_POINT,
        self::QUERY_ITEM_DEL,
        self::QUERY_ITEM_RESTORE,
        self::QUERY_ITEM_ADD,
        self::QUERY_ITEM_EDIT
    );

    /**
     * 清理已经过期的query数据
     */
    public function clean() {
        $ystDay = date ( ‘Ymd‘, strtotime ( ‘-1 day‘ ) );
        $cacheKey = self::$KEY_CACHE_PREFIX . $ystDay;
        $cacheDealKey = self::$KEY_CACHE_DEAL_PREFIX . $ystDay;
        // 清理前尝试移动缓冲区里的残留消息到队列
        $this->moveToQuery ( $ystDay );
        // 删除过期缓冲区
        $this->redis->del ( $cacheKey );
        $this->redis->del ( $cacheDealKey );
    }

    public static function getQuerys() {
        return self::$querys;
    }

    /**
     * 将当前query切换到指定query
     *
     * @param string $query
     */
    public function selectQuery($query) {
        if (! in_array ( $query, self::$querys )) {
            $query = self::QUERY_DEFAULT;
        }
        $this->curQuery = $query;
        self::$KEY_CACHE_PREFIX = $query . ‘.cache‘;
        self::$KEY_QUERY_PREFIX = $query . ‘.lv‘;
        self::$KEY_CACHE_DEAL_PREFIX = $query . ‘.deal‘;
    }

    /**
     * 获取一个query实例
     * @param redis $redis
     * @param string $query
     * @return MsgQuery
     */
    public static function getInstance($redis, $query) {
        if (null == self::$instance) {
            self::$instance = new MsgQuery ( $redis, $query );
        }
        if (self::$instance->curQuery != $query) {
            self::$instance->selectQuery ( $query );
        }
        return self::$instance;
    }

    /**
     * 添加消息到消息缓冲区
     *
     * @param int $score
     *            优先级(1-5)
     * @param string $msg
     *            消息
     */
    public function add($score, $msg) {
        // 添加到消息缓冲
        $socre = intval ( $score );
        if ($socre < self::MIN_SCORE) {
            $score = self::MIN_SCORE;
        }
        if ($score > self::$MAX_SCORE) {
            $score = self::$MAX_SCORE;
        }
        $cacheKey = self::$KEY_CACHE_PREFIX . date ( ‘Ymd‘ );
        $cacheData = array (
                ‘score‘ => $score,
                ‘msg‘ => $msg
        );

        // 被添加到集合中的新元素的数量,不包括被忽略的元素,故重复添加返回false
        return $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
    }

    /**
     * 将消息从缓冲区移动到相应的优先级队列中
     */
    public function moveToQuery($day = null) {
        if ($day === null) {
            $day = date ( ‘Ymd‘ );
        }
        // 获取当前缓冲区没有入队列的消息
        $dealKey = self::$KEY_CACHE_DEAL_PREFIX . $day;
        $cacheKey = self::$KEY_CACHE_PREFIX . $day;
        $msgs = $this->redis->sDiff ( $cacheKey, $dealKey );
        foreach ( $msgs as $cachedData ) {
            // 放入已处理集合
            $this->redis->sAdd ( $dealKey, $cachedData );
            // 压入相应的优先级队列
            $cachedData = unserialize ( $cachedData );
            $score = $cachedData [‘score‘];
            $msg = $cachedData [‘msg‘];
            $queryKey = self::$KEY_QUERY_PREFIX . $score;
            $this->redis->rPush ( $queryKey, serialize($msg) );
        }
        unset ( $cachedData );
    }

    /**
     * 从队列阻塞式出栈一个最高优先级消息
     *
     * @return string msg
     */
    public function bPop() {
        $queryKeys = array ();
        for($score = self::$MAX_SCORE; $score >= self::MIN_SCORE; $score --) {
            $queryKeys [] = self::$KEY_QUERY_PREFIX . $score;
        }
        $msg = @$this->redis->blPop ( $queryKeys, 0 );
        return $msg [1];
    }

    private function __construct($redis, $query) {
        $this->redis = $redis;
//         $this->redis->connect ();
        self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
        $this->selectQuery ( $query );
    }

    public function __destruct() {
        if ($this->redis)     $this->redis->close ();
    }
}

?>

主体没有任何改变, 依然是2个set做缓冲, 多个list做实际队列的实现, 只是扩展为支持多个队列的实现.

其它队列使用中遇到过的问题:

1 feed程序因为长时间阻塞而断开了和redis的连接

解决方法: 加入代码

ini_set(‘default_socket_timeout‘, -1);

以上代码可以让php进程和redis通过socket长连接

2 feed程序因为长时间阻塞而断开了和mysql的连接

解决方法: 查看mysql驱动使用的是哪种, 我们项目中用的是mysqli连接, 具体驱动是mysqlnd, 如果是mysqld, 最简单的方式是先设置php变量mysqli.reconnect=1再加入ping命令

但有个问题是mysqlnd驱动下ping命令并不能自动重连, 即使改了php变量也没用, 只能显示进行重连, 如下

        if(!$this->connection->ping()){
            //mysql 连接丢失且mysqlnd不会自动重连, 手动重连
            $this->connectDB();
        }

3 重新发布/变更了feed程序可能需要重启进程加载新代码

最后附上一个队列分发和清理任务脚本, 脚本中多进程分发的代码存在问题, 会产生很多僵尸进程, 目前仅使用单进程版本, 处理时间为每天凌晨调用clean脚本清理前一天的队列, 分发程序为每5分钟执行一次, 具体执行间隔视情况调整

<?php
class ControllerMsgQuery extends Controller {

    /**
     * 处理队列缓冲区的数据, 移动到队列
     */
    public function index() {
        //多进程版本存在问题, 先使用单进程版本
        $this->dealWithSingleProcess();
    }

    private function dealWithMuitlProcess(){
        set_time_limit ( 0 );
        global $global;
        $this->load->ventor ( ‘msg_query/MsgQuery‘ );
        $querys = MsgQuery::getQuerys ();
        $redis = new Redis ();
        $redis->connect ( $global[‘redis_cache_w‘][0][‘host‘], $global[‘redis_cache_w‘][0][‘port‘] );
        foreach ( $querys as $query ) {
            // 每个query用独立子线程完成move to list的工作
            $pid = pcntl_fork ();
            if ($pid > 0) {
                // 父进程
            } else if ($pid == 0) {
                // 子进程
                $MsgQuery = MsgQuery::getInstance ( $redis, $query );
                $MsgQuery->moveToQuery ();
                exit();
            } else {
                exit();
            }
        }
        // 父进程等待子进程以释放资源
        //         if ($pid > 0){
        pcntl_wait ( $status );
        //         }
    }

    private function dealWithSingleProcess(){
        set_time_limit ( 0 );
        global $global;
        $this->load->ventor ( ‘msg_query/MsgQuery‘ );
        $querys = MsgQuery::getQuerys ();
        $redis = new Redis ();
        $redis->connect ( $global[‘redis_cache_w‘][0][‘host‘], $global[‘redis_cache_w‘][0][‘port‘] );
        foreach ( $querys as $query ) {
            $MsgQuery = MsgQuery::getInstance ( $redis, $query );
            $MsgQuery->moveToQuery ();
        }
    }

    /**
     * 清理已经过期的query数据
     */
    public function clean() {
        set_time_limit ( 0 );
        global $global;
        $this->load->ventor ( ‘msg_query/MsgQuery‘ );
        $querys = MsgQuery::getQuerys ();
        $redis = new Redis ();
        $redis->connect ( $global[‘redis_cache_w‘][0][‘host‘], $global[‘redis_cache_w‘][0][‘port‘] );
        foreach ( $querys as $query ) {
            $MsgQuery = MsgQuery::getInstance ( $redis, $query );
            $MsgQuery->clean ();
        }
    }
}
时间: 2024-11-09 04:20:39

队列扩展, 支持多个队列的相关文章

Asp.net 面向接口可扩展框架之消息队列组件

消息队列对大多数人应该比较陌生.但是要提到MQ听说过的人会多很多.MQ就是英文单词"Message queue"的缩写,翻译成中文就是消息队列(我英语差,翻译错了请告知). PS:话说国人熟悉MQ比消息队列多,是不是因为国人的外语水平高于国语水平好几个数量级 1.看一下度娘怎么解释消息队列 参考链接:消息队列_百度百科 度娘解释消息队列是在两台计算机间传输的,套句很时髦的说法就是用来做分布式传输的,是个很高大上的东西 2.我的看法稍有不同 我更追溯到“消息队列”的字面“本源”的意思.我

ELKStack-使用消息队列扩展

ELKStack-使用消息队列扩展 官方文档:https://www.elastic.co/guide/en/logstash/5.x/deploying-and-scaling.html 流程图 流程:数据源 --> logstash(input收集.output消息队列) -->  MQ  -->  logstash (input收集消息队列.filter过滤.output ES) --> ES 使用这个流程,主要是基于性能考虑,第一层logstash主要做原始数据收集,不对数

linux内核对网卡驱动多队列的支持

linux的招牌就是它强大的网络功能,稳定,高效,能随着现实的日新月异而日趋完善.众所周知,linux的网卡由结构体net_device表示,一 个该结构体对应一个可以调度的数据包发送队列,注意,这里不谈数据包接收,数据包的实体在内核中以结构体sk_buff表示,这样的话,上述文字就可以用 以下图示来表示: 所谓的网卡对发送数据包的调度指的是多个数据包共享一个网卡的规则,当然就要拥有一系列的约定,比如区分数据包的优先级,区分数据包的类型,内核根据不同的调度策略来对不同的数据包进行排队,然后按照队

消息队列属性及常见消息队列介绍

什么是消息队列?消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储,一个队列的消息可以同时被多个消息消费者消费.分布式消息服务DMS则是分布式的队列系统,消息队列中的消息分布存储,且每条消息存储多个副本,以实现高可用性,如下图所示. 一般来说,消息队列具有如下属性: 消息顺序普通队列支持"分区有序"和"全局队列"两种模式,ActiveMQ队列和Kafka队列均为分区有序. 分区有序的队列通过分布式处理,支持更高的并发,但由于队列的分布式特性,

C# Azure 消息队列ServiceBus (服务总线队列)

1. 前言 在阅读本文之前,可以查看微软官方的说明. https://www.azure.cn/documentation/articles/service-bus-create-queues/ 2. 介绍 1) service bus的队列,若当前消息被成功处理后,则这个消息会从队列中消失. 2) service bus是先进先出的队列,当取队列时,一直停留做等待,直到有消息进入. 3) Windows Azure Service Bus 提供安全且广泛可用的托管基础结构,以实现广泛通信.大范

bzoj1640[Usaco2007 Nov]Best Cow Line 队列变换*&amp;&amp;bzoj1692[Usaco2007 Dec]队列变换*

bzoj1640[Usaco2007 Nov]Best Cow Line 队列变换 bzoj1692[Usaco2007 Dec]队列变换 题意: 有一个奶牛队列.每次可以在原来队列的首端或是尾端牵出一头奶牛,把她安排到新队列的尾部,然后对剩余的奶牛队列重复以上的操作,直到所有奶牛都被插到了新的队列里.这样得到的队列,就是FJ拉去登记的最终的奶牛队列. 求对于给定的奶牛们的初始位置,计算出可能得到的字典序最小的队列.队列大小≤30000. 题解: 有一个结论:如果当前队列中的队首元素不等于队尾元

IOS多线程知识总结/队列概念/GCD/主队列/并行队列/全局队列/主队列/串行队列/同步任务/异步任务区别(附代码)

进程:正在进行中的程序被称为进程,负责程序运行的内存分配;每一个进程都有自己独立的虚拟内存空间 线程:线程是进程中一个独立的执行路径(控制单元);一个进程中至少包含一条线程,即主线程 队列 dispatch_queue_t,队列名称在调试时辅助,无论什么队列和任务,线程的创建和回收不需要程序员操作,有队列负责. 串行队列:队列中的任务只会顺序执行(类似跑步) dispatch_queue_t q = dispatch_queue_create(“....”, DISPATCH_QUEUE_SER

队列的应用:优先级队列

优先级队列:如果我们给每个元素都分配一个数字来标记其优先级,不妨设较小的数字具有较高的优先级,这样我们就可以在一个集合中访问优先级最高的元素并对其进行查找和删除操作了. 优先级队列(priority queue)是0个或多个元素的集合,每个元素都有一个优先权,对优先级队列执行的操作有(1)查找(2)插入一个新元素(3)删除 一般情况下,查找操作用来搜索优先权最大的元素,删除操作用来删除该元素 .对于优先权相同的元素,可按先进先出次序处理或按任意优先权进行. 以上是网上常见的对优先级队列的描述.偷

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把