dubbo中的切换不同transport怎么做到的?处理消息的handler链是咋样的?在哪里唤醒之前阻塞在发送request以后的业务线程?

server和client都是以下方法得到的,Exchanger这个接口只有这么一个实现,将来可能其他更加复杂获得server和cliet方式,以下这种是目前唯一的

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

  


connect和 bind得到的最终的server和client,Transporters.connect调用方法是以下两个:


public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

  

通过spi的自适应扩展作为生产实例的中间工厂,这个工厂根据url参数得到不同的transport,如果url里面指定netty4,那么就可以得到netty4的client切换不同transport怎么做到的?那就是通过自适应扩展加url自由切换。

回到最上面的,通过bind已经拿到最终的nettyserver,继续包裹了一层HeaderExchangeServer,这里面主要处理心跳、channel、future的封装,以及屏蔽不同类型的server(netty、netty4等等)。

return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
 继续看这里,这个handler很长,对于dubbo来说,最里面这个handler是通过CreateServer方法中的server = Exchangers.bind(url, requestHandler)这个传入进来的,
这个requestHandler就是dubboprotocl里面内部的,这个handler只有reply方法,作用就是执行doinvoe,也就是真正履行provider义务的地方,也只会在request来的时候才会被调用。
HeaderExchangeHandler用来处理request、response的,decodehandler用来解码。这handler到这里是不是已经结束了?显然不是,netty4server初始化的时候:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

  


日志里面经常看到的 “DubboServerHandler”就是这个SERVER_THREAD_POOL_NAME。handler链上又有下面这几个:
MultiMessageHandler:针对multimessage类型消息,在received做拦截。
HeartbeatHandler:针对心跳在received做拦截。通过dispatch出来的默认的allchannelhandler:对所有io事件做处理,大部分任务都扔到线程池里面做异步处理,防止阻塞netty线程。这个业务线程池的类型、个数也是url指定。

所以这个handler链路上面从外到内:MultiMessageHandler HeartbeatHandler Allchannelhandler DecodeHandler HeadExchangeHandler dubboprotocol里面自带的带有reply方法的handler。

对于netty来说,消息在到达这些handler处理以前,已经被netty的编解码handler处理了,所以DecodeHandler可有可无
HeadExchangeHandler看起来意义不大,其实这个是用来阻断传递到dubbo里面的handler的,它是最后一道防线,用来决定要不要丢给dubbo做reply、doinvoke操作的,最重要的方法:
public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

  

如果two-way、对方给我request,需要返回response,那么handleRequest会调用dubboprotocol的reply处理这个request
如果对方给我的response,那么调用handleResponse(channel, (Response) message);
static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

  

public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

 

 收到response以后,在这里找到对应的future,通过future唤醒之前阻塞在发送request以后的业务线程:

private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

  




原文地址:https://www.cnblogs.com/notlate/p/10205043.html

时间: 2024-10-04 04:21:10

dubbo中的切换不同transport怎么做到的?处理消息的handler链是咋样的?在哪里唤醒之前阻塞在发送request以后的业务线程?的相关文章

【Rest】在Dubbo中开发REST风格的远程调用(RESTful Remoting)

目录 概述 REST的优点 应用场景 快速入门 标准Java REST API:JAX-RS简介 REST服务提供端详解 HTTP POST/GET的实现 Annotation放在接口类还是实现类 JSON.XML等多数据格式的支持 中文字符支持 XML数据格式的额外要求 定制序列化 配置REST Server的实现 获取上下文(Context)信息 配置端口号和Context Path 配置线程数和IO线程数 配置长连接 配置最大的HTTP连接数 配置每个消费端的超时时间和HTTP连接数 GZ

Dubbo中集群Cluster,负载均衡,容错,路由解析

