【Dubbo源码阅读系列】服务暴露之远程暴露

引言

什么叫 远程暴露 ?试着想象着这么一种场景:假设我们新增了一台服务器 A,专门用于发送短信提示给指定用户。那么问题来了,我们的 Message 服务上线之后,应该如何告知调用方服务器,服务器 A 提供了 Message 功能?那么我们是不是可以把目前已提供的服务暴露在一个地方,让调用方知道某台机器提供了某个特定功能?带着这样的假设,我们今天就来聊聊 Dubbo 服务暴露之远程暴露!!

服务远程暴露

先回顾一下上篇文章,上篇文章我们聊到了 ServiceConfig 的 export() 方法,并且对服务的本地暴露内容进行了分析,今天我们接着这块内容讲讲服务暴露之远程暴露。

// export to remote if the config is not local (export to local only when config is local)
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    ...
    if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && !registryURLs.isEmpty()) {
            for (URL registryURL : registryURLs) {
                url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                // 为了帮助大家阅读,省略部分代码...
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } else {
            ...
        }
    }
    ...
}

这里我们只关注核心代码:

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);

invoker 对象的构建

先来看看 invoker 对象是怎么创建的!这里涉及到了 Dubbo SPI 机制,调用流程大致为
StubProxyFactoryWrapper.getInvoker() ==> JavassistProxyFactory.getInvoker()
详细看下 JavassistProxyFactory 类的 getInvoker 方法

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                Class<?>[] parameterTypes,
                Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

值得我们重点注意的是 Wrapper 类的 getWrapper() 方法!!

public static Wrapper getWrapper(Class<?> c) {
    while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
    {
        c = c.getSuperclass();
    }

    if (c == Object.class) {
        return OBJECT_WRAPPER;
    }

    Wrapper ret = WRAPPER_MAP.get(c);
    if (ret == null) {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c, ret);
    }

    return ret;
}

这里会使用参数 c 作为 key 值从 WRAPPER_MAP 缓存中取值,如果没有对应的 value 值,会调用 makeWrapper() 方法借助 javassist 技术构建一个 Wrapper 包装类。假设当前参数 c 的值为 demoService,那么最后生成的动态类为:

