dubbo源码福彩快三平台搭建分析22 -- consumer 发送与接收原理

在前面福彩快三平台搭建论坛:haozbbs.com Q1446595067的文章中,我们分析了 dubbo 从 provider 进行服务暴露,然后把服务信息注册到注册中心上面解耦 consumer 与 provider 的调用。consumer 通过 javassist 创建代理对象引用远程服务。当通过代理对象调用远程服务的时候,讲到进行真正调用的时候 dubbo 抽象出集群容错(Cluster、Directory、Router、LoadBalance)从服务多个暴露方选取出一个合适的 Invoke 来进行调用。 dubbo 默认是通过 FailoverClusterInvoker 从多个 Invoke 中选择出一个 Invoke 实例 InvokerWrapper 来进行远程调用。本次分析主要包括以下 4 个部分:

consumer 发送扩展
consumer 发送原理
consumer 接收原理
dubbo 异步变同步
1、consumer 发送扩展
我们先来看一下 dubbo 中 consumer 端的请求发送原理,也就是从 InvokerWrapper#invoke 开始,在 consumer 服务引用分析的时候,我们知道根据 Invoke 调用的时候, dubbo 会创建 ProtocolListenerWrapper与 ProtocolFilterWrapper 来用集成框架使用者的扩展包含:InvokerListener 与 Filter。ProtocolListenerWrapper 在对象创建的时候就会调用InvokerListener#referred扩展,所以在远程服务调用的时候最主要的还是 Filter 扩展,下面我们就看一下在远程调用的时候默认包括哪些 Filter 扩展:

ConsumerContextFilter
FutureFilter
MonitorFilter
1.1 ConsumerContextFilter
ConsumerContextFilter 保存客户端信息到 RpcContext。

@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        return invoker.invoke(invocation);
    } finally {
        RpcContext.getContext().clearAttachments();
    }
}

}
RpcContext 使用 ThreadLocal 来记录一个临时状态。当接收到 RPC 请求,或发起 RPC请求时,RpcContext 的状态都会变化。

比如:A 调 B,B 再调 C,则 B 机器上,在 B 调 C 之前,RpcContext 记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext 记录的是 B 调 C 的信息。

可以通过 RpcContext 上的 setAttachment 和 getAttachment 在服务消费方和提供方之间进行参数的隐式传递。

1.2 FutureFilter
FutureFilter 会来处理 dubbo 服务接口调用方配置 async="true" 来使用同步调用来是异步调用。

public class FutureFilter implements Filter {

protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    //需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        asyncCallback(invoker, invocation);
    } else {
        syncCallback(invoker, invocation, result);
    }
    return result;
}

}
同步调用 dubbo 就会同步的返回 provider 方法调用返回的响应.如果是异步调用在进行调用的时候就会把请求信息发送到 provider 然后返回一个空的 RpcResult。consumer 端如果要获取响应需要通过以下方法获取:

// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// 同理等待bar返回
Bar bar = barFuture.get();
1.3 MonitorFilter
MonitorFilter 其实是在分析之前 dubbo monitor 的时候就进行了详细的分析。它主要是通过以下配置来激活 provider 与 consumer 端的指标监控。

<dubbo:monitor protocol="registry" />
我们还是简单的来看一下它的源码:

public class MonitorFilter implements Filter {

private MonitorFactory monitorFactory;

public void setMonitorFactory(MonitorFactory monitorFactory) {
    this.monitorFactory = monitorFactory;
}

// 调用过程拦截
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
        RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息
        String remoteHost = context.getRemoteHost();
        long start = System.currentTimeMillis(); // 记录起始时间戮
        getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数
        try {
            Result result = invoker.invoke(invocation); // 让调用链往下执行
            collect(invoker, invocation, result, remoteHost, start, false);
            return result;
        } catch (RpcException e) {
            collect(invoker, invocation, null, remoteHost, start, true);
            throw e;
        } finally {
            getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数
        }
    } else {
        return invoker.invoke(invocation);
    }
}

}
当启动 dubbo monitor 的时候会暴露一个远程服务 MonitorService 接口服务服务,具体的处理类是 SimpleMonitorService。而在 MonitorFilter#collect 方法里面 MonitorFactory 会创建一个 Monitor 接口实例(继承于 MonitorService)。其实就是 DubboMonitorFactroy#createMonitor 远程引用 dubbo monitor 暴露的 MonitorService 服务。