Dubbo中的Cluster可以将多个服务提供方伪装成一个提供方,具体也就是将Directory中的多个Invoker伪装成一个Invoker,在伪装的过程中包含了容错的处理,负载均衡的处理和路由的处理.这篇文章介绍下集群相关的东西,开始先对着文档解释下容错模式,负载均衡,路由等概念,然后解析下源码的处理.(稍微有点乱,心情不太好,不适合分析源码.) 集群的容错模式 Failover Cluster 这是dubbo中默认的集群容错模式 失败自动切换,当出现失败,重试其它服务器. 通常用于读操作,

Android中Activity切换时共享视图元素的切换动画(4.x兼容方案)

同时发布在我的博客 点此进入 开始 上一篇讲了使用 Google 的 AppCompat-v7 来实现 Activity 切换时实现共享视图元素的切换动画.这一篇介绍两个可以兼容 4.x 的两个第三方方案. 上一篇:Android中Activity切换时共享视图元素的切换动画(5.0以上) 方案一:PreLollipopTransition 首先在 build.gradle 配置文件添加这个库依赖 dependencies { compile 'com.kogitune:pre-lollipop

Debian中如何切换默认Python版本

当你安装 Debian Linux 时,安装过程有可能同时为你提供多个可用的 Python 版本,因此系统中会存在多个 Python 的可执行二进制文件,你可以按照以下方法使用 ls 命令来查看你的系统中都有那些 Python 的二进制文件可供使用: $ ls /usr/bin/python* /usr/bin/python /usr/bin/python2 /usr/bin/python2.7 /usr/bin/python3 /usr/bin/python3.4 /usr/bin/pytho

Dubbo中SPI扩展机制解析

dubbo的SPI机制类似与Java的SPI,Java的SPI会一次性的实例化所有扩展点的实现,有点显得浪费资源. dubbo的扩展机制可以方便的获取某一个想要的扩展实现,每个实现都有自己的name,可以通过name找到具体的实现. 每个扩展点都有一个@Adaptive实例,用来注入到依赖这个扩展点的某些类中,运行时通过url参数去动态判断最终选择哪个Extension实例用. dubbo的SPI扩展机制增加了对扩展点自动装配(类似IOC)和自动包装(类似AOP)的支持. 标注了@Activat

Dubbo中服务消费者和服务提供者之间的请求和响应过程

服务提供者初始化完成之后,对外暴露Exporter.服务消费者初始化完成之后,得到的是Proxy代理,方法调用的时候就是调用代理. 服务消费者经过初始化之后,得到的是一个动态代理类,InvokerInvocationHandler,包含MockClusterInvoker,MockClusterInvoker包含一个RegistryDirectory和FailoverClusterInvoker. Java动态代理,每一个动态代理类都必须要实现InvocationHandler这个接口,并且每一

Dubbo中编码和解码的解析

(这里做的解析不是很详细,等到走完整个流程再来解析)Dubbo中编解码的工作由Codec2接口的实现来处理,回想一下第一次接触到Codec2相关的内容是在服务端暴露服务的时候,根据具体的协议去暴露服务的步骤中,在DubboProtocol的createServer方法中: 1234567891011 private ExchangeServer createServer(URL url) { ... //这里url会添加codec=dubbo url = url.addParameter(Con

Dubbo中暴露服务的过程解析

dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay="5000"),另外一种是没有设置延迟暴露或者延迟设置为-1(delay="-1"): 设置了延迟暴露,dubbo在Spring实例化bean(initializeBean)的时候会对实现了InitializingBean的类进行回调,回调方法是afterPropertySet(),如果设置了延迟暴露,dubbo在这个方法中进行服务的发布. 没有设置延迟或者延迟为-1,dubbo会在Spring实例

Dubbo中订阅和通知解析

Dubbo中关于服务的订阅和通知主要发生在服务提供方暴露服务的过程和服务消费方初始化时候引用服务的过程中. 服务引用过程中的订阅和通知 在服务消费者初始化的过程中,会有一步是进行服务的引用,具体的代码是在RegistryProtocol的refer方法: 12345678910111213141516171819 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url =