带优先级的队列 - PHP实现

很久以前写的一个功能,当时需要一个优先级的队列,特用新学的swoole写了一个简单的demo,仅满足当时的需求。

功能说明:

    完全参考httpsqs增加优先级参数level

例:
            http://192.168.8.18:5555/?name=t ... a=testdata1&level=1
            http://192.168.8.18:5555/?name=t ... a=testdata2&level=1
            http://192.168.8.18:5555/?name=t ... a=testdata3&level=3
           此时,循环调用get方法,将获取数据testdata3,testdata1,testdata2
           注:get时先弹出优先级高的数据;同等优先级数据,按put的先后弹出;

用到的工具:

PHP PHP的swoole扩展

思路:

    使用swoole扩展监听端口,接收用户传入的参数;用全局变量$info存储所有队列名、每个队列的状态信息(所有优先级、每个优先级中已读数据个数,未读数据个数)等;设置定时器,定时将info写入文件;实际数据存入文件,每个数据占一行,使用fgets按行读出;全局变量$fp保存所有打开的文件句柄,减少文件的打开次数;

优势:
        使用世界上最好的语言PHP开发!

性能测试:
1:写性能:

ab -c 100 -n 10000 http://192.168.8.18:5555/?name=test\&opt=put\&data=testdata\&level=3

  

2:读性能

ab -c 100 -n 10000 http://192.168.8.18:5555/?name=test\&opt=get

附:基础代码

<?php
//带优化级的http队列服务
error_reporting(E_ERROR | E_WARNING | E_PARSE);
date_default_timezone_set(‘PRC‘);
$dirname = "./data/queue_"; //文件存储位置及文件名
$infodirname = "./data/info"; //存储基本信息的文件
$maxfilesize = 1024 * 1024 * 1024;//文件最大1G
$fp = array(); //打开文件的句柄
$info = array(); //存储各个队列的基本信息,get/put的位置
$infoflag = 0; //标记info是否被修改过
$tmp = file_get_contents($infodirname);//初始化,从文件中载入基本信息
!empty($tmp) && $info = unserialize($tmp);

$http = new swoole_http_server("0.0.0.0", 5555);
$http->set(array(
    ‘worker_num‘ => 1,   //工作进程数量
));
$http->on(‘WorkerStart‘, function($http) {
    $http->addtimer(10000);//增加定时器
});
$http->on(‘Timer‘, ‘cronWriteInfo‘); //定时将基本信息写入文件

$http->on(‘request‘, "mycallback");
$http->start();