public class DubboMonitorFactroy extends AbstractMonitorFactory {

private Protocol protocol;

private ProxyFactory proxyFactory;

public void setProtocol(Protocol protocol) {
    this.protocol = protocol;
}

public void setProxyFactory(ProxyFactory proxyFactory) {
    this.proxyFactory = proxyFactory;
}

@Override
protected Monitor createMonitor(URL url) {
    url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));
    if (url.getPath() == null || url.getPath().length() == 0) {
        url = url.setPath(MonitorService.class.getName());
    }
    String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);
    if (filter == null || filter.length() == 0) {
        filter = "";
    } else {
        filter = filter + ",";
    }
    url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),
            Constants.REFERENCE_FILTER_KEY, filter + "-monitor");
    Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, url);
    MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
    return new DubboMonitor(monitorInvoker, monitorService);
}

}
获取到远程服务 SimpleMonitorService,最后在 MonitorFilter#collect 调用 MonitorService#collect 进行监控数据采集提供给 dubbo monitor。调用过程如下所示:

consumer 发送扩展.jpg

2、consumer 发送原理
最终 consumer 会到 DubboInvoke 进行服务调用。它会在 AbstractInvoker#invoke 添加一些扩展参数到 RpcInvocation这个远程调用对象里面。添加的扩展参数包含:

