SpringCloud | FeignClient和Ribbon重试机制区别与联系

在spring cloud体系项目中,引入的重试机制保证了高可用的同时,也会带来一些其它的问题,如幂等操作或一些没必要的重试。
今天就来分别分析一下 FeignClient 和 Ribbon 重试机制的实现原理和区别,主要分为三点:

1)FeignClient重试机制分析
2)Ribbon重试机制分析
3)FeignClient和Ribbon重试机制的区别于联系

1)FeignClient 重试机制分析:

FeignClient 重试机制的实现原理相对简单。首先看一下feignClient处理请求的拦截类:SynchronousMethodHandler,看一下该类中的代理方法invoke

 @Override
  public Object invoke(Object[] argv) throws Throwable {
  //生成处理请求模板
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    //获取重试配置类
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
      //在异常里执行是否重试方法
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

上面的默认重试配置Retryer,在其构造方法中,默认的请求次数为5次,如下:

 public Default() {
      this(100, SECONDS.toMillis(1), 5);
    }
  • 1
  • 2
  • 3
  • 4

判断是否重试的算法如下:

public void continueOrPropagate(RetryableException e) {
//重试次数大于最大请求次数,抛出异常
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
      if (e.retryAfter() != null) {
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      sleptForMillis += interval;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

如果要关闭或者要重写 feignClient重试机制 的话,可以自定义feignRetryer,在方法中不做重试,直接抛出异常。配置如下:

/**
 * @author zhangshukang
 */
@Configuration
public class FeignConfig {
    @Bean
    Retryer feignRetryer() {
        return new Retryer() {
            @Override
            //在这里重写 continueOrPropagate算法,可自定义处理方式。这里直接抛出异常,相当于不重试。
            public void continueOrPropagate(RetryableException e) {
                throw e;
            }
            @Override
            public Retryer clone() {
                return this;
            }
        };
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2)Ribbon重试机制分析:

首先看一下我们ribbon常用的配置,已经配置用到的地方:

ribbon:
  ReadTimeout: 0
  ConnectTimeout: 10
  MaxAutoRetries: 1
  MaxAutoRetriesNextServer: 2
  OkToRetryOnAllOperations: false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这里从字面意思可以看出:
retrySameServer:重试相同实例,对应MaxAutoRetries
retryNextServer:重试下一实例,对应MaxAutoRetriesNextServer
retryEnabled:重试所有操作,对应OkToRetryOnAllOperations

这里声明一点,关于feignClient如何整合ribbon负载均衡的,之前的博客已经有完整的分析:
《SpringCloud | Feign如何整合Ribbon进行负载均衡的?》,所以下面就跳过整合部分,直接分析负载均衡模块。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        //获取重试机制配置:RequestSpecificRetryHandler,继续跟进该方法...
        RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
        //这里很关键,很明显采用了命令模式,ribbon负载均衡的配置在这里传给LoadBalancerCommand类
        LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
                .withLoadBalancerContext(this)
                .withRetryHandler(handler)
                .withLoadBalancerURI(request.getUri())
                .build();

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        }
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
    @Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
            RibbonRequest request, IClientConfig requestConfig) {
            //这里如果配置了OkToRetryOnAllOperations为true,则所有的请求都进行重试。默认为false
        if (this.clientConfig.get(CommonClientConfigKey.OkToRetryOnAllOperations,
                false)) {
            return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
                    requestConfig);
        }
        //如果没配置的话,如果不是get请求,就关闭重试
        if (!request.toRequest().method().equals("GET")) {
            return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
                    requestConfig);
        }
        else {
        //如果是get请求,则开启重试。
            return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
                    requestConfig);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

上述代码是对请求类型进行区分,哪些重试,哪些不重试。
区别就在于第二个参数,来看一下第二个参数具体哪里用到了,继续跟进代码如下:

    public boolean isRetriableException(Throwable e, boolean sameServer) {
    //如果手动配置了所有请求都重试,或者get请求时,这里开启重试。
        if(this.okToRetryOnAllErrors) {
            return true;
        } else if(e instanceof ClientException) {
            ClientException ce = (ClientException)e;
            return ce.getErrorType() == ErrorType.SERVER_THROTTLED?!sameServer:false;
        } else {
            return this.okToRetryOnConnectErrors && this.isConnectionException(e);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

刚刚上面提到了命令模式,属于RxJava的内容,事件驱动机制,有兴趣的可以自行研读。这里看一下上面命令模式执行类具体怎么用的:

public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //这两个变量,上面已经提到了,重试机制的关键
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // 利用RxJava生成一个Observable用于后面的回调
    Observable<T> o =
            //选择具体的server进行调用
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    //获取这个server调用监控记录,用于各种统计和LoadBalanceRule的筛选server处理
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //获取本次server调用的回调入口,用于重试同一实例的重试回调
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);

                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                                ......省略部分代码

                                }
                            });
                    //设置针对同一实例的重试回调
                    if (maxRetrysSame > 0)
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    //设置重试下一个实例的回调
    if (maxRetrysNext > 0 && server == null)
        o = o.retry(retryPolicy(maxRetrysNext, false));
    //异常回调
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

