Netty5在架构和代码上相比Netty4都有很多改进,看起来很清晰。
下面是根据官网例子改进的Echo Server示例代码。
代码中增加了netty自带的handler:LineBasedFrameDecoder主要解决网络传输的粘包/拆包问题;StringDecoder和StringEncoder主要解决netty ByteBuf和String之间的转换。
《Netty in Action》上也提到了几个注意事项:读取消息后,记得要释放,而且必须在finally块中释放(父类为ChannelHandlerAdapter时),这和锁的释放位置是一样的,为的是确保释放操作一定会执行。那为什么写消息不用显式释放呢?原因是Netty帮我们做了。当然,如果你的handler父类是SimpleChannelInboundHandler,就不用显式释放了,因为Netty也帮你做了。
下面的代码主要用于给字符串添加delimiter(分隔符)。
5.0.0.Alpha2版本:
/* * Copyright (C) 2014- now() The Netty5-2015 Authors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.doctor.netty5.ebook.netty_in_action05.common; import java.util.stream.Stream; /** * @author doctor * * @time 2015年7月6日 */ public final class NettyUtil { public static final String END_OF_LINE = "\n"; public static String appenEndOfLine(String... message) { StringBuilder stringBuilder = new StringBuilder(256); Stream.of(message).forEachOrdered(stringBuilder::append); stringBuilder.append(END_OF_LINE); return stringBuilder.toString(); } }
服务端代码:
/* * Copyright (C) 2014-present The Netty5-2015 Authors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.doctor.netty5.example.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.nio.charset.StandardCharsets; import com.doctor.netty5.ebook.netty_in_action05.common.NettyUtil; /** * @author doctor * * @time 2015年7月6日 */ public class EchoServer { private final int port; public EchoServer() { this(8989); } public EchoServer(int port) { this.port = port; } /** * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { new EchoServer().start(); } public void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(port) .childHandler(new ChannelInitializer<SocketChannel>() { @Override 
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(2048)); ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8)); ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture channelFuture = bootstrap.bind().sync(); System.out.println(EchoServer.class.getName() + " started and listen on port:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } private static class EchoServerHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); ctx.write(NettyUtil.appenEndOfLine("服务器返回:", (String) msg)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } } }
客户端代码:
/* * Copyright (C) 2014-present The Netty5-2015 Authors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.doctor.netty5.example.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.ReferenceCountUtil; import java.nio.charset.StandardCharsets; import java.util.Scanner; import com.doctor.netty5.ebook.netty_in_action05.common.NettyUtil; /** * @author doctor * * @time 2015年7月6日 */ public class EchoClient { private final String host; private final int port; public EchoClient() { this("localhost", 8989); } public EchoClient(String host) { this(host, 8989); } public EchoClient(String host, int port) { this.host = host; this.port = port; } /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { new EchoClient().start(); } public void start() throws InterruptedException { NioEventLoopGroup workersGroup = new NioEventLoopGroup(1); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workersGroup) 
.channel(NioSocketChannel.class) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(2048)); ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8)); ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect().sync(); channelFuture.channel().closeFuture().sync(); } finally { workersGroup.shutdownGracefully(); } } private static class EchoClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(NettyUtil.appenEndOfLine("我要连接....")); new Thread(new Hander(ctx)).start(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { System.out.println(msg); } finally { // 读完消息记得释放,那写消息为什么不这样操作呢,因为写完消息netty自动释放。 // 其操作见:DefaultChannelHandlerInvoker L331-332,不过有这个注释-> promise cancelled // 是不少netty5正式发布的时候会取消呢。 // 我们可以使用SimpleChannelInboundHandler作为父类,因为释放操作已实现。 ReferenceCountUtil.release(msg); } } } private static class Hander implements Runnable { private ChannelHandlerContext ctx = null; private Scanner scanner = new Scanner(System.in); public Hander(ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { while (true) { String nextLine = scanner.nextLine(); ctx.writeAndFlush(NettyUtil.appenEndOfLine(nextLine)); } } } }
客户端启动了一个线程,异步地从控制台读取输入的字符串(以回车结束)。
版权声明:本文为博主原创文章,未经博主允许不得转载。
时间: 2024-10-22 07:40:36