Dubbo点滴之集群容错

首先看来自官方文档

  • 这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
  • Directory代表多个Invoker,可以把它看成List<Invoker>,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。
  • Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
  • Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等。
  • LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。

以前自己看到上面的文字,理解不太深。随着对分布式系统的深入接触,以及DUBBO源码的研究,才有了更精确的理解。总结一句话:阅读技术类文档时,对一些语言的理解,受到阅读者自身情况的限制,往往理解有深有浅。闲话少说。本文,就是针对图中的组件,一个个进行剖析。

1.Invoker

Invoker 是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。

1.1 API

public interface Invoker<T> extends Node {
    /**
     * get service interface.
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;
}

1.2  层次树结构

分析可知:主要有2个分支

1.AbstractInvoker:主要具体远程实现,和RPC 协议有关,属于dubbo-rpc-api范畴。

2.AbstractClusterInvoker:主要逻辑包括Invoker的选择,和高可用有关,属于dubbo-cluster范畴。

1.3 AbstractInvoker VS AbstractClusterInvoker

1.3.1 类图比较

通过下图可以很容易发现:AbstractClusterInvoker比较AbstractInvoker,多了些select,doselect,reselect方法。这些方法的作用也可以猜到。

1.3.2 以两个实现类对比

选择AbstractInvoker的子类DubboInvoker,

和AbstractClusterInvoker的子类FailoverClusterInvoker为代表。

其中FailoverClusterInvoker是dubbo集群容错默认方案,Dubbo协议为默认协议。

先看下AbstractInvoker分支

public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {

    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    Map<String, String> context = RpcContext.getContext().getAttachments();
    
    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
    } catch (RpcException e) {
    } catch (Throwable e) {    
    }
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
..
}

public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    logger.debug("doInvoke start -----------------------------");
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
           ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
           RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }finally {
        logger.debug("doInvoke end-----------------------------");
    }

}
...
}

这里可以很清晰地看到,远程调用分三种情况:
1. 不需要返回值的调用(所谓oneWay)
2. 异步(async)
3. 同步

对于第一种情况,客户端只管发请求就完了,不考虑返回结果。
对于第二种情况,客户端除了发请求,还需要将结果塞到一个ThreadLocal变量中,以便于客户端get返回值
对于第三种情况,客户端除了发请求,还会同步等待返回结果

再看下AbstractClusterInvoker分支

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {

    checkWheatherDestoried();

    LoadBalance loadbalance;
    
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
   List<Invoker<T>> invokers = directory.list(invocation);
   return invokers;
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                   LoadBalance loadbalance) throws RpcException;
..
}

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
   List<Invoker<T>> copyinvokers = invokers;
   checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
       //重试时,进行重新选择,避免重试时invoker列表已发生变化.
       //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
       if (i > 0) {
          checkWheatherDestoried();
          copyinvokers = list(invocation);
          //重新检查一下
          checkInvokers(copyinvokers, invocation);
       }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
 }
...
}

总结下AbstractClusterInvoker和AbstractClusterInvoker都需要实现Invoker接口,两者都声明了doInvoke抽象方法。但方法定义有区别。

protected abstract Result doInvoke(Invocation invocation) throws Throwable;

protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                   LoadBalance loadbalance) throws RpcException;

AbstractClusterInvoker 需要一个Invoker列表,他们来自Directory,LoadBalance 可以可以理解为负载均衡策略。

两者的联系:AbstractClusterInvoker 比AbstractInvoker多了一些选择和负载均衡部分,到最后还是会调用AbstractInvoker分支,负责具体RPC调用工作。

1.4 常见容错方案对比

Feature 优点 缺点 实现类
Failover Cluster 失败自动切换,当出现失败,重试其它服务器,通常用于读操作(推荐使用) 重试会带来更长延迟 FailoverClusterInvoker
Failfast Cluster 快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作 如果有机器正在重启,可能会出现调用失败 FailfastClusterInvoker
Failsafe Cluster 失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作 调用信息丢失

FailsafeClusterInvoker
Failback Cluster 失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作 不可靠,重启丢失
FailbackClusterInvoker

Forking Cluster

并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作 需要浪费更多服务资源 ForkingClusterInvoker

Broadcast

Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态 速度慢,任意一台报错则报错 BroadcastClusterInvoker

2. 总结

*ClusterInvoker 分别使用Router,和Directory获取Invoker列表。

*ClusterInvoker然后再Invoker列表中,借助LoadBalance提供的负载均衡策略,返回一个可用的Invoker.

*Directory代表多个Invoker,可以理解为Invoker的逻辑集合,负责Invoker的下线,上线。

Router和Directory两者都对我提供Invoker列表。通过提供的API,很容易找出区别来。

//Directory
List<Invoker<T>> list(Invocation invocation) throws RpcException;
//Route
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

Directory:返回的Invoker列表,代表当前正常对外提供服务的Invoker列表。重点在维护可用节点列表。

Route:返回的Invoker列表,是在现有可用的Invoker列表里,按照规则进行再筛选,重点在规则匹配,过滤等。

时间: 2024-10-12 07:40:38

Dubbo点滴之集群容错的相关文章

Dubbo之旅--集群容错和负载均衡

当我们的系统中用到Dubbo的集群环境,因为各种原因在集群调用失败时,Dubbo提供了多种容错方案,缺省为failover重试. Dubbo的集群容错在这里想说说他是因为我们实际的项目中出现了此类的问题,因为依赖的第三方项目出现异常,导致dubbo调用超时,此时使用的是默认的集群容错方式,而配置的reties='3',这样前段系统连续掉用了三次服务,结果可想而知. 先说一下各节点关系: 这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地

14. Dubbo原理解析-集群&amp;容错之Cluster

Dubbo作为一个分布式的服务治理框架,提供了集群部署,路由,软负载均衡及容错机制 下图描述了dubbo调用过程中的对于集群,负载等的调用关系. Cluster 将Directory中的多个Invoker伪装成一个Invoker, 对上层透明,包含集群的容错机制 Cluster接口定义 @SPI(FailoverCluster.NAME) public interface Cluster { @Adaptive <T> Invoker<T>join(Directory<T&g

15. Dubbo原理解析-集群&amp;容错之目录服务Directory

集群目录服务Directory, 代表多个Invoker, 可以看成List<Invoker>,它的值可能是动态变化的比如注册中心推送变更.集群选择调用服务时通过目录服务找到所有服务 Directory的接口定义 public interfaceDirectory<T> extends Node {     //服务类型 Class<T>getInterface(); //列出所有服务的可执行对象 List<Invoker<T>>list(Inv

16. Dubbo原理解析-集群&amp;容错之router路由服务

Router服务路由, 根据路由规则从多个Invoker中选出一个子集AbstractDirectory是所有目录服务实现的上层抽象, 它在list列举出所有invokers后,会在通过Router服务进行路由过滤. Router接口定义 public interface Router extendsComparable<Router> { URL getUrl(); <T> List<Invoker<T>> route(List<Invoker<

17. Dubbo原理解析-集群&amp;容错之负载均衡

LoadBalance负载均衡, 负责从多个 Invokers中选出具体的一个Invoker用于本次调用,调用过程中包含了负载均衡的算法,调用失败后需要重新选择 LoadBalance接口定义 @SPI(RandomLoadBalance.NAME) public interface LoadBalance{ @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> i

Dubbo服务集群容错

Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的Forking Cluster模式配置,就可以对一个调用请求并行发送到多台对等的提供方

Dubbo 系列(07-3)集群容错 - 负载均衡

目录 Spring Cloud Alibaba 系列目录 - Dubbo 篇 1. 背景介绍 1.1 负载均衡算法 1.2 继承体系 2. 源码分析 2.1 AbstractLoadBalance 2.2 RandomLoadBalance 2.3 LeastActiveLoadBalance 2.4 ConsistentHashLoadBalance Dubbo 系列(07-3)集群容错 - 负载均衡 Spring Cloud Alibaba 系列目录 - Dubbo 篇 1. 背景介绍 相关

Dubbo分布式服务框架(一)——集群容错详解

1.集群调用失败时,Dubbo提供了多种容错方案. 各节点关系: Invoker是Provider某个可调用Service的抽象,封装了Provider地址及Service接口信息. Directory可以看成List<Invoker>,与List不同的是,它的值可能是动态变化的,比如注册中心推送变更. Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个. Router负责从多个Invoker中按路由规则

dubbo之集群容错

在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试. 集群容错模式 1. Failover Cluster 失败自动切换,当出现失败,重试其它服务器 .通常用于读操作,但重试会带来更长延迟.可通过 retries="2" 来设置重试次数(不含第一次).重试次数配置如下:<dubbo:service retries="2" /><dubbo:reference retries="2" /><