motan源码分析五:cluster相关

上一章我们分析了客户端调用服务端相关的源码,但是到了cluster里面的部分我们就没有分析了,本章将深入分析cluster和它的相关支持类。

1.clustersupport的创建过程,上一章的ReferConfig的initRef()方法中调用了相关的创建代码:

        for(Iterator iterator = protocols.iterator(); iterator.hasNext();)
        {
            ProtocolConfig protocol = (ProtocolConfig)iterator.next();
            LoggerUtil.info((new StringBuilder("ProtocolConfig‘s")).append(protocol.getName()).toString());
            Map params = new HashMap();
            params.put(URLParamType.nodeType.getName(), "referer");
            params.put(URLParamType.version.getName(), URLParamType.version.getValue());
            params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
            collectConfigParams(params, new AbstractConfig[] {
                protocol, basicReferer, extConfig, this
            });
            collectMethodConfigParams(params, getMethods());
            URL refUrl = new URL(protocol.getName(), localIp, 0, interfaceClass.getName(), params);
            ClusterSupport clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);//创建clustersupport
            clusterSupports.add(clusterSupport);
            clusters.add(clusterSupport.getCluster());//获取对应的cluster
            proxy = proxy != null ? proxy : refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue());
        }

    private ClusterSupport createClusterSupport(URL refUrl, ConfigHandler configHandler, List registryUrls)
    {
        List regUrls = new ArrayList();
        if(StringUtils.isNotBlank(directUrl) || "injvm".equals(refUrl.getProtocol()))
        {
            URL regUrl = new URL("local", "127.0.0.1", 0, com/weibo/api/motan/registry/RegistryService.getName());
            if(StringUtils.isNotBlank(directUrl))
            {
                StringBuilder duBuf = new StringBuilder(128);
                String dus[] = MotanConstants.COMMA_SPLIT_PATTERN.split(directUrl);
                String as[];
                int j = (as = dus).length;
                for(int i = 0; i < j; i++)
                {
                    String du = as[i];
                    if(du.contains(":"))
                    {
                        String hostPort[] = du.split(":");
                        URL durl = refUrl.createCopy();
                        durl.setHost(hostPort[0].trim());
                        durl.setPort(Integer.parseInt(hostPort[1].trim()));
                        durl.addParameter(URLParamType.nodeType.getName(), "service");
                        duBuf.append(StringTools.urlEncode(durl.toFullStr())).append(",");
                    }
                }

                if(duBuf.length() > 0)
                {
                    duBuf.deleteCharAt(duBuf.length() - 1);
                    regUrl.addParameter(URLParamType.directUrl.getName(), duBuf.toString());
                }
            }
            regUrls.add(regUrl);
        } else//走注册中心的方式
        {
            if(registryUrls == null || registryUrls.isEmpty())
                throw new IllegalStateException(String.format("No registry to reference %s on the consumer %s , please config <motan:registry address=\"...\" /> in your spring config.", new Object[] {
                    interfaceClass, "127.0.0.1"
                }));
            URL url;
            for(Iterator iterator = registryUrls.iterator(); iterator.hasNext(); regUrls.add(url.createCopy()))
                url = (URL)iterator.next();

        }
        URL url;
        for(Iterator iterator1 = regUrls.iterator(); iterator1.hasNext(); url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr())))
            url = (URL)iterator1.next();

        return configHandler.buildClusterSupport(interfaceClass, regUrls);//调用simpleconfighandler的创建clustersupport方法
    }

    public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
        ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);//创建cluster支持类,将业务接口和注册中心信息传递进去
        clusterSupport.init();//初始化

        return clusterSupport;
    }