public class Wrapper0 extends Wrapper implements DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Wrapper0() {
    }

    public Class getPropertyType(String var1) {
        return (Class)pts.get(var1);
    }

    public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
        DemoService var5;
        try {
            var5 = (DemoService)var1;
        } catch (Throwable var8) {
            throw new IllegalArgumentException(var8);
        }

        try {
            if("sayHello".equals(var2) && var3.length == 1) {
                return var5.sayHello((String)var4[0]);
            }
        } catch (Throwable var9) {
            throw new InvocationTargetException(var9);
        }

        throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getPropertyNames() {
        return pns;
    }

    public Object getPropertyValue(Object var1, String var2) {
        try {
            DemoService var3 = (DemoService)var1;
        } catch (Throwable var5) {
            throw new IllegalArgumentException(var5);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public void setPropertyValue(Object var1, String var2, Object var3) {
        try {
            DemoService var4 = (DemoService)var1;
        } catch (Throwable var6) {
            throw new IllegalArgumentException(var6);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public boolean hasProperty(String var1) {
        return pts.containsKey(var1);
    }

    public String[] getMethodNames() {
        return mns;
    }
}

最后再回到 JavassistProxyFactory 类的 getInvoker 方法,可以看到它实际返回的是 AbstractProxyInvoker 对象,当调用 AbstractProxyInvoker 类的 doInvoke() 方法时,实际调用的是 wrapper 类的 invokeMethod() 方法!这个知识点十分重要!在我们讲 Dubbo 远程调用的时候会再次回顾这块内容!

exporter 对象的构建

Exporter<?> exporter = protocol.export(wrapperInvoker);

再来看看后半句代码。这里最后会调用 RegistryProtocol 类的 export() 方法,若对此有疑问请看系列文章第一篇:【Dubbo源码阅读系列】之 Dubbo SPI 机制,后文不再赘述。 直接看看 RegistryProtocol 的 export() 方法:

RegistryProtocol.export()

public class RegistryProtocol implements Protocol {
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        boolean register = registeredProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }
}

RegistryProtocol.export() 方法非常重要!!可以说是服务远程暴露的核心了。废话不多说,让我们逐行来看看吧!

doLocalExport()

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    // 获取 providerUrl ,取 originInvoker url.parameters 键值对中 key 为 export 的值
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

先来看看 doLocalExport() 方法做了什么:

  1. 从 getCacheKey() 方法中获取到的,键 export 对应的 value 在如下代码中被添加到 url 的 parameters 集合中。然后我们在这里取出对应的值。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
  1. 尝试从 bounds 缓存中取对应当前键 key 的 exporter。
  2. 如果缓存为 null,新建 exporter 并返回。这里的 protocl 对象为 Protocol$Adaptive。不难分析最后执行的实际是 DubboProtocol 的 export() 方法。

总结一下:doLocalExport() 用 ExporterChangeableWrapper 代理类包装了 protocol.export() 方法返回的 exporter 对象,最后放到了 bounds 集合中缓存。

DubboPrtocol.export()

DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

接着上文继续看 DubboProtocol.export() 方法是如何创建 exporter 对象的:

  1. 调用 serviceKey() 方法构建服务的 key 值,最后的获得的 key 值形式类似 group/path:version:port
  2. 新建 DubboExporter
  3. openServer(url),此时的 url 为 RegistryProtocol 传递过来的 providerUrl,openServer() 用途我们在后文分析;
  4. optimizeSerialization(url) 序列化操作,本文不做具体分析
    DubboProtocol.export() 返回的对象为 DubboExporter。值得我们注意是后面的 openServer() 方法!

    openServer()

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

openServer() 光从方法名看起来像是开启服务连接的。方法比较简单,取 url 的 address 作为 key,尝试从 serverMap 获取对应的 value 值。如果 value 值为 null 则调用 createServer(url) 方法创建 server 后添加到 serverMap 中。
createServer() 方法的流程比较冗长,我们这里通过一张时序图来给出该方法内部调用流程:

上图省略了从 ServiceConfigRegistryProtocol 以及从 RegistryProtocolDubboProtocol 的转换过程。这部分内容涉及到 Dubbo SPI 机制,如有疑问可以详见:【Dubbo源码阅读系列】之 Dubbo SPI 机制。这里给出简单的转换流程

  • ServiceConfig 到 RegistryProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》RegistryProtocol
  • RegistryProtocol 到 DubboProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》DubboProtocol

最后重点关注下 NettyServer 的 doOpen() 方法:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

可以看到这段代码是比较经典的 Netty 服务端启动代码...也就是说 openServer() 方法用于 Netty 服务端启动。
我们知道 Netty 常用于客户端和服务端之间的通讯。在这里我们开启了服务端,那么在何处会开启对应的客户端呢?他们之间到底会进行什么交互呢?这个疑问我们先留着待后续文章讲解。

服务的暴露

上面讲了这么多,感觉还是和服务远程暴露没有沾多大的边?到底我们的服务是如何被其它机器感知的?别人是怎么知道我们某某台机器提供了短信服务的?其实揭秘的序幕已经拉开了!让我们继续娓娓道来!
回顾一下 RegistryProtocol.export() 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registeredProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    if (register) {
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

上面我们已经聊完了 doLocalExport() 方法,继续看 export() 方法的后半部分:

RegistryProtocol.java
final Registry registry = getRegistry(originInvoker);

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

这里的 registryFactory 为 RegistryFactory$Adaptive(Dubbo 源码中充斥了大量 SPI 扩展机制的使用,这里不再赘述)。总之我们获取到的扩展类为 ZookeeperRegistryFactory ,ZookeeperRegistryFactory 继承自 AbstractRegistryFactory 类。因此最后调用的是 AbstractRegistryFactory 类的 getRegistry() 方法。

@Override
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

方法比较简单,直接看重点方法 createRegistry(url)。createRegistry() 是一个抽象方法,会根据 url 来调用具体的实现方法,这里我们用 ZookeeperRegistryFactory 类进行分析。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    ...
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
    ...
}

public class ZookeeperRegistry extends FailbackRegistry {
    ...
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
    ...
}

ZookeeperRegistryFactory 类的 createRegistry() 方法会调用 ZookeeperRegistry 类的构造方法新建 ZookeeperRegistry 实例并返回。而 ZookeeperRegistry 类的构造方法会先调用父类 FailbackRegistry 的构造方法再执行后续操作。先看 FailbackRegistry 构造方法:

public abstract class FailbackRegistry extends AbstractRegistry {
    ...
    public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    // 延迟重试
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
    ...
}

在 FailbackRegistry 构造方法中有一个延迟重试方法 retry(),如果发现失败集合 failedRegistered、failedUnregistered、failedSubscribed、failedUnsubscribed、failedNotified 不为空,会进行重试操作。
继续看 ZookeeperRegistry 类的构造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    ...
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

这里的 ZookeeperTransporter.connect() 经过 SPI 转换实际调用为 CuratorZookeeperTransporter.connect()。

public class CuratorZookeeperTransporter implements ZookeeperTransporter {
    @Override
    public ZookeeperClient connect(URL url) {
        return new CuratorZookeeperClient(url);
    }
}

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
    private final CuratorFramework client;

    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

上面这段代码使用 CuratorFrameworkFactory 工厂类创建了一个 CuratorFramework 实例,并启动该实例创建了一个与 zookeeper 的连接。

再回到 RegistryProtocol 中的 getRegistry() 方法。我们发现它通过层层调用最终创建了一个到 ZookeeperRegistry 实例。这个实例中的 ziClient 对象建立了到 zookeeper 的连接。
我们知道 ZooKeeper 经常被用作注册中心Ok。那我们现在已经连接上了 ZooKeeper 了,是不是该往 Zookeeper 上写点啥了?继续往下看,好戏要来啦!!~

register() 注册方法

RegistryProtocol.java
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        ...
        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }
        ...
    }
    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