interface : 远程调用的接口名称
group : 接口分组名称
token : 调用的 token 信息
timeout : 调用服务的超时时间
async : 是否异步调用
id : 异步操作默认添加 invocation id,用于保证操作幂等
以及 RpcContext 传递过来的扩展参数(RpcContext#attachments)。然后在 DubboInvoker#doInvoke 中会添加 path (接口全类名) 以及 version(版本信息)。再根据 dubbo 的调用模式进行远程调用,包含以下三种调用模式:

oneway 模式:<dubbo:method>标签的 return 属性配置为false,则是oneway模式,利用ExchangeClient 对象向服务端发送请求消息之后,立即返回空 RpcResult 对象
异步模式:<dubbo:method>标签的 async 属性配置为 ture,则是异步模式,直接返回空 RpcResult对象,由 FutureFilter 和 DefaultFuture 完成异步处理工作
同步模式:默认即是同步,则发送请求之后线程进入等待状态,直到收到服务端的响应消息或者超时。
下面我们看一下 dubbo 同步调用时序图:

DubboInvoke.png

ChannelFuture future = channel.write(message);
最终是调用 org.jboss.netty.channel.Channel 通过 socket 发送消息到从集群中选择出的一个暴露服务信息的服务器发送网络数据。

3、consumer 接收原理
我们都知道 dubbo 其实是通过 netty 来进行 socket 通信的。而在使用 netty 进行网络编程的时候,其实核心就是就是实现 ChannelHandler。而在 dubbo 中对应的实现类就是 NettyHandler(高版本支持支持 netty 4 使用的是 NettyClientHandler ,NettyHandler 使用的是 netty 3.x)。如果在 consumer 端(provider 也支持)需要使用 netty 4 进行业务处理,需要进行进行以下配置:

<dubbo:consumer client="netty4" />
所以 consumer 接收 provider 响应的入口就在 NettyClientHandler#channelRead:

NettyClientHandler.jpg

首先 ChannelHandler 用于接收 provider 端响应回来的请求,然后经过 5 个 dubbo 自定义的 ChannelHandler。

MultiMessageHandler:支持 MultiMessage 消息处理,也就是多条消息处理。
HeartbeatHandler:netty 心条检测。如果心跳请求,发送心跳然后直接 return,如果是心跳响应直接 return。
AllChannelHandler:使用线程池通过 ChannelEventRunnable 工作类来处理网络事件。
DecodeHandler:解码 message,解析成 dubbo 中的 Response 对象
HeaderExchangeHandler:处理解析后的 provider 端返回的 Response 响应信息,把响应结果赋值到 DefaultFuture 响应获取阻塞对象中。
4、dubbo 异步变同步
我们都知道 dubbo 是基于 netty NIO 的非阻塞 并行调用通信。所以 dubbo 在 consumer 请求 provider 后响应都是异步的。但是在 dubbo 里面默认是同步返回的,那么 dubbo 是如何把异步响应变成同步请求的呢?带着这个问题,首先我们来看一下 dubbo 里面的几种请求方式。

4.1 异步且无返回值
这种请求最简单,consumer 把请求信息发送给 provider 就行了。只是需要在 consumer 端把请求方式配置成异步请求就好了。如下:

<dubbo:method name="sayHello" return="false"></dubbo:method>
4.2 异步且有返回值
这种情况下 consumer 首先把请求信息发送给 provider 。这个时候在 consumer 端不仅把请求方式配置成异步,并且需要 RpcContext 这个 ThreadLocal 对象获取到 Future 对象,然后通过 Future#get() 阻塞式获取到 provider 的响应。那么这个 Future 是如果添加到 RpcContext 中呢?

在第二小节讲服务发送的时候, 在 DubboInvoke 里面有三种调用方式,之前只具体请求了同步请求的发送方式而且没有异步请求的发送。异步请求发送代码如下:

DubboInvoker#doInvoke 中的 else if (isAsync) 分支

ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<T> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (RpcUtils.isAsyncFuture(getUrl(), inv)) {
    result = new AsyncRpcResult<>(futureAdapter);
} else {
    result = new RpcResult();
}
return result;

上面的代码逻辑是直接发送请求到 provider 返回一个 ResponseFuture 实例,然后把这个 Future 对象保存到 RpcContext#LOCAL 这个 ThreadLocal 当前线程对象当中,并且返回一个空的 RpcResult对象。如果要获取到 provider响应的信息,需要进行以下操作:

// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<String> temp= RpcContext.getContext().getFuture();
// 同理等待bar返回
hello=temp.get();
4.3 异步变同步(默认)
下面我们就来讨论一下 dubbo 是如何把异步请求转化成同步请求的。其实原理和异步请求的通过 Future#get 等待 provider 响应返回一样,只不过异步有返回值是显示调用而默认是 dubbo 内部把这步完成了。下面我们就来分析一下 dubbo 是如何把 netty 的异步响应变成同步返回的。(当前线程怎么让它 "暂停",等结果回来后,再执行?)

我们都知道在 consumer 发送请求的时候会调用 HeaderExchangeChannel#request 方法:

HeaderExchangeChannel#request

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

它首先会通过 dubbo 自定义的 Channel、Request 与 timeout(int) 构造一个 DefaultFuture 对象。然后再通过 NettyChannel 发送请求到 provider,最后返回这个 DefaultFuture。下面我们来看一下通过构造方法是如何创建 DefaultFuture 的。我只把主要涉及到的属性展示出来:

public class DefaultFuture implements ResponseFuture {

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

private final long id;
private final Channel channel;
private final Request request;
private final int timeout;

public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

}
这个 id 是在创建 Request 的时候使用 AtomicLong#getAndIncrement 生成的。从 1 开始并且如果它一直增加直到生成负数也能保证这台机器这个值是唯一的,且不冲突的。符合唯一主键原则。 dubbo 默认同步变异步其实和异步调用一样,也是在 DubboInvoker#doInvoke 实现的。

DubboInvoker#doInvoke

RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();

关键就在 ResponseFuture#get 方法上面,下面我们来看一下这个方法的源码:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

其实就是 while 循环,利用 java 的 lock 机制判断如果在超时时间范围内 DefaultFuture#response 如果赋值成不为空就返回响应,否则抛出 TimeoutException 异常。下面我们就来看一下 DefaultFuture#response 是如何被赋值的。

还记得 consumer 接收 provider 响应的最后一步吗?就是 DefaultFuture#received,在 provider 端会带回 consumer请求的 id。我们来看一下它的具体处理逻辑:

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

它会从最开始通过构造函数传进去的 DefaultFuture#FUTURES 根据请求的 id 拿到 DefaultFuture ,然后根据这个 DefaultFuture 调用 DefaultFuture#doReceived 方法。通过 Java 里面的 lock 机制把 provider 的值赋值给 DefaultFuture#response。此时 consumer 也正在调用 DefaultFuture#get 方法进行阻塞,当这个 DefaultFuture#response 被赋值后,它的值就不为空。阻塞操作完成,且根据请求号的 id 把 consumer 端的 Request以及 Provider 端返回的 Response 关联了起来。

这个就是 Dubbo 异步转同步的原理,是不是很巧妙,很简单。 :)

原文地址:http://blog.51cto.com/13855531/2137026

时间: 2024-10-03 21:25:25

dubbo源码福彩快三平台搭建分析22 -- consumer 发送与接收原理的相关文章

使用saltstack部署极速快三平台搭建

极速快三平台搭建搭建环境: 1.服务器列表: 企 娥:217 1793 408salt-master: 192.168.42.131 salt-minion: 192.168.42.128(minion-192.168.42.128) 192.168.42.130(minion-192.168.42.130) 2.环境: (1)服务器之间互相可以通信(火墙关闭或者火墙允许通信) (2)master对minion的主机名和ip做好解析 部署步骤 1.确定整个配置的目录文件结构: pcre软件安装

01MBR分区幸运快三平台搭建结构、DPT分区表、EBR扩展引导

