由Router引入
由于业务上的需要,很早之前就想自己扩展Dubbo的路由功能。But,之前试了一下,发现好像并没有找到正确的打开方式,而且由于时间不是很充足,加上自己心浮气躁浅尝辄止,于是乎就放弃了这块的研究。
不过今日,在手头上一块比较忙的事情告一段落了之后,又开始漫漫的research之路。。
好了,下面先来说说之前被困在了什么地方。
首先吐槽一下,官方文档对于这一块的用法写得非常的不全面,只介绍了条件路由的语法,而没有告诉我们怎么样使其生效。当然最简单的一种做法,官方文档上也说了,装个dubbo-admin
在其中设置就好了。但是如果是想自定义扩展的话呢?这个时候我们来看看Developer Guide是怎么写的:
没错,就是这里的 (3)扩展配置 误导了我,导致了之前的放弃。因为这里的扩展配置,完全就是错的。今天看了源码之后,发现完全就没有这么一个配置项,关于正确的路由规则的设置方法下面会讲。
好了,继续看,那我们就先看看条件路由的语法,然而,又发现一个官方文档的bug:
官方文档写的是runtime
不填缺省为false
,而Dubbo
的代码中呢?
有点儿醉!接着看吧;既然看了代码,我们就从整体上来看看Dubbo
的结构,这里从Consumer
入手,来看看它是怎么找到合适的Provider
来调用的。
源码分析
注意,下面我们会摒弃Mock
、Generic
等非主流程,精简代码来分析。
核心类ReferenceConfig
好啦,从Consumer
入手,我们看看到底发生了什么,首先,Dubbo
会代理我们在Consumer
中引用的接口,一般都用<dubbo:reference />
来配置,具体的代理逻辑在ReferenceConfig
中:
private T createProxy(Map<String, String> map) {
// 通过注册中心配置拼装URL,这里省略了inJvm、点对点直连等
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 这里的url是注册中心的url
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference");
}
// 这里省略了多个注册中心的情况
if (urls.size() == 1) {
// 我们通过注册中心生成了invoker,它代表一个可执行体,可向它发起invoke调用
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 这里生成invoker还包含了两个额外的步骤 1.consumer注册到registry 2.consumer订阅该服务下的providers/routers/configurators
// 另外这个invoker里包含了很多逻辑,比如cluster等
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service");
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
默认proxyFactory
是StubProxyFactoryWrapper
。从名字也能看出来,这个类里实现了客户端Stub
的逻辑。不过这次我们就先跳过这一段,以后再来看。最终代理的实现无非还是JDK
的动态代理或是Javassist
的代理,我们比较关心InvocationHandler
——InvokerInvocationHandler
:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
其中RpcInvocation
是会话域,它持有调用过程中的变量,比如方法名,参数等。
然后我们关注一下其中的成员变量invoker
,最终的调用逻辑都是由invoker
去执行。我们来看看这个invoker
究竟是什么:
可以看到这里的invoker
有点像装饰者模式,一层层的添加功能。需要注意的是,上述截图中包含MergeableClusterInvoker
是因为我在reference
上配置了group=*
,而一般的流程则只会走:MockClusterInvoker
-> FailoverClusterInvoker
:其中 FailoverClusterInvoker
继承了 AbstractClusterInvoker
,这个抽象类实现了 routers
的过滤 以及 loadBalance
的过滤,最终会选取一个唯一的Provider
来进行调用:
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
// 这里的list方法也是根据directory的list方法进行过滤,也就是走routers进行过滤
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
核心类RegistryDirectory
这里提到了routers
就不得不提到一个类——RegistryDirectory
:每个ReferenceBean
都会对应一个RegistryDirectory
,其中维护了几乎所有关于该服务接口的配置:比如configurators
、routers
、providers
等。
这个类实现了NotifyListener
接口,显然每次当这些配置项有变化时都需要通知这个类做refresh
。
其中每次刷新都会刷新这些配置项,并且还有一些缓存,比如:某个方法可调用的Provider
的对应关系(methodInvokerMap
),简单看看这个notify
方法:
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && configuratorUrls.size() >0 ){
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() >0 ){
List<Router> routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override参数
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
这里会去设置routers
、configurators
等成员变量,然后在refereshInvoker
方法中,重新构建各种缓存。
我们可以考虑一下该notify
的触发点,肯定是当routers
、configurators
等信息有变化时才会触发。而这些信息都是存储在我们注册中心当中的,不得不提到Dubbo
中最常见的注册中心——ZooKeeperRegistry
Registry
关于注册中心,在上面的分析中也提到过一次,不知道大家有没有注意到。就是在ReferenceConfig
中createProxy
方法中,有一步:
// 这里省略了多个注册中心的情况
if (urls.size() == 1) {
// 我们通过注册中心生成了invoker,它代表一个可执行体,可向它发起invoke调用
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 这里生成invoker还包含了两个额外的步骤 1.consumer注册到registry 2.consumer订阅该服务下的providers/routers/configurators
}
这里我们可以深入再看看,这里我们用RegistryProtocol
来做分析:
// 这里两个参数一个是服务类型,另一个是注册中心的url
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// Registry是注册中心的抽象,比如ZooKeeperRegistry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 这里关于group一个和group多个会有不同的调用分支
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
return doRefer(cluster, registry, type, url);
}
在doRefer
方法中会生成Consumer
的URL,并且将其注册到注册中心:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 注册到注册中心
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 订阅该service下的 providers,configurators,routers
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
注册之后,就是消费者订阅,其中订阅了,一般都用ZK来做注册中心,利用ZK的watcher来做更新通知
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 省略部分代码
} else {
List<URL> urls = new ArrayList<URL>();
// 这里toCategoriesPath(url)返回的是一个该服务下routers等zk路径
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
// 这里会监听节点的变化
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 这里会返回该zk路径下的所有数据,比如provider的话,就返回多个provider的URL
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 如果该path下的数据为空,那么返回empty://协议开头,暂时不知是何用意
// 注意:这里已经根据group等过滤过了
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 这里的listener也就是RegistryDirectory
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
订阅之后再执行一下notify
,作为初始notify
,之后每一次节点有变更,都会触发notify
,而notify
里的逻辑就是通知RegistryDirectory
这个实例的内部配置的更新。其中第一个url
是consumer
的url
,而第三个参数是更新的URL
列表,注意,下面的notify
方法是Registry
中的notify
,其中参数listener
才是RegistryDirectory
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.size() == 0)
&& ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
}
}
这里的url
和listener
应该是一一对应的,url
代表一个consumer
端的reference
,而listener
则代表该reference
的RegistryDirectory
,然后urls
则代表多个配置项,这些配置项会一个个应用到RegistryDirectory
上去。这里的urls
根据category
可以分成三类:routers
、configurators
、providers
。并且每个category
只会执行一次notify
,之后每次这三个category
的值有变更,都会进行通知,然后执行notify
方法。
我们还可以来看看UrlUtils.isMatch(url, u)
这个方法:
public static boolean isMatch(URL consumerUrl, URL providerUrl) {
String consumerInterface = consumerUrl.getServiceInterface();
String providerInterface = providerUrl.getServiceInterface();
if( ! (Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface)) ) return false;
if (! isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {
return false;
}
if (! providerUrl.getParameter(Constants.ENABLED_KEY, true)
&& ! Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
return false;
}
String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
}
上面这段代码实现了group
、version
等之间的隔离逻辑。
OK,总算看完了一整个从Consumer
调用远程服务的过程,具体的通信细节这里就先不展开讨论了(已经累得半死了)。
重回Routers
前面在分析源码时提到了RegistryDirectory
,路由逻辑就是在这个类中完成的:
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed){
throw new RpcException("Directory already destroyed .url: "+ getUrl());
}
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router: localRouters){
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
我们来细看一下上面的代码:
第一步——doList
,这一步实际上是直接获取 方法和Provider
的List
的缓存关系
第二步——很关键,这里需要提到一个参数,就是这个runtime
。每一条route
都会有一个runtime
这个属性,如果runtime
配置为true
,那么客户端会在每次调用服务端方法时都进行route
一下,因为比如说根据参数来进行route
,当然这对性能会有一定的影响,但是比如条件路由的话,也就正则表达式的判断,性能影响应该不是特别大。而runtime
配置为false
的话,就可以进行缓存,也就是在第一次调用时确定下来,后面只有当route
规则进行变更了之后才会更新该缓存,也就是能直接在第一步中获得。这里又要吐槽一下官方文档了,童鞋们还记得前面的梗么。。。
再一次,继续吐槽,Router
中有一个BUG,ConditionRouter
中有两个核心方法,一个是matchWhen
,另一个是MatchThen
。也就是先判断Consumer
是否满足matchWhen
,若满足,则路由到满足matchThen
条件的Provider
其中matchWhen
方法:
private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param) {
Map<String, String> sample = url.toMap();
for (Map.Entry<String, String> entry : sample.entrySet()) {
String key = entry.getKey();
MatchPair pair = condition.get(key);
if (pair != null && ! pair.isMatch(entry.getValue(), param)) {
return false;
}
}
return true;
}
而这个url.toMap();
方法,得到的是Consumer
的parameters
的Map
集合。我们可以通过注册中心观察到Consumer
的URL
上是没有method
的参数的,类似如下:
consumer://192.168.1.88/com.alibaba.dubbo.examples.merge.api.MergeService?application=merge-consumer&category=providers,configurators,routers&dubbo=2.0.0&interface=com.alibaba.dubbo.examples.merge.api.MergeService&&pid=11020&side=consumer×tamp=1466450710697
所以返回的map
里是没有method
这个key的,取而代之的是methods
,而methods
的value是所有的方法名,比如:methods=mergeResult,testRouter
。
而官网上给我们的条件路由的配置里面有如下图所示,我又醉了。。。。
另外还有两点需要注意
1. 如果有多条routers
规则,那么会根据每一条routers
来过滤出可调用的provider
列表
2. 针对条件路由,当经过某条路由规则路由后,没有一个符合规则的Provider
,那么此次路由失败,会直接返回路由本条规则前的所有Provider
,也就是相当于没有经过该路由的结果。
配置Routers
边看边分析边写,虽然精简了不少,但是还是多而乱,不知道是否有交代清楚。最后我们以实战配置一条routers
来结束这篇文章。
场景
假设我们有多个服务分了不同的group
,然后需要根据某个参数调用不同的group
的方法
设计
先自定义一个Router
,依据原有的ConditionRouter
和ConditionRouterFactory
创建出我们的CustomRouter
和CoustomRouterFactory
,当然核心逻辑还是在CustomRouter
的route
方法中,也很简单,只是根据请求参数路由一下而已:
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
Object[] arguments = invocation.getArguments();
if(arguments == null || arguments.length == 0){
logger.error("method:" + invocation.getMethodName() + " do not have route param");
return null;
}
Object routeKey = arguments[0];
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : invokers) {
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
if ((getGroup(routeKey)).equals(group)) {
result.add(invoker);
return result;
}
}
result.add(invokers.get(0));
return result;
}
然后再让Dubbo自动发现我们添加的类:
custom=com.alibaba.dubbo.rpc.cluster.router.custom.CustomRouterFactory
最后再配置路由规则:
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
.getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://127.0.0.1:2181"));
registry.register(URL.valueOf(
"routers://0.0.0.0/com.alibaba.dubbo.examples.merge.api.MergeService?name=test&category=routers&router=custom&dynamic=false"));
这段配置我加在了Provider
启动的时候。好了,最后模拟Consumer调用测试一下,OK
写在最后
总算写完了。已经好久没有写Blog了,接下来希望自己可以再重新坚持起来!加油