服务引用

执行步骤

ReferenceBean.getObject()
    -->ReferenceConfig.get()
    -->init()
        -->createProxy(map)
          -->refprotocol.refer(interfaceClass, urls.get(0))
            -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
            -->extension.refer(arg0, arg1);
              -->ProtocolFilterWrapper.refer (三个AOP类)
                -->RegistryProtocol.refer
                  -->registryFactory.getRegistry(url)//建立zk的连接,和服务端发布一样(省略代码)
                  -->doRefer(cluster, registry, type, url)
                    -->FailbackRegistry.register
                    //创建zk的节点,和服务端发布一样(省略代码)。
                    //节点名为:dubbo/per.qiao.service.TestService/consumers
                    -->registry.subscribe//订阅zk的节点,和服务端发布一样(省略代码)
                    //dubbo/per.qiao.service.TestService/providers,
                    //dubbo/per.qiao.service.TestService/configurators
                    //dubbo/per.qiao.service.TestService/routers
                      -->notify(url, listener, urls);
                        -->FailbackRegistry.notify
                           -->doNotify(url, listener, urls);
                            -->AbstractRegistry.notify
                               -->saveProperties(url);
                                //把注册信息保存到cache文件(路径规则与暴露时一样)
                                 -->registryCacheExecutor.execute(new SaveProperties(...));
                                //采用线程池来处理
                               -->listener.notify(categoryList);
                                 -->RegistryDirectory.notify
                                   -->refreshInvoker(invokerUrls);
                                     //将URL转换成Invoker key为URL的字符串形式
                                     -->toInvokers(invokerUrls)
                                        -->protocol.refer(serviceType, url), url, providerUrl);
                                          -->Protocol$Adaptive.refer
                                             -->ExtensionLoader.getExtensionLoader(class)
                                                .getExtension("dubbo");
                                             -->extension.refer(type, url);
                                               -->QosProtocolWrapper.refer
                                                    //这里创建了一个过滤连
                                                    //buildInvokerChain(invoker,
                                                    //"refernce.filter","consumer")
                                                  -->ProtocolFilterWrapper.refer
                                                    //return new ListenerInvokerWrapper
                                                    -->ProtocolListenerWrapper.refer
                                                      //return new DubboInvoker
                                                      -->DubboProtocol.refer
                                     -->destroyUnusedInvokers(
                                        oldUrlInvokerMap,newUrlInvokerMap);
                                // 关闭未使用的Invoker
                                //最终目的:刷新Map<String, Invoker<T>> urlInvokerMap 对象
                                ,刷新Map<String, List<Invoker<T>>> methodInvokerMap对象
                    -->cluster.join(directory)//加入集群路由
                       -->ExtensionLoader.getExtensionLoader(Cluster.class)
                                .getExtension("failover");
                         -->MockClusterWrapper.join
                           -->this.cluster.join(directory)
                             -->FailoverCluster.join
                                -->return new FailoverClusterInvoker<T>(directory)
                                -->new MockClusterInvoker  //  返回的invoker对象
--------------------------------------------------------------------------------------------
          -->proxyFactory.getProxy(invoker)  //创建服务代理
            -->ProxyFactory$Adpative.getProxy
              -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
                            .getExtension("javassist");
                -->StubProxyFactoryWrapper.getProxy(invoker)  //进行了后置增强
                  -->AbstractProxyFactory.getProxy
                    -->getProxy(invoker, interfaces)
                      -->Proxy.getProxy(interfaces)
                        -->JavassistProxyFactory.getProxy
                            -->Proxy.getProxy(interfaces)
                            //目前代理对象per.qiao.service.TestSevice
                            //, interface com.alibaba.dubbo.rpc.service.EchoService
                            -->newInstance(InvokerInvocationHandler(MockClusterInvoker))
                            //这个MockClusterInvoker是上面refprotocol.refer返回的invoker对象
                            //采用jdk自带的InvocationHandler,创建InvokerInvocationHandler对象。

生成的代理类

入口:

ReferenceConfig#init,
ref = createProxy(map);

JavassistProxyFactory#getProxy会生成一个代理类