主引导幸运快三平台搭建论坛:haozbbs.com Q1446595067 记录(Master Boot Record,缩写:MBR),又叫做主引导扇区,是计算机开机后访问硬盘时所必须要读取的首个扇区,它在硬盘上的三维地址为(0柱面,0磁头,1扇区).在深入讨论主引导扇区内部结构的时候,有时也将其开头的446字节内容特指为"主引导记录"(MBR),其后是4个16字节的"磁盘分区表"(DPT),以及2字节的结束标志(55AA).因此,在使用"主引导记录&qu

Yolo系列学习1-Yolov3训练福彩快三平台出租自己的数据

目的:福彩快三平台出租haozbbs.comQ1446595067 实现利用yolov3训练自己的数据集(voc格式) 方法: 1)构建VOC数据集 将你手中的数据集的标注txt修改成voc格式的txt,voc格式如下: 000002.jpg car 44 28 132 121 000003.jpg car 54 19 243 178 000004.jpg car 168 6 298 164 其中第一列为图片名,第二列为目标类别,最后是目标的包围框坐标(左上角和右下角坐标). 批量修改文件名py

IIS下PHP快三平台源码的架设配置环境要求

IIS下PHP快三平台源码的架设配置环境要求 最近在Windows Server 2003 R2企业版下设置支持PHP的WEB页面,以前有设定过Windows下的Apache和PHP(详见http://aqiulian.com).但这次因为安装包大小的原因只让IIS支持PHP(如有不懂的可以企鹅:212303635),具体步骤详细如下. 一.安装PHP 首先在阿酋联源码论坛上下载针对Windows的PHP安装包.随后将该包解压至C:\PHP如下图. 完成上面的步骤后,将C:\php目录下的php

ArrayList.add() 方法吉林快-三平台出租源码解析

吉林快-三平台出租Q1446595067解析源码的方法>>> list.add("hello"); 该方法涉及到的方法被我复制到了一个类中,至于解释,全在注释上.初次解析,别喷我!!! 如有不足望评论,随时补充. package com.nc.sourceCode; import java.lang.reflect.Array;import java.util.ArrayList;import java.util.Arrays;import java.util.Lis

吉林快_三平台搭建项目实践教程

吉林快_三平台搭建是一个开源框架,Spring是于2003 年兴起的一个轻量级的Java 开发框架,由Rod Johnson 在其著作Expert One-On-One J2EE Development and Design中阐述的部分理念和原型衍生而来.它是为了解决企业应用开发的复杂性而创建的.Spring使用基本的JavaBean来完成以前只可能由EJB完成的事情.然而,Spring的用途不仅限于服务器端的开发.从简单性.可测试性和松耦合的角度而言,任何Java应用都可以从Spring中受益

深入快三平台北京赛车平台出租/搭建浅出学Vue开发

Vue对移动端和浏览器版本的支持 因为Vue本身使用快三平台北京赛车平台出租/搭建Q1619668668浅了ECMAScript 5 特性,所以Vue支持所有兼容ECMAScript 5 的浏览器.我们根据下面的图示来解释一下, 红色表示为几乎不支持 黄色表示为大部分支持 (有很小的可能会影响使用) 绿色表示为几乎全部支持(不影响使用) 由上面的图示可以看出来对IE来说,在IE8及其以下的时候,是不支持ECMAScript 5 的,也就是说,是无法使用Vue的.我们看一下对IE8的描述. IE8

Android源码浅析(三)——Android AOSP 5.1.1源码的同步sync和编译make,搭建Samba服务器进行更便捷的烧录刷机

Android源码浅析(三)--Android AOSP 5.1.1源码的同步sync和编译make,搭建Samba服务器进行更便捷的烧录刷机 最近比较忙,而且又要维护自己的博客,视频和公众号,也就没仔细的梳理源码的入门逻辑,今天也就来讲一个源码的玩法,各位看官,一起学习学习! 看本篇博客之前,先看下我的前面两篇 Android源码浅析(一)--VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置 Android源码浅析(二)--Ubuntu Roo

Linux系统下吉林快-三平台开发环境搭建以及关于变量的基本操作

有问题联系Q1446595067吉林快-三平台开发, 是一种面向对象的解释型计算机程序设计语言,由荷兰人Guido van Rossum于1989年发明,第一个公开发行版发行于1991年. Python是纯粹的自由软件, 源代码和解释器CPython遵循 GPL(GNU General Public License)协议.Python语法简洁清晰,特色之一是强制用空白符(white space)作为语句缩进. Python具有丰富和强大的库.它常被昵称为胶水语言,能够把用其他语言制作的各种模块(