上述代码典型的RxJava风格。

接下来是关键。o为Observable实例,类似于生产者,上面代码为Observable回调逻辑。上面有两行关键的代码:
o = o.retry(retryPolicy(maxRetrysSame, true));
o = o.retry(retryPolicy(maxRetrysNext, false));

首先看一下 retryPolicy 方法,这个就是 ribbon 重试算法的逻辑了,来看一下的实现:

 private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
        return new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                if (e instanceof AbortExecutionException) {
                    return false;
                }
                //判断是否继续重试
                if (tryCount > maxRetrys) {
                    return false;
                }

                if (e.getCause() != null && e instanceof RuntimeException) {
                    e = e.getCause();
                }
                //进入异常处理
                return retryHandler.isRetriableException(e, same);
            }
        };
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

上述代码是Ribbon判断是否重试的实现,根据我们配置的变量次数,进行判断,有异常则进入异常处理。
整体的重试机制就是将 LoadBalancerCommand 类中 retryPolicy 的重试实现逻辑,传入RxJava Observable对象的o.retry()方法,该方法接收的参数的就是一个Function:

public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
        return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
    }
  • 1
  • 2
  • 3

最后回过头看这两行代码,逻辑大致清晰许多,来看一下执行顺序:

o = o.retry(retryPolicy(maxRetrysSame, true));
o = o.retry(retryPolicy(maxRetrysNext, false));
  • 1
  • 2

执行顺序:

1)首先会先执行下面一行代码,获取负载均衡的重试配置,然后进行负载均衡,选取实例。

2)再执行上面一行代码,获取执行单个服务的重试配置,最后再执行具体的业务逻辑。

3)FeignClient 和 Ribbon重试区别与联系:

疑问:一个http请求,如果feign和ribbon都配置了重试机制,异常情况下一共会请求多少次?

经过上面的分析,请求总次数 n 为feignClient和ribbon配置参数的笛卡尔积:

n(请求总次数)=feign(默认5次) * (MaxAutoRetries+1) * (MaxAutoRetriesNextServer+1)

注意:+1是代表ribbon本身默认的请求。

其实二者的重试机制相互独立,并无联系。但是因为用了feign肯定会用到ribbon,所以feign的重试机制相对来说比较鸡肋,自己feignClient的时候一般会关闭该功能。ribbon的重试机制默认配置为0,也就是默认是去除重试机制的,建议不要修改。如果配置不当,会因为幂等请求带来数据问题。所以建议关闭二者的重试功能。
如果开启的话,建议合理配置Hystrix的超时时间,在一些没必要的重试请求执行时,根据Hystrix的超时时间,快速失败,结束重试。

友链:探果网

原文地址:https://www.cnblogs.com/tiancai/p/9621800.html

时间: 2024-08-03 10:50:22

SpringCloud | FeignClient和Ribbon重试机制区别与联系的相关文章

SpringCloud微服务的熔断机制以及相关概念介绍

1.什么是服务的熔断机制? 熔断机制是对系统的防护,比如受到一些恶意攻击,那么需要熔断机制来保护系统的微服务,做出响应,避免资源被耗尽.既要能响应,又要能防护,当我们的请求达到一个负载阈值,就启用熔断,把真实接口关掉,给客户端请求一个响应,这个响应,我们可以设置.服务熔断就是对该服务的调用执行熔断,对应后续请求,不在继续调用该目标服务,而是直接返回,从而可以快速释放资源,或者服务出现故障,会把故障信息返回给客户端 服务熔断的几种方式: 断路器,这是一个硬件设施,比如保险丝或者电子设备等等 断路器

