How handler order in Netty's ChannelPipeline affects inbound and outbound handlers

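  Before running this experiment I read xbmchina's articles on Jianshu. Thanks to the author for these posts on ChannelPipeline and ChannelHandler:

  【Netty】ChannelPipeline和ChannelHandler(一) ([Netty] ChannelPipeline and ChannelHandler, Part 1)

  【Netty】ChannelHandler的添加和删除(二) ([Netty] Adding and removing a ChannelHandler, Part 2)

  【Netty】inBound和outBound事件的传播过程 ([Netty] How inbound and outbound events propagate)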

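  I originally wanted to write a Netty 4 chat room based on leonzm's websocket_demo project, but the Netty 4 API is different: messageReceived (which handled both connection setup and incoming packets) and close (disconnect) cannot be overridden in the inbound handler. So I looked into the handler lifecycle. channelRead0 handles the request that establishes the connection and then receives packets from clients that are already connected. When the channel reaches channelInactive, the connection is being torn down and channelUnregistered follows, so connection-related cleanup can be done in those callbacks. A ChannelHandler has its own lifecycle as well, and handlerRemoved is another place to do the same kind of cleanup.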
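  Because several teardown callbacks fire for the same connection, cleanup placed in more than one of them can run more than once. The following is a minimal plain-Java sketch of making that cleanup idempotent. It does not use Netty: `SessionCleanupSketch`, `SESSIONS`, and `cleanupOnce` are hypothetical names for illustration, and the three methods only stand in for the Netty callbacks of the same name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class SessionCleanupSketch {

    // Hypothetical registry of online sessions: sessionId -> user name.
    static final Map<String, String> SESSIONS = new ConcurrentHashMap<>();

    private final String sessionId;
    private final AtomicBoolean cleaned = new AtomicBoolean(false);
    private int cleanupRuns = 0;

    SessionCleanupSketch(String sessionId, String name) {
        this.sessionId = sessionId;
        SESSIONS.put(sessionId, name);
    }

    // Runs the real cleanup only on the first call; later calls are no-ops.
    private void cleanupOnce() {
        if (cleaned.compareAndSet(false, true)) {
            SESSIONS.remove(sessionId);
            cleanupRuns++;
        }
    }

    // Stand-ins for the Netty callbacks fired during teardown, in the order they occur.
    void channelInactive()     { cleanupOnce(); }
    void channelUnregistered() { cleanupOnce(); }
    void handlerRemoved()      { cleanupOnce(); }

    int cleanupRuns() { return cleanupRuns; }

    public static void main(String[] args) {
        SessionCleanupSketch handler = new SessionCleanupSketch("id-1", "alice");
        handler.channelInactive();
        handler.channelUnregistered();
        handler.handlerRemoved();
        System.out.println("cleanup runs: " + handler.cleanupRuns());
        System.out.println("still registered: " + SESSIONS.containsKey("id-1"));
    }
}
```

  In a real handler the same guard would wrap calls such as the logout and offline notification, so that it does not matter which teardown callback arrives first.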

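  Differences between inbound and outbound in Netty: inbound events are passive and fire automatically when something happens on the channel, while outbound operations are active and fire when the application asks for an operation to be performed. Beyond that, an outbound handler used on its own cannot receive messages from a WebSocket client (it handles what the server sends out; receiving is the job of an inbound handler). An outbound handler is better suited to being embedded in the pipeline as an interceptor, in the style of AOP (aspect-oriented programming).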
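  The direction of travel can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Netty code: `PipelineDirectionSketch`, `Entry`, and the handler names `decoder`, `logger`, `business`, and `encoder` are hypothetical, and the model treats every handler as either inbound or outbound (a real handler such as a codec can be both).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PipelineDirectionSketch {

    // A handler entry: its pipeline name and whether it is inbound or outbound.
    static final class Entry {
        final String name;
        final boolean inbound;
        Entry(String name, boolean inbound) { this.name = name; this.inbound = inbound; }
    }

    // Inbound events travel head -> tail and visit only inbound handlers.
    static List<String> inboundPath(List<Entry> pipeline) {
        List<String> path = new ArrayList<>();
        for (Entry e : pipeline) {
            if (e.inbound) path.add(e.name);
        }
        return path;
    }

    // Outbound operations travel tail -> head and visit only outbound handlers.
    static List<String> outboundPath(List<Entry> pipeline) {
        List<String> path = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Entry e = pipeline.get(i);
            if (!e.inbound) path.add(e.name);
        }
        return path;
    }

    public static void main(String[] args) {
        List<Entry> pipeline = Arrays.asList(
                new Entry("decoder", true),
                new Entry("logger", false),
                new Entry("business", true),
                new Entry("encoder", false));
        System.out.println(inboundPath(pipeline));   // [decoder, business]
        System.out.println(outboundPath(pipeline));  // [encoder, logger]
    }
}
```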

  Now compare the two ChannelPipeline configurations:

  Lanucher.java: (main function that starts the Netty server)

package com.company.lanucher;

import com.company.server.ReversedWebSocketServer;
import com.company.server.WebSocketServer;

public class Lanucher {

    public static void main(String[] args) throws Exception {
        // Start the WebSocket server. To run the other configuration, comment out the
        // ReversedWebSocketServer line and uncomment the WebSocketServer line.
        //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
        new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT);
    }

}

Lanucher.java

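  WebSocketServer.java: (the outbound adapter is added before the inbound handler, so a request runs the inbound handler first and its response then passes through the outbound adapter)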

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * WebSocket server
 *
 */
public class WebSocketServer {
    private static final Logger LOG = Logger.getLogger(WebSocketServer.class);

    // WebSocket port
    public static final int WEBSOCKET_PORT = 9090;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding and decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // assembles HTTP message parts into one full message
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // chunked writes for large data streams
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor placed before the WebSocket server handler
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket 已经启动,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

WebSocketServer.java

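  ReversedWebSocketServer.java: (the inbound handler is added before the outbound adapter, so an outbound operation entering at the tail reaches the outbound adapter first, before it passes the inbound handler's position)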

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ReversedWebSocketServer {
    // Log under this class's own name (the original passed WebSocketServer.class here)
    private static final Logger LOG = Logger.getLogger(ReversedWebSocketServer.class);

    // WebSocket ports
    public static final int WEBSOCKET_PORT = 9090;
    public static final int FUN_WEBSOCKET_PORT = 9091;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding and decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // assembles HTTP message parts into one full message
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // chunked writes for large data streams
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor, here placed after the server handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket 已经启动,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

ReversedWebSocketServer.java
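  The two servers differ only in where "adapter" sits relative to "handler". What that changes can be sketched with a plain-Java model of where an outbound operation enters the pipeline: in Netty 4 an operation invoked on the Channel enters at the tail, while one invoked on a ChannelHandlerContext enters at that handler's own position and moves toward the head. The class `WriteStartSketch` and its methods are hypothetical names, and the codec handlers are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WriteStartSketch {

    // A handler entry: its pipeline name and whether it is an outbound handler.
    static final class Entry {
        final String name;
        final boolean outbound;
        Entry(String name, boolean outbound) { this.name = name; this.outbound = outbound; }
    }

    // Walks from the position just before `start` toward the head, collecting outbound handlers.
    static List<String> outboundBefore(List<Entry> pipeline, int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) visited.add(pipeline.get(i).name);
        }
        return visited;
    }

    // Models channel.write(...): the operation enters at the tail.
    static List<String> channelWrite(List<Entry> pipeline) {
        return outboundBefore(pipeline, pipeline.size());
    }

    // Models ctx.write(...): the operation enters at the calling handler's own position.
    static List<String> contextWrite(List<Entry> pipeline, String caller) {
        for (int i = 0; i < pipeline.size(); i++) {
            if (pipeline.get(i).name.equals(caller)) return outboundBefore(pipeline, i);
        }
        throw new IllegalArgumentException("no handler named " + caller);
    }

    public static void main(String[] args) {
        List<Entry> adapterFirst = Arrays.asList(new Entry("adapter", true), new Entry("handler", false));
        List<Entry> handlerFirst = Arrays.asList(new Entry("handler", false), new Entry("adapter", true));
        System.out.println(channelWrite(adapterFirst));            // [adapter]
        System.out.println(contextWrite(adapterFirst, "handler")); // [adapter]
        System.out.println(channelWrite(handlerFirst));            // [adapter]
        System.out.println(contextWrite(handlerFirst, "handler")); // []
    }
}
```

  In this model the adapter sees every operation issued through the Channel in both configurations, but when the handler is added first, operations issued through the handler's own context (for example a ctx.flush()) no longer pass through it.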

  BananaWebSocketServerHandler.java: (inbound handler; processes requests received from clients)

package com.company.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

import com.company.serviceimpl.BananaService;
import com.company.util.BufToString;
import com.company.util.CODE;
import com.company.util.Request;
import com.company.util.Response;
import com.google.common.base.Strings;
import com.google.gson.JsonSyntaxException;


/**
 * WebSocket server handler
 *
 */
public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());

    private WebSocketServerHandshaker handshaker;
    private ChannelHandlerContext ctx;
    private String sessionId;
    private boolean isLog = true;

    public BananaWebSocketServerHandler() {
        super();
    }

    public BananaWebSocketServerHandler(boolean isLog) {
        this();
        this.isLog = isLog;
    }

    // Overridable in Netty 5; Netty 4 uses channelRead0 instead, so this method is never called by the pipeline
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel MessageReceived = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerRemoved = = " + ctx.name());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel Read0 = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Inactive = = " + ctx.name());
        }
        try {
            this.close(ctx, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Unregistered = = " + ctx.name());
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("channel Flush = = " + ctx.name());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        ctx.close();
        if(this.isLog) {
            System.err.println("channel exceptionCaught = = " + ctx.name());
            cause.printStackTrace();
        }
        BananaService.logout(sessionId); // log the session out
        BananaService.notifyDownline(sessionId); // notify the others that someone went offline
    }

    // Overridable in Netty 5; in Netty 4 it is called from channelInactive instead
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        if(this.isLog) {
            System.out.println("channel close = = " + ctx.name());
        }
        BananaService.logout(sessionId); // log the session out
        BananaService.notifyDownline(sessionId); // notify the others that someone went offline
        ctx.close();
    }

    /**
     * Handles an HTTP request and completes the WebSocket handshake<br/>
     * Note: the first request of a WebSocket connection is sent over HTTP
     * @param ctx
     * @param request
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // If HTTP decoding failed or this is not a WebSocket upgrade, return an HTTP error
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // A valid WebSocket upgrade request: build and return the handshake response
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handshaker = wsFactory.newHandshaker(request);
        if (handshaker == null) { // unsupported WebSocket version
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else { // send the handshake to the client and complete it
            handshaker.handshake(ctx.channel(), request);
            // Remember the handler context so the server can push data to the client later
            this.ctx = ctx;
        }
    }

    /**
     * Handles a WebSocket frame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Is this a request to close the connection?
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Is this a Ping message?
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text messages are supported for now, not binary messages
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
        }

        // Process the WebSocket request from the client
        try {
            /*
            if(this.isLog) {
                System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text());
            }
            */
            Request request = Request.create(((TextWebSocketFrame)frame).text());
            Response response = new Response();
            response.setServiceId(request.getServiceId());
            if (CODE.online.code.intValue() == request.getServiceId()) { // client registration
                String requestId = request.getRequestId();
                // Each validation failure is sent to the client before returning;
                // the original returned without sending, so the client never got a reply
                if (Strings.isNullOrEmpty(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
                    return;
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    sendWebSocket(response.setIsSucc(false).setMessage("name不能为空").toJson());
                    return;
                } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("您已经注册了,不能重复注册").toJson());
                    return;
                }
                if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
                    response.setIsSucc(false).setMessage("注册失败");
                } else {
                    response.setIsSucc(true).setMessage("注册成功");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
                        response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // return the users already online

                        if (!reqId.equals(requestId)) {
                            Request serviceRequest = new Request();
                            serviceRequest.setServiceId(CODE.online.code);
                            serviceRequest.setRequestId(requestId);
                            serviceRequest.setName(request.getName());
                            try {
                                callBack.send(serviceRequest); // notify the others that someone came online
                            } catch (Exception e) {
                                LOG.warn("回调发送消息给客户端异常", e);
                            }
                        }
                    });
                }
                sendWebSocket(response.toJson());
                this.sessionId = requestId; // remember the session id so it can be logged out when the page is refreshed or the browser closes
            } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // client sends a message to the chat group
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    response.setIsSucc(false).setMessage("requestId不能为空");
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    response.setIsSucc(false).setMessage("name不能为空");
                } else if (Strings.isNullOrEmpty(request.getMessage())) {
                    response.setIsSucc(false).setMessage("message不能为空");
                } else {
                    response.setIsSucc(true).setMessage("发送消息成功");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // broadcast the message to every registered client
                        Request serviceRequest = new Request();
                        serviceRequest.setServiceId(CODE.receive_message.code);
                        serviceRequest.setRequestId(requestId);
                        serviceRequest.setName(request.getName());
                        serviceRequest.setMessage(request.getMessage());
                        try {
                            callBack.send(serviceRequest);
                        } catch (Exception e) {
                            LOG.warn("回调发送消息给客户端异常", e);
                        }
                    });
                }
                sendWebSocket(response.toJson());
            } else if (CODE.downline.code.intValue() == request.getServiceId()) { // client goes offline
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
                } else {
                    BananaService.logout(requestId);
                    response.setIsSucc(true).setMessage("下线成功");

                    BananaService.notifyDownline(requestId); // notify the others that someone went offline

                    sendWebSocket(response.toJson());
                }

            } else {
                sendWebSocket(response.setIsSucc(false).setMessage("未知请求").toJson());
            }
        } catch (JsonSyntaxException e1) {
            LOG.warn("Json解析异常", e1);
            System.err.println("Json解析异常");
            e1.printStackTrace();
        } catch (Exception e2) {
            LOG.error("处理Socket请求异常", e2);
            System.err.println("处理Socket请求异常");
            e2.printStackTrace();
        }
    }

    /**
     * Sends an HTTP response
     * @param ctx
     * @param request
     * @param response
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        // For a non-200 status, use the status line as the response body
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(response, response.content().readableBytes());
        }

        // Close the connection if it is not Keep-Alive or the status is not 200
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Sends a WebSocket text message to the client
     * @param msg the text payload to send
     * @throws Exception if the handshake has not completed yet
     */
    public void sendWebSocket(String msg) throws Exception {
        if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
            throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
        }
        this.ctx.channel().write(new TextWebSocketFrame(msg));
        this.ctx.flush();
    }

}

BananaWebSocketServerHandler.java

  FunWebSocketServerHandler.java: (outbound adapter; processes responses sent by the server)

package com.company.server;

import java.net.SocketAddress;

import com.company.util.BufToString;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

// Outbound interceptor: forwards every outbound operation unchanged and logs it
public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter{

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ChannelHandlerContext readRes = ctx.read();
        System.out.println(ctx.name() + " is read in " + readRes.toString());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.name() + " handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name());
    }

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
        System.out.println(ctx.name() + " is bind in " + localAddress.toString() + " in " + promise.toString());
    }
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
        // localAddress may be null, so rely on string concatenation instead of calling toString() on it
        System.out.println(ctx.name() + " is connect in " + localAddress + " in client "  + remoteAddress + " in " + promise.toString());
    }
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
        System.out.println(ctx.name() + " is disconnect in " + promise.toString());
    }
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
        System.out.println(ctx.name() + " is close in " + promise.toString());
    }
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
        System.out.println(ctx.name() + " is deregister in " + promise.toString());
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Log before forwarding: handlers closer to the head may release msg once it is passed on
        System.out.print(ctx.name() + " is write in " + promise.toString());
        if(msg instanceof DefaultFullHttpResponse) {
            System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content()));
        }
        else if(msg instanceof TextWebSocketFrame) {
            System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text());
        }
        else if(msg instanceof CloseWebSocketFrame) {
            System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText());
        }
        else {
            System.out.println(" with message : " + msg.getClass());
        }
        ctx.write(msg, promise);
    }
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println(ctx.name() + " is flush");
    }
}

FunWebSocketServerHandler.java

  banana.html: (chat room front end)

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket 聊天实例</title>
</head>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script>
<script src="map.js" type="text/javascript"></script>
<script type="text/javascript">
$(document).ready(function() {
    var uuid = guid(); // uuid is unique within one session
    var nameOnline = ''; // name used when going online
    var onlineName = new Map(); // users currently online, <requestId, name>

    $("#name").attr("disabled","disabled");
    $("#onlineBtn").attr("disabled","disabled");
    $("#downlineBtn").attr("disabled","disabled");

    $("#banana").hide();

    // Initialize the WebSocket
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9090/");
        socket.onmessage = function(event) {
            console.log("收到服务器消息:" + event.data);
            if (event.data.indexOf("isSucc") != -1) {// the server's reply to a client request (response)
                var response = JSON.parse(event.data);
                if (response != undefined && response != null) {
                    if (response.serviceId == 1001) { // go online
                        if (response.isSucc) {
                            // Online succeeded: initialize the list of users already online
                            onlineName.clear();
                            $("#showOnlineNames").empty();
                            for (var reqId in response.hadOnline) {
                                onlineName.put(reqId, response.hadOnline[reqId]);
                            }
                            initOnline();

                            $("#name").attr("disabled","disabled");
                            $("#onlineBtn").attr("disabled","disabled");
                            $("#downlineBtn").removeAttr("disabled");
                            $("#banana").show();
                        } else {
                            alert("上线失败");
                        }
                    } else if (response.serviceId == 1004) { // go offline
                        if (response.isSucc) {
                            onlineName.clear();
                            $("#showBanana").empty();
                            $("#showOnlineNames").empty();
                            $("#name").removeAttr("disabled");
                            $("#onlineBtn").removeAttr("disabled");
                            $("#downlineBtn").attr("disabled","disabled");
                            $("#banana").hide();
                        } else {
                            alert("下线失败");
                        }
                    }
                }
            } else {// otherwise a request pushed from the server to the client (request)
                var request = JSON.parse(event.data);
                if (request != undefined && request != null) {
                    if (request.serviceId == 1001 || request.serviceId == 1004) { // someone went online/offline
                        if (request.serviceId == 1001) {
                            onlineName.put(request.requestId, request.name);
                        }
                        if (request.serviceId == 1004) {
                            onlineName.removeByKey(request.requestId);
                        }

                        initOnline();
                    } else if (request.serviceId == 1003) { // someone sent a message
                        appendBanana(request.name, request.message);
                    }
                }
            }
        };
        socket.onopen = function(event) {
            $("#name").removeAttr("disabled");
            $("#onlineBtn").removeAttr("disabled");
            console.log("已连接服务器");
        };
        socket.onclose = function(event) { // WebSocket closed
            console.log("WebSocket已经关闭!");
        };
        socket.onerror = function(event) {
            console.log("WebSocket异常!");
        };
    } else {
        alert("抱歉,您的浏览器不支持WebSocket协议!");
    }

    // Send a request over the WebSocket
    function send(message) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            console.log("WebSocket连接没有建立成功!");
            alert("您还未连接上服务器,请刷新页面重试");
        }
    }

    // Refresh the list of online users
    function initOnline() {
        $("#showOnlineNames").empty();
        for (var i=0;i<onlineName.size();i++) {
            $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
            '<td>' + onlineName.element(i).value + '</td>' +
            '</tr>');
        }
    }
    // Append a chat message
    function appendBanana(name, message) {
        $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
    }

    $("#onlineBtn").bind("click", function() {
        var name = $("#name").val();
        if (name == null || name == '') {
            alert("请输入您的尊姓大名");
            return;
        }

        nameOnline = name;
        // Go online
        send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
    });

    $("#downlineBtn").bind("click", function() {
        // Go offline
        send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
    });

    $("#sendBtn").bind("click", function() {
        var message = $("#messageInput").val();
        if (message == null || message == '') {
            alert("请输入您的聊天信息");
            return;
        }

        // Send a chat message
        send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
        $("#messageInput").val("");
    });

});

function guid() {
    function S4() {
       return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
    }
    return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
}
</script>
<body>
  <h1>Netty WebSocket 聊天实例</h1>
  <input type="text" id="name" value="佚名" placeholder="姓名" />
  <input type="button" id="onlineBtn" value="上线" />
  <input type="button" id="downlineBtn" value="下线" />
  <hr/>
  <table id="banana" border="1" >
    <tr>
      <td width="600" align="center">聊天</td>
      <td width="100" align="center">上线人员</td>
    </tr>
    <tr height="200" valign="top">
      <td>
        <table id="showBanana" border="0" width="600">
            <!--
            <tr>
              <td>张三: 大家好</td>
            </tr>
            <tr>
              <td>李四: 欢迎加入群聊</td>
            </tr>
            -->
        </table>
      </td>
      <td>
        <table id="showOnlineNames" border="0">
            <!--
            <tr>
              <td>1</td>
              <td>张三</td>
            </tr>
            <tr>
              <td>2</td>
              <td>李四</td>
            </tr>
            -->
        </table>
      </td>
    </tr>
    <tr height="40">
      <td></td>
      <td></td>
    </tr>
    <tr>
      <td>
        <input type="text" id="messageInput"  style="width:590px" placeholder="巴拉巴拉点什么吧" />
      </td>
      <td>
        <input type="button" id="sendBtn" value="发送" />
      </td>
    </tr>
  </table>

</body>
</html>

banana.html

  Running WebSocketServer and ReversedWebSocketServer in turn produced the following logs:

============adapter first, then handler==============
============Connection start========================

adapter handlerAdded = = adapter
channel handlerAdded = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
channel Read0 = = handler with http request :
adapter is write in [email protected](incomplete) with message :
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============User goes online========================

channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"佚名"}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"佚名"}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============Send message========================

channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"佚名","message":"queue"}
adapter is write in [email protected](incomplete) with socket message : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"佚名","message":"queue"}
adapter is flush
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============User goes offline========================

channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============User disconnects========================

channel Read0 = = handler with socket request : ?
adapter is write in [email protected](incomplete) close reason :
adapter is flush
adapter is close in [email protected](success)
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
channel Inactive = = handler
channel close = = handler
adapter is close in [email protected](success)
channel Unregistered = = handler
channel handlerRemoved = = handler
adapter handlerRemoved = = adapter

WebSocketServer output

  and

============adapter first, then handler==============
============Connection start========================

channel handlerAdded = = handler
adapter handlerAdded = = adapter
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
channel Read0 = = handler with http request :
adapter is write in [email protected](incomplete) with message :
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============User goes online========================

channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"佚名"}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"佚名"}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============Send message========================

channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"佚名","message":"queue"}
adapter is write in [email protected](incomplete) with socket message : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"佚名","message":"queue"}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============User goes offline========================

channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============User disconnects========================

channel Read0 = = handler with socket request : ?
adapter is write in [email protected](incomplete) close reason :
adapter is flush
adapter is close in [email protected](success)
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
channel Inactive = = handler
channel close = = handler
channel Unregistered = = handler
adapter handlerRemoved = = adapter
channel handlerRemoved = = handler

ReversedWebSocketServer output

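  Besides the different event order, the outbound adapter in the WebSocketServer run logs several more flush calls than in the ReversedWebSocketServer run, most visibly in the message-sending step. Handling a request means both receiving a packet and sending packets back, and in that run the adapter was added to the pipeline before the handler. In Netty, ctx.flush() travels from the calling handler toward the head of the pipeline, so the likely cause is that every flush the handler issues also passes through the adapter there.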

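  This shows that a Netty pipeline has to be planned carefully: let the server do its processing first wherever possible, and place the outbound interceptor before the inbound interceptor.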
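
  The flush counts in the two logs can be reproduced with a small stand-alone model. The sketch below is plain Java, not Netty code: PipelineOrderSketch, simulate and outbound are hypothetical names. It assumes that the handler writes through channel.write(...), which enters the pipeline at the tail, and flushes through ctx.flush() once in channelRead0 and once in channelReadComplete, which start at the handler's own position and travel toward the head. Only the handler source can confirm these assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of pipeline ordering; NOT Netty's implementation.
// A pipeline is a list ordered head -> tail. "adapter" stands for the
// outbound adapter, "handler" for the inbound handler.
class PipelineOrderSketch {

    // Replays one request/response round trip and returns what would be logged.
    static List<String> simulate(List<String> pipeline) {
        List<String> log = new ArrayList<>();
        int handler = pipeline.indexOf("handler");

        // Assumed: the reply is written with channel.write(msg), which starts at the tail.
        outbound(pipeline, pipeline.size(), "write", log);
        // Assumed: ctx.flush() in channelRead0, starting at the handler's position.
        outbound(pipeline, handler, "flush", log);
        // Assumed: ctx.flush() again in channelReadComplete, followed by the handler's own log line.
        outbound(pipeline, handler, "flush", log);
        log.add("channel Flush = = handler");
        return log;
    }

    // Walks from the node just before `from` toward the head (index 0);
    // only the outbound adapter reacts to outbound operations.
    static void outbound(List<String> pipeline, int from, String op, List<String> log) {
        for (int i = from - 1; i >= 0; i--) {
            if (pipeline.get(i).equals("adapter")) {
                log.add("adapter is " + op);
            }
        }
    }

    public static void main(String[] args) {
        // Adapter added before the handler (first log).
        System.out.println(simulate(Arrays.asList("adapter", "handler")));
        // Handler added before the adapter (second log).
        System.out.println(simulate(Arrays.asList("handler", "adapter")));
    }
}
```

  Under these assumptions the first pipeline produces one write and two flushes on the adapter, while the second produces only the write. That matches the step in which a user goes online (serviceId 1001) in both logs.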

Original article: https://www.cnblogs.com/dgutfly/p/11536116.html

Time: 2024-10-08 11:37:16
