PHP-RESQUE重试机制

因为PHP-Resque 的重试需要自己写,网上又没啥轮子,而且resque也很久不更新了,所以自己研究下resque的源码,然后也借鉴了Laravel的队列重试机制,实现了PHP-Resque的重试机制。

设计思路

阅读resque源码,我们知道,resque把失败的队列的数据都放在了resque:failed列表,数据如下。

我的项目是在yii2下统一处理resque的,
我直接resque源码在新增了retry方法,在每一个failJob的payload增加了attempts尝试次数,
一开始创建的时候attempts为0,重试之后attempts每次加1然后进行retry,直到到达尝试数次才停止重试。

核心代码

1.调用重试的代码

    /**
     * Retry Job
     */
    public function actionRetry()
    {
        $redis = new Redis(‘redis‘);
        while (true) {
            $json = $redis->rPop(‘resque:failed‘);
            $array = json_decode($json, true);
            if ($array) {
                $jobArray = $array[‘payload‘];
                if ($jobArray[‘attempts‘] < $this->retryTimes) {
                    //retry
                    \Resque::retry($jobArray, $array[‘queue‘]);
                    echo "Queued job " . $jobArray[‘id‘] . ‘ has retry!‘ . "\n";
                } else {
                    //stop retry
                    $redis->lPush(‘resque:failed‘, [$json]);
                }
            }
            //take a sleep
            echo "*** Sleeping for ".$this->sleep. "\n";
            sleep($this->sleep);
        }

        return true;
    }
这里可以弄成守护进程一直处理失败的队列任务。

2./php-resque/lib/Resque.php
/**
     * retry job and save it to the specified queue.
     *
     * @param array $jobArray The attempts of the the job .
     * array(
     *  ‘id‘=> The id of the job
     *  ‘class‘ => The name of the class that contains the code to execute the job.
     *  ‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘
     *  ‘attempts‘=> The retry attempts of the job
     * )
     * @param string $queue The name of the queue to place the job in.
     *
     * @return boolean
     */
    public static function retry($jobArray,$queue)
    {
        require_once dirname(__FILE__) . ‘/Resque/Job.php‘;
        $result = Resque_Job::retry($jobArray,$queue);
        if ($result) {
            Resque_Event::trigger(‘afterEnqueue‘, array(
                ‘class‘ => $jobArray[‘class‘],
                ‘args‘  => $jobArray[‘args‘],
                ‘queue‘ => $queue,
            ));
        }

        return true;
    }
3./php-resque/lib/Resque/Job.php
/**
     * retry  job and save it to the specified queue.
     * *
     * @param array $jobArray The data of the job.
     * array(
     *  ‘id‘=> The id of the job
     *  ‘class‘ => The name of the class that contains the code to execute the job.
     *  ‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘
     *  ‘attempts‘=> The retry attempts of the job
     * )
     * @param string $queue The name of the queue to place the job in.
     *
     * @return string
     */
    public static function retry($jobArray,$queue)
    {
        $args = $jobArray[‘args‘];
        if($args !== null && !is_array($args)) {
            throw new InvalidArgumentException(
                ‘Supplied $args must be an array.‘
            );
        }
        $jobArray[‘attempts‘]++;
        Resque::push($queue, array(
            ‘class‘ => $jobArray[‘class‘],
            ‘args‘  => array($args),
            ‘id‘    => $jobArray[‘id‘],
            ‘attempts‘=>$jobArray[‘attempts‘]
        ));
        return true;
    }

结果:

我这里我设置了重试次数为3次,每隔5秒处理一个队列。

原文地址:http://blog.51cto.com/onebig/2162057

时间: 2024-11-05 13:29:12

PHP-RESQUE重试机制的相关文章

Volley超时重试机制详解

Volley超时重试机制 基础用法 Volley为开发者提供了可配置的超时重试机制,我们在使用时只需要为我们的Request设置自定义的RetryPolicy即可. 参考设置代码如下: int DEFAULT_TIMEOUT_MS = 10000; int DEFAULT_MAX_RETRIES = 3; StringRequest stringRequest = new StringRequest(Request.Method.GET, url, new Response.Listener<S

Rocket重试机制,消息模式,刷盘方式

一.Consumer 批量消费(推模式) 可以通过 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 这里需要分为2种情况 Consumer端先启动 Consumer端后启动.   正常情况下:应该是Consumer需要先启动 注意:如果broker采用推模式的话,consumer先启动,会一条一条消息的消费,consumer后启动会才用批量消费 Consumer端先启动 1.Consumer.java package quickstart

jedis超时重试机制注意事项

最近使用redis集群进行incr操作,总是发现计数不准确,后来经过检查发现redis在执行incr超时会执行重试机制,造成计数不准确,测试代码: /** * incrf: * 将 key 中储存的数字值增一. 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作. 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误. 本操作的值限制在 64 位(bit)有符号数字表示之内. 这是一个针对字符串的操作,因为 Redis 没有专用的整数类型,

为你的代码加上一层重试机制

为代码加上重试机制 1.前言:对于经常跟网络编程打交道的你来说,并不是你的每次Request,Server都会给你想要的Response.重试机制虽然并不能解决这种情况,但是却可以大大减少这种情况的发生. 2.介绍下重试机制类:RetryUtil.cs 使用了委托,代码很短,也不难理解. 1 public class RetryUtil 2 { 3 public delegate void NoArgumentHandler(); 4 /// <summary> 5 /// retry mec

guava的重试机制guava-retrying使用

1,添加maven依赖 <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> <version>2.0.0</version> </dependency> 2,定义重试机制 Retryer<CMSResultDTO> smsRetryer = RetryerBuilder.

SpringCloud | FeignClient和Ribbon重试机制区别与联系

在spring cloud体系项目中,引入的重试机制保证了高可用的同时,也会带来一些其它的问题,如幂等操作或一些没必要的重试. 今天就来分别分析一下 FeignClient 和 Ribbon 重试机制的实现原理和区别,主要分为三点: 1)FeignClient重试机制分析 2)Ribbon重试机制分析 3)FeignClient和Ribbon重试机制的区别于联系 1)FeignClient 重试机制分析: FeignClient 重试机制的实现原理相对简单.首先看一下feignClient处理请

【Dubbo 源码解析】07_Dubbo 重试机制

Dubbo 重试机制 通过前面 Dubbo 服务发现&引用 的分析,我们知道,Dubbo 的重试机制是通过 com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker 来实现的: public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcExce

nginx的重试机制以及nginx常用的超时配置说明

nginx的重试机制 现在对外服务的网站,很少只使用一个服务节点,而是部署多台服务器,上层通过一定机制保证容错和负载均衡. nginx就是常用的一种HTTP和反向代理服务器,支持容错和负载均衡. nginx的重试机制就是容错的一种. 在nginx的配置文件中,proxy_next_upstream项定义了什么情况下进行重试,官网文档中给出的说明如下:--------------------- Syntax: proxy_next_upstream error | timeout | invali

HBase客户端Rpc的重试机制以及客户端参数优化。

hbase客户端重试机制如何保证系统的容错性和低延迟性HBase客户端Rpc的重试机制以及客户端参数优化.HBase客户端基于退避算法的重试机制1.业务用户一方面比较关注HBase本身服务的读写性能:吞吐量以及读写延迟,2.另一方面也会比较关注HBase客户端使用上的问题,主要集中在两个方面:是否提供了重试机制来保证系统操作的容错性?是否有必要的超时机制保证系统能够fastfail,保证系统的低延迟特性?3.HBase客户端提供的重试机制,并通过配置合理的参数使得客户端在保证一定容错性的同时还能