在这里 register() 方法最终会调用 FailbackRegistry 类的 register() 方法(不想再赘述为什么!!!!)。

public abstract class FailbackRegistry extends AbstractRegistry {
    ...
    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            // ...
        }
    }
    ...
}

public class ZookeeperRegistry extends FailbackRegistry {
    protected void doRegister(URL url) {
        try {
            String str = toUrlPath(url);
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

划重点啊筒子们!!! doRegister() 方法!!这里调用链也比较长。画个简图总结下:


小结:总之最后的目的是在 ZooKeeper 上创建通过 url 解析生成的 path 节点。大概长这个样子:dubbo%3A%2F%2F10.137.32.54%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D4264%26side%3Dprovider%26timestamp%3D1546848704035
最后还有一个地方需要注意下:这里调用 zkClient.create() 方法时,如果 dynamic 为空,默认会创建 zookeeper 临时节点。临时节点的好处在于如果客户端和 zookeeper 集群断开连接,对应的临时节点则会自动被删除。这样一来,是不是对我们的调用方好处多多呢?

End

碍于篇幅限制,今天就先介绍这么多。回顾一下,我们在 RegistryProtocol.export() 方法里面创建了一个 DubboExporter 对象、开启了 Netty 服务端,同时还往注册中心 zookeeper 上创建了一个和服务有关的临时节点!关于 RegistryProtocol.export() 方法剩余的内容,我们以后有机会再说吧!

本BLOG上原创文章未经本人许可,不得用于商业用途及传统媒体。网络媒体转载请注明出处,否则属于侵权行为。https://juejin.im/post/5c2dd31be51d451ffd25892d

原文地址:https://www.cnblogs.com/cfyrwang/p/10312705.html

时间: 2024-07-31 12:07:01

【Dubbo源码阅读系列】服务暴露之远程暴露的相关文章

【Dubbo源码阅读系列】之远程服务调用(上)

今天打算来讲一讲 Dubbo 服务远程调用.笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊.后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现.本地消费者无须知道远程服务具体的实现,消费者和提供者通过代理类来进行交互!! 一.JAVA 动态代理 简单看一段代码回顾一下动态代理: public class MyInvocationHandler implements InvocationHandler{ private Object object; publi

Dubbo源码分析系列-服务的发布

RPC简化类图 RPC模块核心接口和抽象实现 默认实现Dubbo协议的接口和抽象实现 服务发布过程 调用过程 上图是服务提供者暴露服务的主过程: 首先ServiceConfig类拿到对外提供服务的实际类ref(如:HelloWorldImpl),然后通过ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成具体服务到Invoker的转化.接下来就是Invoker转换到Exporter的过程. Dubbo处理服务暴露的关键

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line:

dubbo源码阅读之服务导出

dubbo服务导出 常见的使用dubbo的方式就是通过spring配置文件进行配置.例如下面这样 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns

Spring源码阅读系列总结

最近一段时间,粗略的查看了一下Spring源码,对Spring的两大核心和Spring的组件有了更深入的了解.同时在学习Spring源码时,得了解一些设计模式,不然阅读源码还是有一定难度的,所以一些重要的设计模式简单的做了阐述.同时还会简单的加入一些GOF中提到的设计原则.Spring的源码阅读系列,也暂告一段落.下面是就带你走进Spring世界: Spring系列的引子 1)Spring WebApplicationContext初始化与消亡 这一节帮我们了解Spring是如何初始化WebAp

Dubbo源码学习之-服务导出

前言 忙的时候,会埋怨学习的时间太少,缺少个人的空间,于是会争分夺秒的工作.学习.而一旦繁忙的时候过去,有时间了之后,整个人又会不自觉的陷入一种懒散的状态中,时间也显得不那么重要了,随便就可以浪费掉几个小时.可见普通人的学习之路要主动地去克服掉很多阻碍,最主要的阻碍还是来自于自身,周期性的不想学习.不自觉的懒散.浅尝辄止的态度.好高骛远贪多的盲目...哎,学习之路,还是要时刻提醒自己,需勤勉致知. 闲话少叙,今天的学习目标是要尽量的了解清楚Dubbo框架中的服务导出功能,先附上Dubbo官网上的

dubbo源码阅读-阅读前的准备(一)

说明 之前自己看了一篇dubbo源码,但是对整体还是没有清晰的了解所以重新跟着别人的博文整理一遍 获取源码 github代码fork 1.打开https://github.com/alibaba/dubbo fork到自己仓库,为了后续自己看的过程中会写一些自己的注释 从自己仓库将项目拉取到本地 模块划分 项目结构一览 官方dubbo框架设计文档地址 dubbo-common 提供 Util 类和通用模型 dubbo-remoting 远程通信模块:提供通用的客户端和服务端的通讯功能. 原文地址

源码阅读系列:源码阅读方法

一.前提条件 1.纯熟扎实的语言基础 ??如果你学java,却对反射.泛型.注解一直半解,还是不要去读什么框架了,回去把java基础打扎实反而对你自身更有益. 2.UML能力 ??在软件工程中,UML在软件的不同生命周期阶段扮演着非常重要的角色,没有好的UML水平,面对大型的项目源码会束手无策. 3.对业务的理解 ??如果你要阅读的项目业务性比较强,事先对业务有一定的了解是必须的. 4.设计模式.重构的掌握 ??编程语言什么的没什么好说.着重提一个:设计模式由于Android源代码用到各种各样的

dubbo源码分析8——服务暴露概述

从上文中可知,com.alibaba.dubbo.config.spring.ServiceBean类是负责解析<dubbo:service/>的配置的,下面是它的类图 从类图上可知它继承了ServiceConfig类,并实现了5个接口,在这5个接口中有两个接口比较重要,其中InitializingBean是进行bean的初始化工作的,ApplicationListener接口是监听spring容器事件的.先通过outline视图观察一下ServiceBean类:    从outline视图中