与其说生成一个代理类,倒不如说是两个(具体在com.alibaba.dubbo.common.bytecode.Proxy#getProxy中)

一个clazz(ccp),一个pc(ccm)

return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

Proxy.getProxy方法会生成两个类,并返回Proxy0,调用newInstance时,

会调用Proxy0#newInstance(handler), 最终返回proxy0对象

也就是说,ReferenceBean.getObject(调用者)就是这个proxy0对象

public class Proxy0 extends Proxy {
    @Override
    public Object newInstance() {
        return super.newInstance();
    }
    @Override
    public Object newInstance(java.lang.reflect.InvocationHandler h) {
        return new proxy0(h);
    }
}

具体操作类(注意:这两个类只有第一个字的大小写不同)

import java.lang.reflect.InvocationHandler;

/**
 * Create by IntelliJ Idea 2018.2
 *
 * @author: qyp
 * Date: 2019-05-27 10:46
 */
public class proxy0 implements com.alibaba.dubbo.rpc.service.EchoService, per.qiao.service.TestService {

    /**
     * 包含这两个接口的实现方法,这里为($echo,getData,getList)
     */
    public static java.lang.reflect.Method[] methods;
    /**
     * 这个hanlder就是上面执行过程refprotocol.refer返回的结果(MockClusterInvoker)
     */
    private java.lang.reflect.InvocationHandler handler;

    public proxy0(InvocationHandler h) {
        this.handler = h;
    }
    // ---------这个方法是EchoService中的-------------
    @Override
    public Object $echo(java.lang.Object arg0)  {
        Object[] args = new Object[1];
        args[0] = arg0;
        Object ret = null;
        try {
            ret = handler.invoke(this, methods[2], args);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return (java.lang.Object) ret;
    }
    // ----------下面两个方法是服务引用的接口中的方法-----------
    @Override
    public java.lang.String getData(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = arg0;
        Object ret = null;
        try {
            ret = handler.invoke(this, methods[0], args);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return (java.lang.String) ret;
    }

    @Override
    public java.util.List getList() {
        Object[] args = new Object[0];
        Object ret = null;
        try {
            ret = handler.invoke(this, methods[1], args);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return (java.util.List) ret;
    }
}

详细说以下服务引用时,是怎么和zookeeper产生联系的;

问题:如果服务端(生产端)已经启动,客户端(消费段)后,zookeeper上的节点已经存在,那么久不会通知到客户端,那么zookeeper是怎么刷新本地服务列表的??

RegistryProtocol

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    //将url转为Registry对象
    Registry registry = registryFactory.getRegistry(url);
    ...
    // type是接口
    return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //创建一个注册目录
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
            //注册消费者节点
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        //订阅 providers,configurators,routers这三个节点
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        // 将订阅信息保存到本地注册表
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

cluster不是我们要分析的重点,

FailbackRegistry

public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 重试注册失败的URL 默认5秒之后重试,间隔是5秒
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    retry();
                } catch (Throwable t) {
                    ...
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

public void register(URL url) {
    super.register(url);
    //删除注册失败的URL
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服务器端发送注册请求
        doRegister(url);
    } catch (Exception e) {
        // 如果打开启动检测,则直接抛出异常 (配置的check属性)
        ...
        }
        // 将失败的注册请求记录到失败的列表中,定期重试
        failedRegistered.add(url);
    }
}

ZookeeperRegistry

用来创建消费者节点

protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        ...
    }
}

再来分析消费者订阅

RegistryProtocol.doRefer

//订阅 providers,configurators,routers这三个节点
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

RegistryDirectory.subscribe

public void subscribe(URL url) {
    //设置当前订阅URL
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

FailbackRegistry

public void subscribe(URL url, NotifyListener listener) {
    //设置订阅的回调监听器
    super.subscribe(url, listener);
    //删除失败的订阅路径
    removeFailedSubscribed(url, listener);
    try {
        // 注册客户端信息到zookeeper并创建监听三个节点,顺便刷新本地注册表
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        // 如果订阅失败,则从本地缓存文件中获取监听的URL刷新注册表
        // 需要了解到的是, 缓存中的数据是通过消费段注册,或者zookeeper通知时调用notify才有的
        // 也只有订阅失败了才会有此操作
        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            notify(url, listener, urls);
        } else {
            ...
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}

ZookeeperRegistry

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    //链接到zookeeper
    zkClient = zookeeperTransporter.connect(url);
    //设置状态监听器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

@Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            ...
            } else {
                List<URL> urls = new ArrayList<URL>();
                // 遍历需要监听的URL (三个)
                for (String path : toCategoriesPath(url)) {
                  //从缓存中获取监听
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    //如果缓存中没有,创建监听
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                              //监听器回调方法为ZookeeperRegistry#notify
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //创建三个监听的节点
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 在注册zookeeper的节点监听器后,自动去刷新本地列表
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            ...
        }
    }

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            ...
        }
    }
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }

AbstractRegistry

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    // 遍历监听的URL 3个 添加到result
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //将监听过的URL保存到本地文件
        saveProperties(url);
        //刷新本地注册表
        listener.notify(categoryList);
    }
}

RegistryDirectory