SpringCloud学习之Ribbon

一.负载均衡与Ribbon 负载均衡,在集群中是很常见的一个“名词”,顾名思义是根据一定的算法将请求分摊至对应的服务节点上,常见的算法有如下几种: 轮询法:所有请求被依次分发到每台应用服务器上,每台服务器需要处理的请求数目都相同,适合所有服务器硬件都相同的场景 随机法:请求被随机分配到各个应用服务器,在许多场合下,这种方案都很简单实用. 源地址哈希(Hash)法:将请求来源的IP地址进行Hash计算,得到对应的服务器,这样来自同一个IP的请求总在同一个服务器上处理 加权法:根据应用服务器配置的情

一文详解Spring Cloud Feign重试机制

前言 Feign组件默认使用Ribbon的重试机制并增加了根据状态码判断重试机制,默认情况下是不启用的.Feign使用的是Spring Retry组件,需要引入依赖才能启用. 一.POM引入Spring Retry <dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency> 二

SpringCloud Feign 之 超时重试次数探究

SpringCloud Feign 之 超时重试次数探究 上篇文章,我们对Feign的fallback有一个初步的体验,在这里我们回顾一下,Fallback主要是用来解决依赖的服务不可用或者调用服务失败或超时,使用默认的返回值.实际应用中, 在Fallback之前,需要对服务配置重试机制,当多次重试服务,还是服务不可用的情况下,就触发Fallback. 这里,我们对重试机制配置以及重试次数进行一次探究. Feign的超时 Feign接口调用分两层,Ribbon(负载均衡)和Hystrix(熔断器

SpringCloud+Eureka+Feign+Ribbon的简化搭建流程和CRUD练习

作者:个人微信公众号:程序猿的月光宝盒 环境:win10--idea2019--jdk8 1.搭建Eureka服务模块 1.1 新建eureka服务模块(Sping Initializr) 取名为eureka-server,并添加如下Dependencies: 1.2 配置application.properties #配置端口 server.port=8761 #spring 的应用名=一般是模块名 spring.application.name=eureka-server #当前模块是否注册

SpringCloud+Eureka+Feign+Ribbon的简化搭建流程,加入熔断,网关和Redis缓存[2]

作者:故事我忘了¢个人微信公众号:程序猿的月光宝盒 前提:本篇是基于 SpringCloud+Eureka+Feign+Ribbon的简化搭建流程和CRUD练习[1] 的修改与拓展 1.修改consumer的CenterFeign.java,把返回值全部设置为String /** * 是consumer调用provider(需要指定provider的名字) * 请求的清单列表:规定调用地址.参数.返回值 * 在正常走通的时候不走CenterFeignFallBack,当provider down

Volley超时重试机制详解

Volley超时重试机制 基础用法 Volley为开发者提供了可配置的超时重试机制,我们在使用时只需要为我们的Request设置自定义的RetryPolicy即可. 参考设置代码如下: int DEFAULT_TIMEOUT_MS = 10000; int DEFAULT_MAX_RETRIES = 3; StringRequest stringRequest = new StringRequest(Request.Method.GET, url, new Response.Listener<S

Rocket重试机制,消息模式,刷盘方式

一.Consumer 批量消费(推模式) 可以通过 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 这里需要分为2种情况 Consumer端先启动 Consumer端后启动.   正常情况下:应该是Consumer需要先启动 注意:如果broker采用推模式的话,consumer先启动,会一条一条消息的消费,consumer后启动会才用批量消费 Consumer端先启动 1.Consumer.java package quickstart

四钟预处理机制区别

一.printf() printf()函数优点在于可以格式化输出!printf()是一个函数,输出字符串,而print是一个语言结构,总是返回true(1),而echo也是语言结构,返回无效. $total = 12.4如果用echo时,是这样:echo "Total amount is $total.";输出:Total amount is 12.4.换回printf则为这样表达:printf("Total amount is %s.",$total);输出:To