function cronWriteInfo() { //定时写文件
    global $info,$infoflag,$infodirname;
    if($infoflag) { //info被修改过时,才将之写入文件
        file_put_contents($infodirname, serialize($info));
    }
}
function mycallback($request, $response) {
    global $info, $infoflag;
    if($request->server[‘request_uri‘] == "/favicon.ico") { //过滤掉浏览器的icon请求
        $response->end(" \n");
        return ‘‘;
    }
    $infoflag = 1;
    $param = $request->get;
    if(empty($param[‘name‘])) {//队列名字不能为空
        $ret = "QUEUE_ERROR_QUEUENAME_EMPTY";
    } else {
        switch ($param[‘opt‘]) {
            case ‘get‘:
                $ret = get($param[‘name‘]);
                break;
            case ‘put‘:
                if(($data = $param[‘data‘]) && !empty($param[‘data‘])) {
                    empty($param[‘level‘]) && $param[‘level‘] = 0;
                    $ret = put($param[‘name‘], $param[‘data‘], $param[‘level‘]);
                } else {
                    $ret = "QUEUE_PUT_ERROR";
                }
                break;
            case ‘status‘:
                $ret = json_encode($info);
                break;
            default:
                $ret = "QUEUE_ERROR_PARAM";
                break;
        }
    }
    $response->end($ret."\n");
}
function get($queuename) {
    global $info;
    if($info[$queuename][‘unread‘] > 0) {
        $maxlevel = $info[$queuename][‘curMaxlevel‘]; //当前最大优化级
        $pos = $info[$queuename][‘leveldata‘][$maxlevel][‘getpos‘]; //文件偏移量
        $data =  getdataFromFile($queuename, $maxlevel, $pos); //读取数据
        $ret = pack("H*", $data); //data是压缩的数据,需要解压
        //修改全局变量
        $info[$queuename][‘get‘]++;
        $info[$queuename][‘unread‘]--;
        $info[$queuename][‘leveldata‘][$maxlevel][‘unread‘]--;
        if($ret == "QUEUE_END") { //读到的数据不全,原因:数据文件大小超过最大值设置;
            $data = getdataFromFile($queuename, $maxlevel, 0); //从头再读一次
            $ret = pack("H*", $data); //data是压缩的数据,需要解压
            $info[$queuename][‘leveldata‘][$maxlevel][‘get‘] = 1;//读取位置设为1
            $info[$queuename][‘leveldata‘][$maxlevel][‘getpos‘] = strlen($data) + 1;
        } else {
            $info[$queuename][‘leveldata‘][$maxlevel][‘get‘]++;
            $info[$queuename][‘leveldata‘][$maxlevel][‘getpos‘] += strlen($data) + 1 ;
        }
        if($info[$queuename][‘leveldata‘][$maxlevel][‘get‘] == $info[$queuename][‘leveldata‘][$maxlevel][‘put‘]) { //数据读取完毕,重新计算当前最高优先级的数据
            $info[$queuename][‘curMaxlevel‘] = getcurMaxlevel($info[$queuename][‘leveldata‘]);
        }
        return $ret;
    } else {
        return ‘QUEUE_EMPTY‘;
    }
}
function put($queuename, $userdata, $level) {
    global $info;
    if($info[$queuename][‘leveldata‘][$level][‘put‘]+1 == $info[$queuename][‘leveldata‘][$level][‘get‘]) { //队列写满
        return "QUEUE_FULL";
    }
    $info[$queuename][‘put‘]++;
    $info[$queuename][‘unread‘]++;
    $newpos = setDataToFile($queuename, $level, $userdata);
    $info[$queuename][‘leveldata‘][$level][‘put‘]++;
    $info[$queuename][‘leveldata‘][$level][‘unread‘]++;
    $info[$queuename][‘leveldata‘][$level][‘putpos‘] = $newpos;
    $info[$queuename][‘curMaxlevel‘] = getcurMaxlevel($info[$queuename][‘leveldata‘]);
    return "QUEUE_PUT_OK";
}

//获取当前需要执行的优先级最高的数据
function getcurMaxlevel($leveldata) {
    krsort($leveldata);
    foreach($leveldata as $level => $info) {
        if($info[‘unread‘] > 0) {
            return $level;
        }
    }
    return 0;
}

//写数据到文件,如果文件写满,循环写入;
function setDataToFile($queuename, $level, $userdata) {
    global $dirname, $maxfilesize, $fp;
    $file = $dirname.$queuename.$level;
    $packdata = unpack("H*", $userdata);
    //$$packdata[‘1‘] = $userdata;
    $data = $packdata[‘1‘]."\n";
    $size = filesize($file);
    if(!$fp[$queuename][$level]) {
        $fp = fopen($file, "ab+");
    }
    fseek($fp, $size);
    fwrite($fp, $data);
    clearstatcache();
    if($size + strlen($data) + 1 > $maxfilesize) { //超出文件最大值,进入下一轮
        fseek($fp, $size); //倒回指针
        fwrite($fp, "QUEUE_END"); //写入标记符号
        fseek($fp,0);//重头写
        fwrite($fp, $data);
        return strlen($data);
    } else {
        return $size + strlen($data);
    }
}
//从文件中读取数据
function getdataFromFile($queuename, $level, $pos) {
    global $dirname,$fp;
    if(!isset($fp[$queuename][$level])) {
        $file = $dirname.$queuename.$level;
        $fp = fopen($file, "ab+");
    }
    fseek($fp, intval($pos));
    return trim(fgets($fp));
}
时间: 2024-10-08 13:53:41