public synchronized void notify(List<URL> urls) {
    // 分别对应 provider, router 和 configurator节点
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}
// 刷新本地注册表
private void refreshInvoker(List<URL> invokerUrls) {
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            //转换URL为Invoker对象 只有provider节点的url才能生成Invoker对象
            // 这里返回的是一个invoker的过滤连结构,终点是DubboInvoker
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
            //转换成方法名对应Invoker
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                // 销毁无用的Invoker对象
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

客户端在服务的时候会将消费端信息注册到zookeeper(也可以试别的)节点上,顺便监听了providers,configurators,routers这三个节点,然后调用了RegistryDirectory.notiry刷新本地注册表, 返回的结果(引用对象)为MockClusterInvoker包含了RegistryDirectory对象

小结:

1. 注册到zookeeper,并订阅providers,configurators和routers节点
2. 通过refprotocol.refer获取的invoker对象是MockClusterInvoker(默认包装了FailoverClusterInvoker)
3. ReferenceBean#getObject获取的对象是上面的proxy0对象, 依赖了(2)中的MockClusterInvoker

原文地址:https://www.cnblogs.com/qiaozhuangshi/p/11007043.html

时间: 2024-10-26 03:05:09

服务引用的相关文章

9. Dubbo原理解析-代理之服务引用

服务引用是服务的消费方向注册中心订阅服务提供方提供的服务地址后向服务提供方引用服务的过程. 服务的应用方在spring的配置实例如下: <dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo. DemoService"/> 如上配置spring在容器启动的时候会解析自定义的schema元素<dubbo: reference/>转换成dubbo内部数据结构Refer

C# 动态修改 Web 服务引用

我们添加webService引用,一般是通过 添加服务引用完成的,其实 添加服务引用 在背后为我们生成了代理类. 我们手动生成代理类方法: 1. 编译成cs文件:TestService.cs     在vs2008命令提示窗口中输入: wsdl /out:d:\TestService.cs http://webservice.webxml.com.cn/WebServices/MobileCodeWS.asmx?wsdl 2.将生成的TestService.cs拷到项目中,既可以使用WebSer

Xamarin.Form 初学 之 服务引用-WCF服务引用

最近研究一下Xamarin.Form,感觉这个东西确实不错,之前一直做WPF相关,然后看到Xamarin.Form开发与WPF特别相似的语法.很有兴趣! 可是环境部署对于小白的我,可是费了不少功夫!安装VS2015费了我好些时间!安装部署以后再说!先说说引用WCF服务的坑吧! 官方文档:Xamarin可以调用WCF,可以怎么调用???(满脑子问号)https://developer.xamarin.com/guides/xamarin-forms/web-services/consuming/w

添加web引用和添加服务引用有什么区别?

添加web引用和添加服务引用有什么区别,Add Service References 和 Add Web References 有啥区别?参考 http://social.microsoft.com/Forums/zh-CN/xmlwebserviceszhchs/thread/808d870b-49f1-47ac-b105-4beb580bcec6 (1)VS2005里提供的Add Web Reference(添加Web服务引用)的功能主要是添加Web Service引用.(2)VS2008保

VS2012 添加服务引用常见错误

问题:用vs2012 添加wcf引用时在对象查看器中找不到 服务引用的类 例如默认高级配置: 解决办法:在服务的高级配置中,将重新使用引用的程序集中的类型 选项勾去掉 点击确定  即可

(转)添加服务引用和添加Web引用对比

在WindowsForm程序中添加服务引用和Web引用对比 为了验证书上有关Visual Studio 2010添加服务引用和Web引用的区别,进行实验. 一.建立一个Web服务程序项目新建项目,选择ASP.NET空Web应用程序,在其中添加Web服务,然后发布到IIS,路径为http://localhost/hello/hello.asmx,服务主要通过HelloWorld()方法输出字符串“Hello World”,内容如下: using System;using System.Collec

dubbo refrence bean(服务引用)

在xml上写一个dubbo标签就可以把远程的服务引用到本地使用: <dubbo:service interface="com.test.dubbo.service.BuyFoodService" ref="buyFoodService"/> 既然用spring那就是Schema了,dubbo中自定义了Schema,在DubboNamespaceHandler中: registerBeanDefinitionParser("service&quo

调用WCF不需要添加服务引用,使用一个WCFHelper类就可以

效果图: 调用过程: string WCFURL = "http://localhost:100/Service1.svc"; UserRequest user = new UserRequest { UserName = UserName.Text, Address = Address.Text, Email = Email.Text }; string request = "{\"request\":" + new JavaScriptSer

VS添加服务引用和 Web引用的区别

参考: http://blog.csdn.net/szstephenzhou/article/details/7834669 http://book.2cto.com/201304/20441.html 1. 添加服务引用使用的是WCF服务,而添加Web引用使用的是Web服务. 2.同时存在添加服务引用与添加Web引用两者情况的项目类型是Web服务程序,包括Web Service项目.普通的控制台和窗体等类型是没有添加Web引用的. 下面是我自己的心得: 1.在VS2012中 窗体应用程序  只

WCF服务引用之后自动生成的泛型代理类名称太长的解决方案

问题:WCF服务引用之后会将原来的泛型类自动生成一个代理类,但是有时候名称太长怎么办? 解决方案: 1.方案一: 调用客户端同样也引用这个泛型类的类库. 2.方案二: 找到这个泛型类,然后在上面的[DataContract]中添加Name属性. 如下,{#}会被自动替换为Hash码.{0}和{1}会自动变成泛型值的名称. namespace DataContractSerializerDemos { DataContract(Name="BillOf{0}{1}{#}")] publi