9.3 客户端接收响应信息(异步转同步的实现)

一 总体流程

客户端接收响应消息
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
  -->HeartbeatHandler.received(Channel channel, Object message)
    -->AllChannelHandler.received(Channel channel, Object message)
      -->ExecutorService cexecutor = getExecutorService()
      -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
        -->ChannelEventRunnable.run()
          -->DecodeHandler.received(Channel channel, Object message)
            -->decode(Object message)
            -->HeaderExchangeHandler.received(Channel channel, Object message)
              -->handleResponse(Channel channel, Response response)
                -->DefaultFuture.received(channel, response)
                  -->doReceived(Response res)//异步转同步

二 源码解析

在HeaderExchangeHandler.received(Channel channel, Object message)方法之前,与服务端接收请求消息一样,不再赘述。

HeaderExchangeHandler.received(Channel channel, Object message)

 1     public void received(Channel channel, Object message) throws RemotingException {
 2         ...
 3         try {
 4             if (message instanceof Request) {
 5                 ...
 6             } else if (message instanceof Response) {
 7                 handleResponse(channel, (Response) message);
 8             } else if (message instanceof String) {
 9                 ...
10             } else {
11                 ...
12             }
13         } finally {
14             HeaderExchangeChannel.removeChannelIfDisconnected(channel);
15         }
16     }
17
18     static void handleResponse(Channel channel, Response response) throws RemotingException {
19         if (response != null && !response.isHeartbeat()) {
20             DefaultFuture.received(channel, response);
21         }
22     }

DefaultFuture.received(Channel channel, Response response)

 1     private final long id;
 2     private final Request request;
 3     private final int timeout;
 4     private volatile Response response;
 5     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
 6     private final Condition done = lock.newCondition();
 7
 8     public static void received(Channel channel, Response response) {
 9         try {
10             DefaultFuture future = FUTURES.remove(response.getId());//删除元素并返回key=response.getId()的DefaultFuture
11             if (future != null) {
12                 future.doReceived(response);
13             } else {
14                 logger.warn("The timeout response finally returned at "
15                         + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
16                         + ", response " + response
17                         + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
18                         + " -> " + channel.getRemoteAddress()));
19             }
20         } finally {
21             CHANNELS.remove(response.getId());
22         }
23     }
24
25     private void doReceived(Response res) {
26         lock.lock();
27         try {
28             //设置response
29             response = res;
30             if (done != null) {
31                 //唤醒阻塞的线程
32                 done.signal();
33             }
34         } finally {
35             lock.unlock();
36         }
37         if (callback != null) {
38             invokeCallback(callback);
39         }
40     }

这里比较难懂,笔者再给出客户端发出请求时的一段代码:HeaderExchangeChannel.request(Object request, int timeout)

 1     public ResponseFuture request(Object request, int timeout) throws RemotingException {
 2         if (closed) {
 3             throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
 4         }
 5         // create request.
 6         Request req = new Request();
 7         req.setVersion("2.0.0");
 8         req.setTwoWay(true);
 9         req.setData(request);
10         DefaultFuture future = new DefaultFuture(channel, req, timeout);
11         try {
12             channel.send(req);
13         } catch (RemotingException e) {
14             future.cancel();
15             throw e;
16         }
17         return future;
18     }

netty是一个异步非阻塞的框架,所以当执行channel.send(req);的时候,当其内部执行到netty发送消息时,不会等待结果,直接返回。为了实现“异步转为同步”,使用了DefaultFuture这个辅助类,

在HeaderExchangeChannel.request(Object request, int timeout),在还没有等到客户端的响应回来的时候,就直接将future返回了。返回给谁?再来看HeaderExchangeChannel.request(Object request, int timeout)的调用者。

1                   -->DubboInvoker.doInvoke(final Invocation invocation)
2                     //获取ExchangeClient进行消息的发送
3                     -->ReferenceCountExchangeClient.request(Object request, int timeout)
4                       -->HeaderExchangeClient.request(Object request, int timeout)
5                         -->HeaderExchangeChannel.request(Object request, int timeout)

DubboInvoker.doInvoke(final Invocation invocation)

 1 protected Result doInvoke(final Invocation invocation) throws Throwable {
 2         RpcInvocation inv = (RpcInvocation) invocation;
 3         final String methodName = RpcUtils.getMethodName(invocation);
 4         inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
 5         inv.setAttachment(Constants.VERSION_KEY, version);
 6
 7         ExchangeClient currentClient;
 8         if (clients.length == 1) {
 9             currentClient = clients[0];
10         } else {
11             currentClient = clients[index.getAndIncrement() % clients.length];
12         }
13         try {
14             boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//是否异步
15             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//是否没有返回值
16             int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
17             if (isOneway) {
18                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
19                 currentClient.send(inv, isSent);
20                 RpcContext.getContext().setFuture(null);
21                 return new RpcResult();
22             } else if (isAsync) {
23                 ResponseFuture future = currentClient.request(inv, timeout);
24                 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
25                 return new RpcResult();
26             } else {
27                 RpcContext.getContext().setFuture(null);
28                 return (Result) currentClient.request(inv, timeout).get();
29             }
30         } catch (TimeoutException e) {
31             throw new RpcException(...);
32         } catch (RemotingException e) {
33             throw new RpcException(...);
34         }
35     }

