本文通过一个实例来说明Netty的使用:1个服务器同时监听5个端口,5个客户端线程各连接其中一个端口。客户端连接上服务器以后就向服务器发送消息;服务器接收到消息后,等待随机的时间,再向客户端返回消息;客户端接收到消息以后,同样等待随机的时间,再向服务器发送消息,这样一直循环下去。
项目结构:
NettyServer.java:
package Server; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.execution.ExecutionHandler; import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import Util.Constant; public class NettyServer { public static String host = "127.0.0.1"; // 创建1个线程池 static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); public static void main(String[] args) { // ChannelFactory final ChannelFactory channelFactory = new NioServerSocketChannelFactory( // Boss线程池,处理Socket请求 Executors.newCachedThreadPool(), // Worker线程池,由于使用的是NIO,1个Worker线程可以管理多个Channel Executors.newCachedThreadPool()); // ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); ServerPipelineFactory serverPipelineFactory = new ServerPipelineFactory(executionHandler); bootstrap.setPipelineFactory(serverPipelineFactory); // 禁用nagle算法 bootstrap.setOption("child.tcpNoDelay", true); // 启用TCP保活检测 bootstrap.setOption("child.keepAlive", true); // 监听5个端口 bootstrap.bind(new InetSocketAddress(Constant.p1)); System.out.println("Listening port " + Constant.p1 + "..."); bootstrap.bind(new InetSocketAddress(Constant.p2)); System.out.println("Listening port " + Constant.p2 + "..."); bootstrap.bind(new InetSocketAddress(Constant.p3)); System.out.println("Listening port " + Constant.p3 + "..."); bootstrap.bind(new InetSocketAddress(Constant.p4)); System.out.println("Listening port " + Constant.p4 + "..."); bootstrap.bind(new InetSocketAddress(Constant.p5)); System.out.println("Listening port " + Constant.p5 + "..."); } }
ServerPipelineFactory.java:
package Server; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.handler.execution.ExecutionHandler; public class ServerPipelineFactory implements ChannelPipelineFactory { private final ExecutionHandler executionHandler; public ServerPipelineFactory(ExecutionHandler executionHandler){ this.executionHandler = executionHandler; } @Override public ChannelPipeline getPipeline() throws Exception { // TODO Auto-generated method stub return Channels.pipeline( new StringEncoder(), new StringDecoder(), // 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前 executionHandler, // 业务逻辑handler new MyServerHandler()); } }
MyServerHandler.java:
package Server; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import Util.Tool; public class MyServerHandler extends SimpleChannelHandler{ @SuppressWarnings("static-access") @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { System.out.println("Server received:" + e.getMessage()); // 休息随机秒后发送消息 Thread th = Thread.currentThread(); int interval = Tool.getInterval(100); th.sleep(interval*1000); e.getChannel().write("from Server: Hello!"); super.messageReceived(ctx, e); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); super.exceptionCaught(ctx, e); } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("A client connected!"); super.channelConnected(ctx, e); } }
NettyClient.java:
package Client; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.execution.ExecutionHandler; import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import Util.Constant; public class NettyClient extends Thread{ public static String host = "127.0.0.1"; ClientBootstrap bootstrap; int port; // 创建1个线程池 static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); public NettyClient(int port) { this.port = port; // ChannelFactory final ChannelFactory channelFactory = new NioClientSocketChannelFactory( // Boss线程池 Executors.newCachedThreadPool(), // Worker线程池 Executors.newCachedThreadPool()); // ServerBootstrap bootstrap = new ClientBootstrap(channelFactory); ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(executionHandler); bootstrap.setPipelineFactory(clientPipelineFactory); bootstrap.setOption("tcpNoDelay" ,true); bootstrap.setOption("keepAlive", true); bootstrap.connect(new InetSocketAddress(port)); } public void run(){ ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); // 开始试图连接 System.out.println("Connecting to port " + port + "..."); // 等待直到连接关闭或失败 future.getChannel().getCloseFuture().awaitUninterruptibly(); // 关闭线程池准备退出 bootstrap.releaseExternalResources(); } public static void main(String[] args) { NettyClient nc1 = new NettyClient(Constant.p1); NettyClient nc2 = new NettyClient(Constant.p2); NettyClient nc3 = new NettyClient(Constant.p3); NettyClient nc4 = new NettyClient(Constant.p4); NettyClient nc5 = new NettyClient(Constant.p5); nc1.start(); nc2.start(); nc3.start(); nc4.start(); nc5.start(); } }
ClientPipelineFactory.java:
package Client; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.handler.execution.ExecutionHandler; public class ClientPipelineFactory implements ChannelPipelineFactory { private final ExecutionHandler executionHandler; public ClientPipelineFactory(ExecutionHandler executionHandler){ this.executionHandler = executionHandler; } @Override public ChannelPipeline getPipeline() throws Exception { // TODO Auto-generated method stub return Channels.pipeline( new StringEncoder(), new StringDecoder(), // 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前 executionHandler, // 业务逻辑handler new MyClientHandler()); } }
MyClientHandler.java:
package Client; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import Util.Tool; public class MyClientHandler extends SimpleChannelHandler{ // 连接到服务端时,发出消息 @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("Connected to Server!"); e.getChannel().write("from Client: Hello! " + System.currentTimeMillis()); super.channelConnected(ctx, e); } @SuppressWarnings("static-access") @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { System.out.println("Client Received:" + e.getMessage()); // 休息随机秒后发送消息 Thread th = Thread.currentThread(); int interval = Tool.getInterval(5); th.sleep(interval*1000); e.getChannel().write("from Client: Hello! " + System.currentTimeMillis()); super.messageReceived(ctx, e); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); super.exceptionCaught(ctx, e); } }
Constant.java:
package Util; public class Constant { final static int start = 10000; public static int p1 = start + 1; public static int p2 = start + 2; public static int p3 = start + 3; public static int p4 = start + 4; public static int p5 = start + 5; }
Tool.java:
package Util; import java.util.Random; public class Tool { static Random rand = new Random(); public static int getInterval(int max){ return rand.nextInt(max); } }
时间: 2024-10-08 21:38:39