2.clustersupport的init和prepare方法

    public void init() {

        prepareCluster();

        URL subUrl = toSubscribeUrl(url);
        for (URL ru : registryUrls) {//循环注册中心的url

            String directUrlStr = ru.getParameter(URLParamType.directUrl.getName());
            // 如果有directUrl,直接使用这些directUrls进行初始化,不用到注册中心discover
            if (StringUtils.isNotBlank(directUrlStr)) {
                List<URL> directUrls = parseDirectUrls(directUrlStr);
                if (!directUrls.isEmpty()) {
                    notify(ru, directUrls);
                    LoggerUtil.info("Use direct urls, refUrl={}, directUrls={}", url, directUrls);
                    continue;
                }
            }

            // client 注册自己,同时订阅service列表
            Registry registry = getRegistry(ru);//获取zookeeper的注册中心
            registry.subscribe(subUrl, this);//注册自己并订阅服务
        }

        boolean check = Boolean.parseBoolean(url.getParameter(URLParamType.check.getName(), URLParamType.check.getValue()));
        if (!CollectionUtil.isEmpty(cluster.getReferers()) || !check) {
            cluster.init();//初始化集群
            if (CollectionUtil.isEmpty(cluster.getReferers()) && !check) {
                LoggerUtil.warn(String.format("refer:%s", this.url.getPath() + "/" + this.url.getVersion()), "No services");
            }
            return;
        }

        throw new MotanFrameworkException(String.format("ClusterSupport No service urls for the refer:%s, registries:%s",
                this.url.getIdentity(), registryUrls), MotanErrorMsgConstant.SERVICE_UNFOUND);
    }

    private void prepareCluster() {
        String clusterName = url.getParameter(URLParamType.cluster.getName(), URLParamType.cluster.getValue());//集群名称
        String loadbalanceName = url.getParameter(URLParamType.loadbalance.getName(), URLParamType.loadbalance.getValue());//负载均衡名称
        String haStrategyName = url.getParameter(URLParamType.haStrategy.getName(), URLParamType.haStrategy.getValue());//ha高可用名称

        cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(clusterName);//获取具体的集群对象
        LoadBalance<T> loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);//获取具体的负载均衡方式,目前motan支持6种负载方式
        HaStrategy<T> ha = ExtensionLoader.getExtensionLoader(HaStrategy.class).getExtension(haStrategyName);//获取高可用的方式,目前支持两种failfast和failover方式
        cluster.setLoadBalance(loadBalance);
        cluster.setHaStrategy(ha);
        cluster.setUrl(url);
    }

3.负载均衡,motan支持6种方式,分别是:轮训、随机、hash、本地服务优先、权重可配置、低并发优先,具体代码可见com.weibo.api.motan.cluster.loadbalance目录,本文我们主要看一下轮训的方式:

public class RoundRobinLoadBalance<T> extends AbstractLoadBalance<T> {

    private AtomicInteger idx = new AtomicInteger(0);

    @Override
    protected Referer<T> doSelect(Request request) {
        List<Referer<T>> referers = getReferers();//获取所有服务器的引用

        int index = idx.incrementAndGet();//自增
        for (int i = 0; i < referers.size(); i++) {
            Referer<T> ref = referers.get((i + index) % referers.size());//利用自增数去模,达到轮训的目的
            if (ref.isAvailable()) {
                return ref;
            }
        }
        return null;
    }

    @Override
    protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
        List<Referer<T>> referers = getReferers();

        int index = idx.incrementAndGet();
        for (int i = 0; i < referers.size(); i++) {
            Referer<T> referer = referers.get((i + index) % referers.size());
            if (referer.isAvailable()) {
                refersHolder.add(referer);
            }
        }
    }
}

4.motan支持failfast和failover两种方式,failfast只调用一次,如果失败则直接返回失败,failover循环调用若干次,直到成功或循环结束后

    public Response call(Request request, LoadBalance<T> loadBalance) {

        List<Referer<T>> referers = selectReferers(request, loadBalance);//获取所有的引用
        if (referers.isEmpty()) {
            throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s, loadbalance:%s", request,
                    loadBalance));
        }
        URL refUrl = referers.get(0).getUrl();
        // 先使用method的配置
        int tryCount =
                refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
                        URLParamType.retries.getIntValue());//获取重试次数
        // 如果有问题,则设置为不重试
        if (tryCount < 0) {
            tryCount = 0;
        }

        for (int i = 0; i <= tryCount; i++) {
            Referer<T> refer = referers.get(i % referers.size());//循环调用
            try {
                request.setRetries(i);
                return refer.call(request);
            } catch (RuntimeException e) {
                // 对于业务异常,直接抛出
                if (ExceptionUtil.isBizException(e)) {
                    throw e;//业务异常退出调用
                } else if (i >= tryCount) {
                    throw e;
                }
                LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
            }
        }

        throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!");
    }

本章知识点总结:

