因为PHP-Resque 的重试需要自己写,网上又没啥轮子,而且resque也很久不更新了,所以自己研究下resque的源码,然后也借鉴了Laravel的队列重试机制,实现了PHP-Resque的重试机制。
设计思路
阅读resque源码,我们知道,resque把失败的队列的数据都放在了resque:failed列表,数据如下。
我的项目是在yii2下统一处理resque的,
我直接resque源码在新增了retry方法,在每一个failJob的payload增加了attempts尝试次数,
一开始创建的时候attempts为0,重试之后attempts每次加1然后进行retry,直到到达尝试数次才停止重试。
核心代码
1.调用重试的代码
/**
* Retry Job
*/
public function actionRetry()
{
$redis = new Redis(‘redis‘);
while (true) {
$json = $redis->rPop(‘resque:failed‘);
$array = json_decode($json, true);
if ($array) {
$jobArray = $array[‘payload‘];
if ($jobArray[‘attempts‘] < $this->retryTimes) {
//retry
\Resque::retry($jobArray, $array[‘queue‘]);
echo "Queued job " . $jobArray[‘id‘] . ‘ has retry!‘ . "\n";
} else {
//stop retry
$redis->lPush(‘resque:failed‘, [$json]);
}
}
//take a sleep
echo "*** Sleeping for ".$this->sleep. "\n";
sleep($this->sleep);
}
return true;
}
这里可以弄成守护进程一直处理失败的队列任务。
2./php-resque/lib/Resque.php
/**
* retry job and save it to the specified queue.
*
* @param array $jobArray The attempts of the the job .
* array(
* ‘id‘=> The id of the job
* ‘class‘ => The name of the class that contains the code to execute the job.
* ‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘
* ‘attempts‘=> The retry attempts of the job
* )
* @param string $queue The name of the queue to place the job in.
*
* @return boolean
*/
public static function retry($jobArray,$queue)
{
require_once dirname(__FILE__) . ‘/Resque/Job.php‘;
$result = Resque_Job::retry($jobArray,$queue);
if ($result) {
Resque_Event::trigger(‘afterEnqueue‘, array(
‘class‘ => $jobArray[‘class‘],
‘args‘ => $jobArray[‘args‘],
‘queue‘ => $queue,
));
}
return true;
}
3./php-resque/lib/Resque/Job.php
/**
* retry job and save it to the specified queue.
* *
* @param array $jobArray The data of the job.
* array(
* ‘id‘=> The id of the job
* ‘class‘ => The name of the class that contains the code to execute the job.
* ‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘
* ‘attempts‘=> The retry attempts of the job
* )
* @param string $queue The name of the queue to place the job in.
*
* @return string
*/
public static function retry($jobArray,$queue)
{
$args = $jobArray[‘args‘];
if($args !== null && !is_array($args)) {
throw new InvalidArgumentException(
‘Supplied $args must be an array.‘
);
}
$jobArray[‘attempts‘]++;
Resque::push($queue, array(
‘class‘ => $jobArray[‘class‘],
‘args‘ => array($args),
‘id‘ => $jobArray[‘id‘],
‘attempts‘=>$jobArray[‘attempts‘]
));
return true;
}
结果:
我这里我设置了重试次数为3次,每隔5秒处理一个队列。
原文地址:http://blog.51cto.com/onebig/2162057
时间: 2024-11-05 13:29:12