一、示例介绍
示例取自《基于Netty5.0高级案例一之NettyWebsocket》,和《Netty inAction》中11章的例子一样,这个例子通过WebSocket实现了一个聊天室的群发功能。但后者的例子我没本事跑通。
新建一个Maven项目,项目名称叫NettyWebSocket,具体过程请参考前一贴。别忘了在pom.xml中加入netty5.0的依赖。
在项目中新建4个class:
4个类的代码你可以从后面的内容中找到,这里先不考虑代码的问题。用“复制>>粘贴”将代码原样拷贝到这4个源文件中再说。
注意,如果源代码中缺少import语句,请自行fixed一下。
在NettyServe.java上右键,Run As >> Java Application,运行服务端代码。此时控制台将输出:
服务端开启等待客户端连接 ... ...
然后在磁盘上创建一个index.html文件,文件内容你也可以在后面的内容中找到。在Finder中双击index.html文件,用Safari打开。
在Dock栏上右击Safari图标,选择“新建窗口”,打开另一个Safari窗口。然后将第一个Safari窗口中的地址复制到第二个Safari窗口地址栏中,回车。
将两个窗口并列,你可以看到在一个窗口中输入的聊天消息,在另一个窗口中会实时得到刷新,显然是服务端通过WebSocket同时向所有连接的客户端进行了推送:
在服务端控制台中也会有输出:
测试完毕,我们下面再来介绍代码。
注意,如果使用Safari测试,当你关闭Safari,服务端会输出“客户端与服务端连接关闭”。如果使用Chrome测试,当你关闭Chrome时,服务端会抛出一个“UnsupportedOperationException”异常。
二、Global.java
这个类很简单,就是定义了一个全局变量ChannelGroup group,这样在后面的其它类(主要是MyWebSocketServerHandler)中就不用定义group了,直接使用就行了。
源代码(如果你已经复制/粘贴过源代码了,请跳过):
public class Global {
public staticChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
三、ChildChannelHandler.java
这个类实现了ChannelInitializer,即Channel与ChannelHandler的绑定,也就是向pipeline中填入各个ChannelHandler。
源代码(如果你已经复制/粘贴过源代码了,请跳过):
public classChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel e) throws Exception {
e.pipeline().addLast("http-codec",new HttpServerCodec());
e.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
e.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
e.pipeline().addLast("handler",newMyWebSocketServerHandler());
}
}
四、MyWebSocketServerHandler.java
服务器所有的业务逻辑被放到这里,当然也包括我们的WebSocket群发。
源代码(如果你已经复制/粘贴过源代码了,请跳过):
public class MyWebSocketServerHandlerextends
SimpleChannelInboundHandler<Object>{
private static final Logger logger = Logger
.getLogger(WebSocketServerHandshaker.class.getName());
private WebSocketServerHandshaker handshaker;
@Override
public voidchannelActive(ChannelHandlerContext ctx) throws Exception {
// 添加
Global.group.add(ctx.channel());
System.out.println("客户端与服务端连接开启");
}
@Override
public voidchannelInactive(ChannelHandlerContext ctx) throws Exception {
// 移除
Global.group.remove(ctx.channel());
System.out.println("客户端与服务端连接关闭");
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, ((FullHttpRequest) msg));
} else if (msg instanceofWebSocketFrame) {
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public voidchannelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private voidhandlerWebSocketFrame(ChannelHandlerContext ctx,
WebSocketFrameframe) {
// 判断是否关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame
.retain());
return;
}
// 判断是否ping消息
else if (frame instanceofPingWebSocketFrame) {
ctx.channel().write(
new PongWebSocketFrame(frame.content().retain()));
return;
}
// 本例程仅支持文本消息,不支持二进制消息
else if (!(frame instanceofTextWebSocketFrame)) {
System.out.println("本例程仅支持文本消息,不支持二进制消息");
throw newUnsupportedOperationException(String.format(
"%s frame types notsupported", frame.getClass().getName()));
}
// 返回应答消息
Stringrequest = ((TextWebSocketFrame) frame).text();
System.out.println("服务端收到:" + request);
if (logger.isLoggable(Level.FINE)) {
logger
.fine(String.format("%s received %s", ctx.channel(),
request));
}
TextWebSocketFrametws = new TextWebSocketFrame(new Date().toString()
+ctx.channel().id() + ":" + request);
// 群发
Global.group.writeAndFlush(tws);
// 返回【谁发的发给谁】
// ctx.channel().writeAndFlush(tws);
}
private voidhandleHttpRequest(ChannelHandlerContext ctx,
FullHttpRequestreq) {
if (!req.getDecoderResult().isSuccess()
||(!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactorywsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:7397/websocket", null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequestreq, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.getStatus().code()!= 200) {
ByteBufbuf = Unpooled.copiedBuffer(res.getStatus().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,关闭连接
ChannelFuturef = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private static boolean isKeepAlive(FullHttpRequest req) {
return false;
}
@Override
public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}