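Before running this experiment I read xbmchina's articles on Jianshu. Thanks to the author for these write-ups on ChannelPipeline and ChannelHandler:
[Netty] ChannelPipeline and ChannelHandler (Part 1)
[Netty] Adding and removing a ChannelHandler (Part 2)
[Netty] How inbound and outbound events propagate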
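I originally wanted to write a Netty 4 chat room based on leonzm's websocket_demo project, but the Netty 4 API is different: messageReceived (handshake / receive packets) and close (disconnect) cannot be overridden there, so I looked into the handler lifecycle. channelRead0 receives every message from the client: first the HTTP request that performs the WebSocket handshake, then the frames sent over the established connection. When the channel reaches channelInactive, the connection has been closed; channelUnregistered follows, and either callback is a suitable place to clean up per-connection data. The ChannelHandler itself also has a lifecycle, and handlerRemoved can be used for the same kind of cleanup.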
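The difference between inbound and outbound in Netty: inbound events are triggered passively, fired automatically when something happens (data arrives, the connection closes), while outbound operations are triggered actively, when the application asks for something to be done (write, flush, close). Beyond that, an outbound handler on its own cannot receive messages from a WebSocket client: it sits on the path of what the server sends out, and receiving needs an inbound handler. An outbound handler is better suited to being inserted into the pipeline as an interceptor, in the style of AOP (aspect-oriented programming).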
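To make the two directions concrete, here is a minimal sketch in plain Java. It is a toy model, not Netty code: the class `PipelineDirectionSketch` and its `Entry` type are hypothetical names invented for illustration. The handler names are borrowed from the WebSocketServer pipeline listed later in this post, and the inbound/outbound flags reflect my understanding of those Netty classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only, NOT Netty code: it just replays the traversal order.
final class PipelineDirectionSketch {

    /** A pipeline entry that handles inbound events, outbound operations, or both. */
    static final class Entry {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Entry(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    /** The WebSocketServer pipeline, in the order the handlers are added. */
    static List<Entry> articlePipeline() {
        return Arrays.asList(
                new Entry("http-codec", true, true),    // HttpServerCodec: decoder + encoder
                new Entry("aggregator", true, false),   // HttpObjectAggregator: inbound
                new Entry("http-chunked", true, true),  // ChunkedWriteHandler: duplex
                new Entry("adapter", false, true),      // FunWebSocketServerHandler: outbound
                new Entry("handler", true, false));     // BananaWebSocketServerHandler: inbound
    }

    /** Inbound events (e.g. channelRead) go head -> tail and visit inbound entries only. */
    static List<String> inboundOrder(List<Entry> pipeline) {
        List<String> visited = new ArrayList<>();
        for (Entry e : pipeline) {
            if (e.inbound) {
                visited.add(e.name);
            }
        }
        return visited;
    }

    /** Outbound operations started on the channel go tail -> head and visit outbound entries only. */
    static List<String> outboundOrder(List<Entry> pipeline) {
        List<String> visited = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) {
                visited.add(pipeline.get(i).name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Entry> pipeline = articlePipeline();
        System.out.println("inbound : " + inboundOrder(pipeline));
        System.out.println("outbound: " + outboundOrder(pipeline));
    }
}
```

The outbound-only "adapter" never shows up in the inbound list, which is why it cannot receive client messages by itself.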
Now for the comparison of the two ChannelPipeline setups:
Lanucher.java (main entry point that starts the Netty server):

package com.company.lanucher;

import com.company.server.ReversedWebSocketServer;
import com.company.server.WebSocketServer;

public class Lanucher {

    public static void main(String[] args) throws Exception {
        // Start the WebSocket server. To run the other variant, comment out the
        // Reversed line and uncomment the WebSocketServer line.
        //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
        new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT);
    }

}
WebSocketServer.java (the outbound adapter is added to the pipeline before the inbound handler):

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * WebSocket server
 */
public class WebSocketServer {
    private static final Logger LOG = Logger.getLogger(WebSocketServer.class);

    // WebSocket port
    public static final int WEBSOCKET_PORT = 9090;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding and decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // assembles HTTP message parts into one full message
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // chunked writes of large data streams
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor placed in front of the server handler
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket 已经启动,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
ReversedWebSocketServer.java (the inbound handler is added to the pipeline before the outbound adapter):

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ReversedWebSocketServer {
    private static final Logger LOG = Logger.getLogger(ReversedWebSocketServer.class);

    // WebSocket ports
    public static final int WEBSOCKET_PORT = 9090;
    public static final int FUN_WEBSOCKET_PORT = 9091;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding and decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // assembles HTTP message parts into one full message
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // chunked writes of large data streams
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor, here placed after the server handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket 已经启动,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}
BananaWebSocketServerHandler.java (inbound handler; processes the requests received from the client):

package com.company.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

import com.company.serviceimpl.BananaService;
import com.company.util.BufToString;
import com.company.util.CODE;
import com.company.util.Request;
import com.company.util.Response;
import com.google.common.base.Strings;
import com.google.gson.JsonSyntaxException;


/**
 * WebSocket server handler
 */
public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());

    private WebSocketServerHandshaker handshaker;
    private ChannelHandlerContext ctx;
    private String sessionId;
    private boolean isLog = true;

    public BananaWebSocketServerHandler() {
        super();
    }

    public BananaWebSocketServerHandler(boolean isLog) {
        this();
        this.isLog = isLog;
    }

    // Overridden method in Netty 5; Netty 4 uses channelRead0 instead
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel MessageReceived = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerRemoved = = " + ctx.name());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel Read0 = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Inactive = = " + ctx.name());
        }
        try {
            this.close(ctx, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Unregistered = = " + ctx.name());
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("channel Flush = = " + ctx.name());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        ctx.close();
        if(this.isLog) {
            System.err.println("channel exceptionCaught = = " + ctx.name());
            cause.printStackTrace();
        }
        BananaService.logout(sessionId); // log the session out
        BananaService.notifyDownline(sessionId); // tell the others that someone went offline
    }

    // Overridden method in Netty 5; Netty 4 uses channelInactive instead
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        if(this.isLog) {
            System.out.println("channel close = = " + ctx.name());
        }
        BananaService.logout(sessionId); // log the session out
        BananaService.notifyDownline(sessionId); // tell the others that someone went offline
        ctx.close();
    }

    /**
     * Handles the HTTP request and completes the WebSocket handshake.<br/>
     * Note: the first request of a WebSocket connection is an HTTP request.
     * @param ctx
     * @param request
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // If HTTP decoding failed, or this is not a WebSocket upgrade, reply with an HTTP error
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // A valid WebSocket upgrade request: build the handshake response
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handshaker = wsFactory.newHandshaker(request);
        if (handshaker == null) { // unsupported WebSocket version
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else { // send the handshake response to the client to complete the handshake
            handshaker.handshake(ctx.channel(), request);
            // Keep the handler context so the server can push data to the client later
            this.ctx = ctx;
        }
    }

    /**
     * Handles a WebSocket frame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Close frame: close the connection
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping frame: answer with a pong
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text frames are supported for now, not binary frames
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
        }

        // Handle the WebSocket request from the client
        try {
            /*
            if(this.isLog) {
                System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text());
            }
            */
            Request request = Request.create(((TextWebSocketFrame)frame).text());
            Response response = new Response();
            response.setServiceId(request.getServiceId());
            if (CODE.online.code.intValue() == request.getServiceId()) { // client registers (goes online)
                String requestId = request.getRequestId();
                // On a validation failure, reply with the failure before returning
                if (Strings.isNullOrEmpty(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
                    return;
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    sendWebSocket(response.setIsSucc(false).setMessage("name不能为空").toJson());
                    return;
                } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("您已经注册了,不能重复注册").toJson());
                    return;
                }
                if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
                    response.setIsSucc(false).setMessage("注册失败");
                } else {
                    response.setIsSucc(true).setMessage("注册成功");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
                        response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // return the users already online

                        if (!reqId.equals(requestId)) {
                            Request serviceRequest = new Request();
                            serviceRequest.setServiceId(CODE.online.code);
                            serviceRequest.setRequestId(requestId);
                            serviceRequest.setName(request.getName());
                            try {
                                callBack.send(serviceRequest); // tell the others that someone came online
                            } catch (Exception e) {
                                LOG.warn("回调发送消息给客户端异常", e);
                            }
                        }
                    });
                }
                sendWebSocket(response.toJson());
                this.sessionId = requestId; // remember the session id so it can be logged out when the page is refreshed or the browser is closed
            } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // client sends a message to the chat room
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    response.setIsSucc(false).setMessage("requestId不能为空");
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    response.setIsSucc(false).setMessage("name不能为空");
                } else if (Strings.isNullOrEmpty(request.getMessage())) {
                    response.setIsSucc(false).setMessage("message不能为空");
                } else {
                    response.setIsSucc(true).setMessage("发送消息成功");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // broadcast the message to every client
                        Request serviceRequest = new Request();
                        serviceRequest.setServiceId(CODE.receive_message.code);
                        serviceRequest.setRequestId(requestId);
                        serviceRequest.setName(request.getName());
                        serviceRequest.setMessage(request.getMessage());
                        try {
                            callBack.send(serviceRequest);
                        } catch (Exception e) {
                            LOG.warn("回调发送消息给客户端异常", e);
                        }
                    });
                }
                sendWebSocket(response.toJson());
            } else if (CODE.downline.code.intValue() == request.getServiceId()) { // client goes offline
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
                } else {
                    BananaService.logout(requestId);
                    response.setIsSucc(true).setMessage("下线成功");

                    BananaService.notifyDownline(requestId); // tell the others that someone went offline

                    sendWebSocket(response.toJson());
                }

            } else {
                sendWebSocket(response.setIsSucc(false).setMessage("未知请求").toJson());
            }
        } catch (JsonSyntaxException e1) {
            LOG.warn("Json解析异常", e1);
            System.err.println("Json解析异常");
            e1.printStackTrace();
        } catch (Exception e2) {
            LOG.error("处理Socket请求异常", e2);
            System.err.println("处理Socket请求异常");
            e2.printStackTrace();
        }
    }

    /**
     * Sends an HTTP response
     * @param ctx
     * @param request
     * @param response
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        // For a non-200 status, put the status line into the response body
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(response, response.content().readableBytes());
        }

        // Close the connection if it is not keep-alive or the status is not 200
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Sends a WebSocket text frame
     * @param msg text to send to the client
     */
    public void sendWebSocket(String msg) throws Exception {
        if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
            throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
        }
        this.ctx.channel().write(new TextWebSocketFrame(msg)); // starts at the tail of the pipeline
        this.ctx.flush(); // starts at this handler and travels toward the head
    }

}
FunWebSocketServerHandler.java (outbound adapter; intercepts the responses sent by the server):

package com.company.server;

import java.net.SocketAddress;

import com.company.util.BufToString;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

// Logs every outbound operation that passes through it, then forwards the operation unchanged.
public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter{

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ChannelHandlerContext readRes = ctx.read();
        System.out.println(ctx.name() + " is read in " + readRes.toString());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.name() + " handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name());
    }

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
        System.out.println(ctx.name() + " is bind in " + localAddress.toString() + " in " + promise.toString());
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
        System.out.println(ctx.name() + " is connect in " + localAddress.toString() + " in client " + remoteAddress.toString() + " in " + promise.toString());
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
        System.out.println(ctx.name() + " is disconnect in " + promise.toString());
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
        System.out.println(ctx.name() + " is close in " + promise.toString());
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
        System.out.println(ctx.name() + " is deregister in " + promise.toString());
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
        System.out.print(ctx.name() + " is write in " + promise.toString());
        if(msg instanceof DefaultFullHttpResponse) {
            System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content()));
        }
        else if(msg instanceof TextWebSocketFrame) {
            System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text());
        }
        else if(msg instanceof CloseWebSocketFrame) {
            System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText());
        }
        else {
            System.out.println(" with message : " + msg.getClass());
        }
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println(ctx.name() + " is flush");
    }
}
banana.html (chat room front end):

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket 聊天实例</title>
</head>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script>
<script src="map.js" type="text/javascript"></script>
<script type="text/javascript">
    $(document).ready(function() {
        var uuid = guid(); // unique within one session
        var nameOnline = ''; // name used when going online
        var onlineName = new Map(); // users currently online, <requestId, name>

        $("#name").attr("disabled","disabled");
        $("#onlineBtn").attr("disabled","disabled");
        $("#downlineBtn").attr("disabled","disabled");

        $("#banana").hide();

        // Initialise the WebSocket
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:9090/");
            socket.onmessage = function(event) {
                console.log("收到服务器消息:" + event.data);
                if (event.data.indexOf("isSucc") != -1) { // a response from the server to a request this client sent
                    var response = JSON.parse(event.data);
                    if (response != undefined && response != null) {
                        if (response.serviceId == 1001) { // go online
                            if (response.isSucc) {
                                // Online succeeded: initialise the list of online users
                                onlineName.clear();
                                $("#showOnlineNames").empty();
                                for (var reqId in response.hadOnline) {
                                    onlineName.put(reqId, response.hadOnline[reqId]);
                                }
                                initOnline();

                                $("#name").attr("disabled","disabled");
                                $("#onlineBtn").attr("disabled","disabled");
                                $("#downlineBtn").removeAttr("disabled");
                                $("#banana").show();
                            } else {
                                alert("上线失败");
                            }
                        } else if (response.serviceId == 1004) { // go offline
                            if (response.isSucc) {
                                onlineName.clear();
                                $("#showBanana").empty();
                                $("#showOnlineNames").empty();
                                $("#name").removeAttr("disabled");
                                $("#onlineBtn").removeAttr("disabled");
                                $("#downlineBtn").attr("disabled","disabled");
                                $("#banana").hide();
                            } else {
                                alert("下线失败");
                            }
                        }
                    }
                } else { // otherwise a request pushed from the server to the client
                    var request = JSON.parse(event.data);
                    if (request != undefined && request != null) {
                        if (request.serviceId == 1001 || request.serviceId == 1004) { // someone went online/offline
                            if (request.serviceId == 1001) {
                                onlineName.put(request.requestId, request.name);
                            }
                            if (request.serviceId == 1004) {
                                onlineName.removeByKey(request.requestId);
                            }

                            initOnline();
                        } else if (request.serviceId == 1003) { // someone sent a message
                            appendBanana(request.name, request.message);
                        }
                    }
                }
            };
            socket.onopen = function(event) {
                $("#name").removeAttr("disabled");
                $("#onlineBtn").removeAttr("disabled");
                console.log("已连接服务器");
            };
            socket.onclose = function(event) { // WebSocket closed
                console.log("WebSocket已经关闭!");
            };
            socket.onerror = function(event) {
                console.log("WebSocket异常!");
            };
        } else {
            alert("抱歉,您的浏览器不支持WebSocket协议!");
        }

        // Send a request over the WebSocket
        function send(message) {
            if (!window.WebSocket) { return; }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                console.log("WebSocket连接没有建立成功!");
                alert("您还未连接上服务器,请刷新页面重试");
            }
        }

        // Refresh the list of online users
        function initOnline() {
            $("#showOnlineNames").empty();
            for (var i=0;i<onlineName.size();i++) {
                $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
                        '<td>' + onlineName.element(i).value + '</td>' +
                        '</tr>');
            }
        }
        // Append a chat message
        function appendBanana(name, message) {
            $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
        }

        $("#onlineBtn").bind("click", function() {
            var name = $("#name").val();
            if (name == null || name == '') {
                alert("请输入您的尊姓大名");
                return;
            }

            nameOnline = name;
            // Go online
            send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
        });

        $("#downlineBtn").bind("click", function() {
            // Go offline
            send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
        });

        $("#sendBtn").bind("click", function() {
            var message = $("#messageInput").val();
            if (message == null || message == '') {
                alert("请输入您的聊天信息");
                return;
            }

            // Send a chat message
            send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
            $("#messageInput").val("");
        });

    });

    function guid() {
        function S4() {
            return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
        }
        return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
    }
</script>
<body>
    <h1>Netty WebSocket 聊天实例</h1>
    <input type="text" id="name" value="佚名" placeholder="姓名" />
    <input type="button" id="onlineBtn" value="上线" />
    <input type="button" id="downlineBtn" value="下线" />
    <hr/>
    <table id="banana" border="1" >
        <tr>
            <td width="600" align="center">聊天</td>
            <td width="100" align="center">上线人员</td>
        </tr>
        <tr height="200" valign="top">
            <td>
                <table id="showBanana" border="0" width="600">
                    <!--
                    <tr>
                        <td>张三: 大家好</td>
                    </tr>
                    <tr>
                        <td>李四: 欢迎加入群聊</td>
                    </tr>
                    -->
                </table>
            </td>
            <td>
                <table id="showOnlineNames" border="0">
                    <!--
                    <tr>
                        <td>1</td>
                        <td>张三</td>
                    </tr>
                    <tr>
                        <td>2</td>
                        <td>李四</td>
                    </tr>
                    -->
                </table>
            </td>
        </tr>
        <tr height="40">
            <td></td>
            <td></td>
        </tr>
        <tr>
            <td>
                <input type="text" id="messageInput" style="width:590px" placeholder="巴拉巴拉点什么吧" />
            </td>
            <td>
                <input type="button" id="sendBtn" value="发送" />
            </td>
        </tr>
    </table>

</body>
</html>
Running WebSocketServer and ReversedWebSocketServer in turn produces the following logs:
============ adapter first, then handler ============
============ connection opened ============
adapter handlerAdded = = adapter
channel handlerAdded = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
channel Read0 = = handler with http request : 
adapter is write in [email protected](incomplete) with message : 
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============ user goes online ============
channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"佚名"}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"佚名"}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============ message sent ============
channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"佚名","message":"queue"}
adapter is write in [email protected](incomplete) with socket message : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"佚名","message":"queue"}
adapter is flush
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============ user goes offline ============
channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============ user disconnects ============
channel Read0 = = handler with socket request : ?
adapter is write in [email protected](incomplete) close reason : 
adapter is flush
adapter is close in [email protected](success)
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
channel Inactive = = handler
channel close = = handler
adapter is close in [email protected](success)
channel Unregistered = = handler
channel handlerRemoved = = handler
adapter handlerRemoved = = adapter
Output of WebSocketServer
and
============ handler first, then adapter ============
============ connection opened ============
channel handlerAdded = = handler
adapter handlerAdded = = adapter
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
channel Read0 = = handler with http request : 
adapter is write in [email protected](incomplete) with message : 
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============ user goes online ============
channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"佚名"}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"佚名"}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============ message sent ============
channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"佚名","message":"queue"}
adapter is write in [email protected](incomplete) with socket message : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"佚名","message":"queue"}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============ user goes offline ============
channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004}
adapter is write in [email protected](incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============ user disconnects ============
channel Read0 = = handler with socket request : ?
adapter is write in [email protected](incomplete) close reason : 
adapter is flush
adapter is close in [email protected](success)
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
channel Inactive = = handler
channel close = = handler
channel Unregistered = = handler
adapter handlerRemoved = = adapter
channel handlerRemoved = = handler
Output of ReversedWebSocketServer
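Apart from the different ordering, the outbound adapter logs flush more often when it is added before the handler (WebSocketServer), most visibly in the message-sending section. The handler replies with ctx.channel().write(...) followed by ctx.flush(), and channelReadComplete calls ctx.flush() as well. A call on the channel starts at the tail of the pipeline and passes every outbound handler, whereas a call on ctx starts at the current handler and travels toward the head, so it only reaches outbound handlers that were added before it. In ReversedWebSocketServer the adapter sits after the handler, so it never sees those ctx.flush() calls (nor the ctx.close() issued after channelInactive).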
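The difference in flush counts comes down to where an outbound call starts. A minimal sketch of that rule in plain Java follows. It is a toy model, not Netty code: `FlushStartPointSketch` and its methods are hypothetical names, and the pipeline is reduced to the two handlers that differ between the servers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only, NOT Netty code: shows why the start point of an outbound call matters.
final class FlushStartPointSketch {

    // Only "adapter" is an outbound handler in this reduced pipeline.
    static final List<String> OUTBOUND = Arrays.asList("adapter");

    /** Outbound handlers visited when walking toward the head, starting just before fromIndex. */
    static List<String> outboundSeenFrom(List<String> pipeline, int fromIndex) {
        List<String> seen = new ArrayList<>();
        for (int i = fromIndex - 1; i >= 0; i--) {
            if (OUTBOUND.contains(pipeline.get(i))) {
                seen.add(pipeline.get(i));
            }
        }
        return seen;
    }

    /** Models ctx.flush() called from inside the handler named caller. */
    static List<String> ctxFlush(List<String> pipeline, String caller) {
        return outboundSeenFrom(pipeline, pipeline.indexOf(caller));
    }

    /** Models ctx.channel().flush(), which starts at the tail of the pipeline. */
    static List<String> channelFlush(List<String> pipeline) {
        return outboundSeenFrom(pipeline, pipeline.size());
    }

    public static void main(String[] args) {
        List<String> adapterFirst = Arrays.asList("adapter", "handler"); // WebSocketServer
        List<String> handlerFirst = Arrays.asList("handler", "adapter"); // ReversedWebSocketServer

        System.out.println("adapter first, ctx.flush     -> " + ctxFlush(adapterFirst, "handler"));
        System.out.println("adapter first, channel.flush -> " + channelFlush(adapterFirst));
        System.out.println("handler first, ctx.flush     -> " + ctxFlush(handlerFirst, "handler"));
        System.out.println("handler first, channel.flush -> " + channelFlush(handlerFirst));
    }
}
```

Only the third case comes back empty, which corresponds to the "adapter is flush" lines that are missing from the ReversedWebSocketServer log.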
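The takeaway is that a Netty pipeline has to be planned carefully: add the outbound interceptor before the inbound handler, so that it also sees the outbound calls the handler makes through its own ctx.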
Original article: https://www.cnblogs.com/dgutfly/p/11536116.html