其中currentClient.request(inv, timeout)返回值是ResponseFuture,DefaultFuture是ResponseFuture的实现类,实际上这里返回的就是DefaultFuture实例,而该实例就是HeaderExchangeChannel.request(Object request, int timeout)返回的那个future实例。之后调用DefaultFuture.get()。

 1     public Object get() throws RemotingException {
 2         return get(timeout);
 3     }
 4
 5     public Object get(int timeout) throws RemotingException {
 6         if (timeout <= 0) {
 7             timeout = Constants.DEFAULT_TIMEOUT;
 8         }
 9         if (!isDone()) {
10             long start = System.currentTimeMillis();
11             lock.lock();
12             try {
13                 while (!isDone()) {
14                     //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
15                     done.await(timeout, TimeUnit.MILLISECONDS);
16                     if (isDone() || System.currentTimeMillis() - start > timeout) {
17                         break;
18                     }
19                 }
20             } catch (InterruptedException e) {
21                 throw new RuntimeException(e);
22             } finally {
23                 lock.unlock();
24             }
25             if (!isDone()) {
26                 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
27             }
28         }
29         return returnFromResponse();
30     }
31
32     public boolean isDone() {
33         return response != null;
34     }

此处我们看到当响应response没有回来时,condition会执行await进行阻塞当前线程,直到被唤醒或被中断或阻塞时间到时了。当客户端接收到服务端的响应的时候,DefaultFuture.doReceived:

会先为response赋上返回值,之后执行condition的signal唤醒被阻塞的线程,get()方法就会释放锁,执行returnFromResponse(),返回值。

 1     private Object returnFromResponse() throws RemotingException {
 2         Response res = response;
 3         if (res == null) {
 4             throw new IllegalStateException("response cannot be null");
 5         }
 6         if (res.getStatus() == Response.OK) {
 7             return res.getResult();
 8         }
 9         if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
10             throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
11         }
12         throw new RemotingException(channel, res.getErrorMessage());
13     }

到现在其实还有一个问题?就是netty时异步非阻塞的,那么假设现在我发了1w个Request,后来返回来1w个Response,那么怎么对应Request和Response呢?如果对应不上,最起码的唤醒就会有问题。为了解决这个问题提,Request和Response中都有一个属性id。

在HeaderExchangeChannel.request(Object request, int timeout)中:

 1         Request req = new Request();
 2         req.setVersion("2.0.0");
 3         req.setTwoWay(true);
 4         req.setData(request);
 5         DefaultFuture future = new DefaultFuture(channel, req, timeout);
 6         try {
 7             channel.send(req);
 8         } catch (RemotingException e) {
 9             future.cancel();
10             throw e;
11         }
12         return future;

看一下Request的构造器:

 1     private static final AtomicLong INVOKE_ID = new AtomicLong(0);
 2     private final long mId;
 3
 4     public Request() {
 5         mId = newId();
 6     }
 7
 8     private static long newId() {
 9         // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
10         return INVOKE_ID.getAndIncrement();
11     }

看一下DefaultFuture的构造器:

 1     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
 2     private final long id;
 3     private final Request request;
 4     private volatile Response response;
 5
 6     public DefaultFuture(Channel channel, Request request, int timeout) {
 7         ...
 8         this.request = request;
 9         this.id = request.getId();
10         ...
11         FUTURES.put(id, this);
12         ...
13     }

再来看一下响应。

HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req)

 1     Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
 2         Response res = new Response(req.getId(), req.getVersion());
 3         ...
 4         Object msg = req.getData();
 5         try {
 6             // handle data.
 7             Object result = handler.reply(channel, msg);
 8             res.setStatus(Response.OK);
 9             res.setResult(result);
10         } catch (Throwable e) {
11             res.setStatus(Response.SERVICE_ERROR);
12             res.setErrorMessage(StringUtils.toString(e));
13         }
14         return res;
15     }

来看一下Response的构造器:

1     private long mId = 0;
2
3     public Response(long id, String version) {
4         mId = id;
5         mVersion = version;
6     }

这里response的id的值时request的id。最后来看一下服务端接收后的处理:

DefaultFuture.received(Channel channel, Response response)

 1     public static void received(Channel channel, Response response) {
 2         try {
 3             DefaultFuture future = FUTURES.remove(response.getId());//删除元素并返回key=response.getId()的DefaultFuture
 4             if (future != null) {
 5                 future.doReceived(response);
 6             } else {
 7                ...
 8             }
 9         } finally {
10             CHANNELS.remove(response.getId());
11         }
12     }
时间: 2024-08-02 07:05:51

