理解了 Netty 的处理流程之后,下面的代码还是比较容易看懂的,这里直接贴出代码。
主启动程序:
/**
 * Bootstraps the chat server: binds a Netty server channel and wires each
 * accepted connection through a {@link ChatServerInitializer}.
 */
public class ChatServer {

    /** Channel group handed to every per-connection initializer. */
    private final ChannelGroup group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

    /** Single event loop group passed to the bootstrap. */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /** The bound server channel; stays {@code null} until {@link #start} has run. */
    private Channel channel;

    /**
     * Binds the server to the given address and waits for the bind to finish.
     *
     * @param address local address to listen on
     * @return the future of the bind operation
     */
    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(createInitializer(group));

        ChannelFuture bindFuture = bootstrap.bind(address).syncUninterruptibly();
        channel = bindFuture.channel();
        return bindFuture;
    }

    /**
     * Creates the handler that initializes the pipeline of each accepted channel.
     *
     * @param group2 channel group to hand to the initializer
     * @return a new {@link ChatServerInitializer}
     */
    protected ChannelHandler createInitializer(ChannelGroup group2) {
        return new ChatServerInitializer(group2);
    }

    /**
     * Closes the server channel (if one was bound), closes the channel group
     * and starts a graceful shutdown of the event loop group.
     */
    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        group.close();
        workerGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 2048, registers a JVM shutdown hook that calls
     * {@link #destroy()}, and blocks until the server channel is closed.
     */
    public static void main(String[] args) {
        final ChatServer server = new ChatServer();
        ChannelFuture bindFuture = server.start(new InetSocketAddress(2048));
        System.out.println("server start................");

        Thread shutdownHook = new Thread() {
            @Override
            public void run() {
                server.destroy();
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        bindFuture.channel().closeFuture().syncUninterruptibly();
    }
}
初始化链:
/**
 * Sets up the handler pipeline of every newly accepted channel.
 */
public class ChatServerInitializer extends ChannelInitializer<Channel> {

    /** Channel group passed on to the WebSocket frame handler. */
    private final ChannelGroup group;

    /**
     * @param group channel group given to each {@link TextWebSocketFrameHandler}
     */
    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    /**
     * Appends the handlers in order: HTTP codec, chunked-write handler,
     * HTTP aggregator (limit 64 * 1024), the plain HTTP request handler for
     * "/ws", the WebSocket protocol handler for "/ws", and the text frame
     * handler.
     */
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new ChunkedWriteHandler())
                .addLast(new HttpObjectAggregator(64 * 1024))
                .addLast(new HttpRequestHandler("/ws"))
                .addLast(new WebSocketServerProtocolHandler("/ws"))
                .addLast(new TextWebSocketFrameHandler(group));
    }
}
HTTP请求处理:
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String wsUri; public HttpRequestHandler(String wsUri) { super(); this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { if(wsUri.equalsIgnoreCase(msg.getUri())){ ctx.fireChannelRead(msg.retain()); }else{ if(HttpHeaders.is100ContinueExpected(msg)){ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } RandomAccessFile file = new RandomAccessFile(HttpRequestHandler.class.getResource("/").getPath()+"/index.html", "r"); HttpResponse response = new DefaultHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html;charset=UTF-8"); boolean isKeepAlive = HttpHeaders.isKeepAlive(msg); if(isKeepAlive){ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); if(ctx.pipeline().get(SslHandler.class) == null){ ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length())); }else{ ctx.write(new ChunkedNioFile(file.getChannel())); } ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if(isKeepAlive == false){ future.addListener(ChannelFutureListener.CLOSE); } file.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(System.err); } }
websocket处理:
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final ChannelGroup group; public TextWebSocketFrameHandler(ChannelGroup group) { super(); this.group = group; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){ ctx.pipeline().remove(HttpRequestHandler.class); group.writeAndFlush(new TextWebSocketFrame("Client "+ctx.channel()+" joined!")); group.add(ctx.channel()); }else{ super.userEventTriggered(ctx, evt); } } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { group.writeAndFlush(msg.retain()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
网页index.html代码:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<script type="text/javascript">
    // Chat client: opens a WebSocket to the server and appends every
    // received message to the textarea with id "responseText".
    var socket;
    if (!window.WebSocket) {
        // Fall back to the prefixed constructor if the standard one is missing.
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:2048/ws");
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data
        };
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "连接开启!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + "连接被关闭";
        };
    } else {
        alert("你的浏览器不支持!");
    }

    // Sends the message if the socket is open; otherwise alerts the user.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接没有开启.");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Hello, World!"><input type="button" value="发送消息" onclick="send(this.form.message.value)">
    <h3>输出:</h3>
    <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
    <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空">
</form>
</body>
</html>
运行主程序后,在支持 WebSocket 的浏览器地址栏中输入 http://127.0.0.1:2048 即可打开聊天页面;同时打开多个页面,就可以互相聊天。
项目已经打包上传,
项目地址:https://github.com/ivyboy/nettyWebsocket
时间: 2024-10-11 04:51:14