很久以前写的一个功能,当时需要一个优先级的队列,特用新学的swoole写了一个简单的demo,仅满足当时的需求。
功能说明:
- 完全参考httpsqs增加优先级参数level
例:
http://192.168.8.18:5555/?name=t ... a=testdata1&level=1
http://192.168.8.18:5555/?name=t ... a=testdata2&level=1
http://192.168.8.18:5555/?name=t ... a=testdata3&level=3
此时,循环调用get方法,将获取数据testdata3,testdata1,testdata2
注:get时先弹出优先级高的数据;同等优先级数据,按put的先后弹出;
用到的工具:
PHP PHP的swoole扩展
思路:
- 使用swoole扩展监听端口,接收用户传入的参数;用全局变量$info存储所有队列名、每个队列的状态信息(所有优先级、每个优先级中已读数据个数,未读数据个数)等;设置定时器,定时将info写入文件;实际数据存入文件,每个数据占一行,使用fgets按行读出;全局变量$fp保存所有打开的文件句柄,减少文件的打开次数;
优势:
使用世界上最好的语言PHP开发!
性能测试:
1:写性能:
ab -c 100 -n 10000 http://192.168.8.18:5555/?name=test\&opt=put\&data=testdata\&level=3
2:读性能
ab -c 100 -n 10000 http://192.168.8.18:5555/?name=test\&opt=get
附:基础代码
<?php //带优化级的http队列服务 error_reporting(E_ERROR | E_WARNING | E_PARSE); date_default_timezone_set(‘PRC‘); $dirname = "./data/queue_"; //文件存储位置及文件名 $infodirname = "./data/info"; //存储基本信息的文件 $maxfilesize = 1024 * 1024 * 1024;//文件最大1G $fp = array(); //打开文件的句柄 $info = array(); //存储各个队列的基本信息,get/put的位置 $infoflag = 0; //标记info是否被修改过 $tmp = file_get_contents($infodirname);//初始化,从文件中载入基本信息 !empty($tmp) && $info = unserialize($tmp); $http = new swoole_http_server("0.0.0.0", 5555); $http->set(array( ‘worker_num‘ => 1, //工作进程数量 )); $http->on(‘WorkerStart‘, function($http) { $http->addtimer(10000);//增加定时器 }); $http->on(‘Timer‘, ‘cronWriteInfo‘); //定时将基本信息写入文件 $http->on(‘request‘, "mycallback"); $http->start(); function cronWriteInfo() { //定时写文件 global $info,$infoflag,$infodirname; if($infoflag) { //info被修改过时,才将之写入文件 file_put_contents($infodirname, serialize($info)); } } function mycallback($request, $response) { global $info, $infoflag; if($request->server[‘request_uri‘] == "/favicon.ico") { //过滤掉浏览器的icon请求 $response->end(" \n"); return ‘‘; } $infoflag = 1; $param = $request->get; if(empty($param[‘name‘])) {//队列名字不能为空 $ret = "QUEUE_ERROR_QUEUENAME_EMPTY"; } else { switch ($param[‘opt‘]) { case ‘get‘: $ret = get($param[‘name‘]); break; case ‘put‘: if(($data = $param[‘data‘]) && !empty($param[‘data‘])) { empty($param[‘level‘]) && $param[‘level‘] = 0; $ret = put($param[‘name‘], $param[‘data‘], $param[‘level‘]); } else { $ret = "QUEUE_PUT_ERROR"; } break; case ‘status‘: $ret = json_encode($info); break; default: $ret = "QUEUE_ERROR_PARAM"; break; } } $response->end($ret."\n"); } function get($queuename) { global $info; if($info[$queuename][‘unread‘] > 0) { $maxlevel = $info[$queuename][‘curMaxlevel‘]; //当前最大优化级 $pos = $info[$queuename][‘leveldata‘][$maxlevel][‘getpos‘]; //文件偏移量 $data = getdataFromFile($queuename, $maxlevel, $pos); //读取数据 $ret = pack("H*", $data); //data是压缩的数据,需要解压 //修改全局变量 $info[$queuename][‘get‘]++; $info[$queuename][‘unread‘]--; $info[$queuename][‘leveldata‘][$maxlevel][‘unread‘]--; if($ret == "QUEUE_END") { //读到的数据不全,原因:数据文件大小超过最大值设置; $data = getdataFromFile($queuename, $maxlevel, 0); //从头再读一次 $ret = pack("H*", $data); //data是压缩的数据,需要解压 $info[$queuename][‘leveldata‘][$maxlevel][‘get‘] = 1;//读取位置设为1 $info[$queuename][‘leveldata‘][$maxlevel][‘getpos‘] = strlen($data) + 1; } else { $info[$queuename][‘leveldata‘][$maxlevel][‘get‘]++; $info[$queuename][‘leveldata‘][$maxlevel][‘getpos‘] += strlen($data) + 1 ; } if($info[$queuename][‘leveldata‘][$maxlevel][‘get‘] == $info[$queuename][‘leveldata‘][$maxlevel][‘put‘]) { //数据读取完毕,重新计算当前最高优先级的数据 $info[$queuename][‘curMaxlevel‘] = getcurMaxlevel($info[$queuename][‘leveldata‘]); } return $ret; } else { return ‘QUEUE_EMPTY‘; } } function put($queuename, $userdata, $level) { global $info; if($info[$queuename][‘leveldata‘][$level][‘put‘]+1 == $info[$queuename][‘leveldata‘][$level][‘get‘]) { //队列写满 return "QUEUE_FULL"; } $info[$queuename][‘put‘]++; $info[$queuename][‘unread‘]++; $newpos = setDataToFile($queuename, $level, $userdata); $info[$queuename][‘leveldata‘][$level][‘put‘]++; $info[$queuename][‘leveldata‘][$level][‘unread‘]++; $info[$queuename][‘leveldata‘][$level][‘putpos‘] = $newpos; $info[$queuename][‘curMaxlevel‘] = getcurMaxlevel($info[$queuename][‘leveldata‘]); return "QUEUE_PUT_OK"; } //获取当前需要执行的优先级最高的数据 function getcurMaxlevel($leveldata) { krsort($leveldata); foreach($leveldata as $level => $info) { if($info[‘unread‘] > 0) { return $level; } } return 0; } //写数据到文件,如果文件写满,循环写入; function setDataToFile($queuename, $level, $userdata) { global $dirname, $maxfilesize, $fp; $file = $dirname.$queuename.$level; $packdata = unpack("H*", $userdata); //$$packdata[‘1‘] = $userdata; $data = $packdata[‘1‘]."\n"; $size = filesize($file); if(!$fp[$queuename][$level]) { $fp = fopen($file, "ab+"); } fseek($fp, $size); fwrite($fp, $data); clearstatcache(); if($size + strlen($data) + 1 > $maxfilesize) { //超出文件最大值,进入下一轮 fseek($fp, $size); //倒回指针 fwrite($fp, "QUEUE_END"); //写入标记符号 fseek($fp,0);//重头写 fwrite($fp, $data); return strlen($data); } else { return $size + strlen($data); } } //从文件中读取数据 function getdataFromFile($queuename, $level, $pos) { global $dirname,$fp; if(!isset($fp[$queuename][$level])) { $file = $dirname.$queuename.$level; $fp = fopen($file, "ab+"); } fseek($fp, intval($pos)); return trim(fgets($fp)); }
时间: 2024-10-08 13:53:41