1.一个cluster有一个cluster的支持类,有一个ha,有一个loadbalance;

2.motan支持6种负载均衡方式;

3.motan支持failover的ha方式;

时间: 2024-10-08 20:04:32

motan源码分析五:cluster相关的相关文章

baksmali和smali源码分析(五)

官方文档对于dex中的class数据结构表示如下: class_idx uint index into the type_ids list for this class. This must be a class type, and not an array or primitive type. access_flags uint access flags for the class (public, final, etc.). See "access_flags Definitions&quo

Nouveau源码分析(五):NVIDIA设备初始化之nouveau_drm_load (2)

Nouveau源码分析(五) 接着上一篇来,先把nouveau_drm_load再贴出一遍来吧: // /drivers/gpu/drm/nouveau/nouveau_drm.c 364 static int 365 nouveau_drm_load(struct drm_device *dev, unsigned long flags) 366 { 367 struct pci_dev *pdev = dev->pdev; 368 struct nouveau_drm *drm; 369 i

[Android] Volley源码分析(五)答疑

Volley源码分析系列出了有一段日子了,有不少看官私底下给我留言,同时抛出了一些问题.对于一些比较简单的问题我们跳过去,这两天接到网友是@smali提出的问题.不得不赞一下这位看官看源码时候的细腻程度,我引出这个问题供大家一块思考一下. Q:在写入文件头数据的时候为何不直接写入Int而是通过移位的方式来完成? 我们来看一下对应的源码: writeInt(os, CACHE_MAGIC); static void writeInt(OutputStream os, int n) throws I

Vue系列---理解Vue.nextTick使用及源码分析(五)

_ 阅读目录 一. 什么是Vue.nextTick()? 二. Vue.nextTick()方法的应用场景有哪些? 2.1 更改数据后,进行节点DOM操作. 2.2 在created生命周期中进行DOM操作. 三. Vue.nextTick的调用方式如下: 四:vm.$nextTick 与 setTimeout 的区别是什么? 五:理解 MutationObserver 六:nextTick源码分析 回到顶部 一. 什么是Vue.nextTick()? 官方文档解释为:在下次DOM更新循环结束之

MPTCP 源码分析(五) 接收端窗口值

简述: 在TCP协议中影响数据发送的三个因素分别为:发送端窗口值.接收端窗口值和拥塞窗口值. 本文主要分析MPTCP中各个子路径对接收端窗口值rcv_wnd的处理. 接收端窗口值的初始化 根据<MPTCP 源码分析(二) 建立子路径>中描述服务端在发送完SYN/ACK并接收到ACK的时候建立新的sock. 在内核实现中,针对连接请求分为两个步骤处理: SYN队列处理:当服务端收到SYN的时候,此连接请求request_sock将被存放于listening socket的SYN队列,服务端发送S

Vue 2.0 深入源码分析(五) 基础篇 methods属性详解

用法 methods中定义了Vue实例的方法,官网是这样介绍的: 例如:: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <script src="https://cdn.bootcss.com/vue/2.5.16/vue.js"></script> <title>Document&

motan源码分析十一:部分特性

本章将描述motan部分的特性并对源码进行分析. 1.requestid的维护,使用了当前时间左移20位,再和一个自增变量组合 public class RequestIdGenerator { protected static final AtomicLong offset = new AtomicLong(0); protected static final int BITS = 20; protected static final long MAX_COUNT_PER_MILLIS = 1

飞鸽传书源码分析五-文件传输

转载请注明出处:http://blog.csdn.net/mxway/article/details/44889871 本文是在飞鸽传书源码v2.06的基础上进行分析的. 1.添加要发送的文件 文件的发送是在发送对话框中进行的,首先找到发送对话框的快捷菜单. File Transfer对应的菜单id为MENU_FILEADD,相应的command处理事件在Senddlg.cpp中的EvCommand函数中 BOOL TSendDlg::EvCommand(WORD wNotifyCode, WO

motan源码分析四:客户端调用服务

在第一章中,我们分析了服务的发布与注册,本章中将简单的分析一下客户端调用服务的代码及流程,本文将以spring加载的方式进行分析. 1.在DemoRpcClient类的main()方法中加载类: ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"classpath:motan_demo_client.xml"}); MotanDemoService service = (MotanDemo