带优先级的队列 - PHP实现的相关文章

利用redis实现带优先级的消息队列

前言 以前一直有使用celery的优先级机制(基于redis的任务队列),一直很好奇它的实现机制,在查阅了部分资料后,决定写这篇文章,作为总结. 1. 利用Sorted Set 实现 使用Sorted Set 做优先级队列最大的优点是直观明了. ZADD key score member [[score member] [score member] ...] score 作为优先级,member 作为相应的任务 在Sorted Set 中,score 小的,位于优先级队列的头部,即优先级较高 由

用STL设计消息队列、优先级消息队列、资源分配管理器

STL库老早已经成为C++的一部分,在使用C++开发项目的过程中,很多人还在犹豫要不要使用STL库,觉得STL库很难,其实不然.我工作的项目中现在大量使用STL库,STL使用调试简单,高效,可以减少很多重复的代码. 本文的主要目的是使用STL的queue 和 priority queue来阐述下项目中经常使用的消息队列以及资源分配模式.本文的例子主要如下: 消息队列 带优先级的消息队列 资源分配管理器 STL容器 我们将使用下面的容器来实现本文的例子: queue 队列容器支持添加一个元素,并且

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

指定作业提交的优先级和队列

hadoop jar /home/ochadoop/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.3.0-cdh5.0.2.jar pi -Dmapreduce.job.queuename=ochadoop 50 100 作业提交到的队列:mapreduce.job.queuename 作业优先级:mapreduce.job.priority Pig版本: SET mapreduce.job.queuename roo

poj 2828 buy Tickets 用线段树模拟带插入的队列

Buy Tickets Time Limit: 1 Sec  Memory Limit: 256 MB 题目连接 http://acm.hdu.edu.cn/showproblem.php?pid=2795 Description Railway tickets were difficult to buy around the Lunar New Year in China, so we must get up early and join a long queue… The Lunar New

[py]py2自带Queue模块实现了3类队列

py2自带Queue实现了3类队列 先搞清楚几个单词 Queue模块实现了三类队列: FIFO(First In First Out,先进先出,默认为该队列), 我们平时泛指的队列, LIFO(Last In First Out,后进先出) 基于优先级的队列.以下为其常用方法: 队列一般有 push pop size empty等 先进先出 q = Queue.Queue(maxsize) 后进先出 a = Queue.LifoQueue(maxsize) 优先级 Queue.PriorityQ

Java/Android中的优先级任务队列的实践

版权声明:转载必须注明本文转自严振杰的博客:http://blog.yanzhenjie.com 刚刚把公司的活干完,去群里水,有几个小伙伴问我怎么实现队列,于是乎我来写一篇吧.本篇文章适用于Java和Android开发者,会从实现一个最简单的队列过渡到实现一个带有优先级的队列,保准你可以掌握基本的队列原理. 队列的基本理解 用生活中的一个情景来举个栗子,前段时间很火爆的电视剧<人民的名义>中有一个丁义珍式的窗口大家应该都知道了,我们不说<人民的名义>也不说丁义珍,我们来说说这个办

java中使用队列:java.util.Queue (转)

Queue接口与List.Set同一级别,都是继承了Collection接口.LinkedList实现了Queue接 口.Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时,就完全只能访问Queue接口所定义的方法 了,而不能直接访问 LinkedList的非Queue的方法),以使得只有恰当的方法才可以使用.BlockingQueue 继承了Queue接口. 队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就

java中队列Queue的使用

1.在java5中新增加了java.util.Queue接口,用以支持队列的常见操作.Queue接口与List.Set同一级别,都是继承了Collection接口.Queue使用时要尽量避免Collection的add()和remove()方法,而是要使用offer()来加入元素,使用poll()来获取并移出元素.它们的优点是通过返回值可以判断成功与否,add()和remove()方法在失败的时候会抛出异常. 如果要使用前端而不移出该元素,使用element()或者peek()方法. 2.值得注