9.3 客户端接收响应信息(异步转同步的实现)的相关文章

MQTT的学习研究(四)moquette-mqtt 的使用之mqtt Blocking API客户端订阅并接收主题信息

在上面两篇关于mqtt的broker的启动和mqtt的服务端发布主题信息之后,我们客户端需要订阅相关的信息并接收相关的主题信息. Java代码   package com.etrip.mqtt; import java.net.URISyntaxException; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.

java在线聊天项目0.9版 实现把服务端接收到的信息返回给每一个客户端窗口中显示功能之客户端接收

客户端要不断接收服务端发来的信息 与服务端不断接收客户端发来信息相同,使用线程的方法,在线程中循环接收 客户端修改后代码如下: package com.swift; import java.awt.BorderLayout; import java.awt.Color; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.WindowAdapter; impo

TCP连接建立系列 — 客户端接收SYNACK和发送ACK

主要内容:客户端接收SYNACK.发送ACK,完成连接的建立. 内核版本:3.15.2 我的博客:http://blog.csdn.net/zhangskd 接收入口 tcp_v4_rcv |--> tcp_v4_do_rcv |-> tcp_rcv_state_process |-> tcp_rcv_synsent_state_process 1. 状态为ESTABLISHED时,用tcp_rcv_established()接收处理. 2. 状态为LISTEN时,说明这个sock处于监

java 使用 comet4j 主动向客户端推送信息 简单例子

[背景] 今天,一个前端的师弟问我怎样做实时聊天窗口,我毫不犹豫地说:在前台定时访问服务端呀!师弟默默地百度了一番,最后告诉我,有一种技术是后服务端动推送信息给客户端的,这种技术的名字叫comet,我惊呆了,因为完全没听过,赶紧上网搜集资料,耗了一个晚上写了个简单的例子,实现主动向客户端发送信息.说是说主动,其实还是要客户端先献出它的“第一次”,即只要它有先请求你一下,以后你们熟了,你想主动约它就约它! 关于comet技术介绍及其实现原理,可以参考网站 http://www.ibm.com/de

activemq 异步和同步接收

来点实在的代码,用例子来说明: 1.异步接收,主要设置messageListener.,然后编写onmessage方法,很简单 a.客户端发送5条消息 1 package ch02.chat; 2 3 import javax.jms.JMSException; 4 5 public class ClientTest3 { 6 7 public static void main(String[] args) throws JMSException { 8 // TODO Auto-generat

pushlet实现服务器端向客户端推送信息

使用Pushlet来实现服务器端向客户端推送信息 1.   实现方式: 有两种实现方式: 1.         通过配置文件来实现定时的从服务器端向客户端推送信息 2.         通过API主动向另外一端推送信息 以下分别给予介绍. 2.   特别注意 在开始测试之前,有三点非常重要,需要实现讲明,否则程序将会无法正常运行: 2.1.     JSP页面上的设定 JSP页面上必须添加以下代码以确保Pushlet能够正确的获得后台服务的地址: <base href="<%=req

Java网络编程——服务器端和客户端互发信息

引言 ? ? 为了学习Java网络编程,用一个QQ(屌丝版)作为例子练手,记录屌丝版QQ的开发过程,这里我们认为已经掌握其中的网络基础部分,即HTTP协议,TCP/IP协议等,在此基础上我们开始我们的Java网络部分的学习,我们要知道服务器和客户端是如何通信的,首先我们要了解Java网络知识中的一个很重要的东西--Socket ? ? Socket初探 ? ? ServerSocket和Socket ? ? 首先服务器端需要用到java.net包下的ServerSocket类,该类的一个实例用于

基于TCP网络通信的自动升级程序源码分析-客户端接收文件

升级程序客户端接收文件 /// <summary> /// 文件数据缓存 索引是 ConnectionInfo对象 数据包的顺序号 值是数据 /// </summary> Dictionary<ConnectionInfo, Dictionary<long, byte[]>> incomingDataCache = new Dictionary<ConnectionInfo, Dictionary<long, byte[]>>();

实现UDP高效接收/响应

环境Linux g++6.3.0 问题一:一个ip地址如何接收高并发请求 问题二:如何高并发响应消息 发送请求端只能通过ip地址+端口号向服务器发送请求码,所以服务器只能用一个UDP去绑定此ip以及端口号.而如何完成高并发发送响应消息, 谁去发送这个响应消息,接收请求信息的UDP?这就造成其中一个任务必须等待另一个任务执行完毕,sendto是非阻塞,而recvfrom是阻塞,若 执行recvfrom碰巧没有下个请求信息或者网络阻塞造成UDP丢失,那么sendto岂是不能执行(一直等待recvfr