package com.mzj.nio.java; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Copyright (C),HTF<br> * NIO客户端<br> * NIO:1.缓冲区、2.选择器、3.通道、4.SelectionKey(事件类型)<br> * 类似于AWT事件机制 * * @author muzhongjiang * @date 2014年8月20日 */ public class NIOClient { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); private final int BLOCK = 4096;// 缓冲区大小 private ByteBuffer sendBuf = ByteBuffer.allocate(BLOCK);// 数据接收缓冲区 private ByteBuffer receiveBuf = ByteBuffer.allocate(BLOCK);// 数据发送缓冲区 private Selector selector;// 通道选择器 private final InetSocketAddress SERVER_ADDRESS; /** * 构造方法 * * @throws IOException */ public NIOClient(String hostName, int port) throws IOException { SERVER_ADDRESS = new InetSocketAddress(hostName, port); this.start(); } public void start() throws IOException { // 1.打开client端socket通道: SocketChannel socketChannel = SocketChannel.open(); // 2.通道设置为非阻塞式: socketChannel.configureBlocking(false); // 4.获得通道选择器: selector = Selector.open(); // 5.注册连接服务端的key: socketChannel.register(selector, SelectionKey.OP_CONNECT); // 6.连接服务端: socketChannel.connect(SERVER_ADDRESS); LOG.info("开启客户端..."); // 7.打开监听 this.listener(); } /** * 监听器 * * @throws IOException */ private void listener() throws IOException { while (true) { // 1.‘选择器‘获得一组‘已被client选择的key‘: int size = selector.select(); LOG.info("客户端 size:【" + size + "】"); // 2.‘key‘处理器(事件处理器) this.keyProcessor(selector.selectedKeys()); } } /** * key处理器(事件处理器)<br> * 对不同的key(事件)进行不同的处理 * * @throws IOException */ private void keyProcessor(Set<SelectionKey> selectedKeys) throws IOException { SocketChannel client = null; Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { this.handConnect(selectionKey, client); } else if (selectionKey.isReadable()) { this.handRead(selectionKey, client); } else if (selectionKey.isWritable()) { this.handWrit(selectionKey, client); } } } private void handConnect(SelectionKey selectionKey, SocketChannel client) throws IOException { client = (SocketChannel) selectionKey.channel(); if (client.isConnectionPending()) {// 判断此通道上是否正在进行连接操作。 if (client.finishConnect()) {// 完成套接字通道的连接过程 LOG.info("客户端完成连接"); sendBuf.clear(); // 缓冲区清空: sendBuf.put("hello world server".getBytes()); sendBuf.flip(); client.write(sendBuf); } // client请求读server: client.register(selector, SelectionKey.OP_READ); } } private void handRead(SelectionKey selectionKey, SocketChannel client) throws IOException { client = (SocketChannel) selectionKey.channel(); receiveBuf.clear();// 1.缓冲区清空: int count = client.read(receiveBuf);// 2.读取服务器发送来的数据到缓冲区中 // 3.数据处理: if (count > 0) { String receiveText = new String(receiveBuf.array(), 0, count); // 可以保存到数据库..... LOG.info("客户端接收服务端数据:receiveText:【" + receiveText + "】"); // client注册写: client.register(selector, SelectionKey.OP_WRITE); } } private void handWrit(SelectionKey selectionKey, SocketChannel client) throws IOException { client = (SocketChannel) selectionKey.channel(); // 1.将缓冲区清空 sendBuf.clear(); // 2.向缓冲区中输入数据 String sendText = String.valueOf(new Date().getTime()); sendBuf.put(sendText.getBytes()); // 3.将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位 sendBuf.flip(); // 4.输出到通道 client.write(sendBuf); LOG.info("客户端向服务器端发送数据--:" + sendText); // 5.client请求读server: client.register(selector, SelectionKey.OP_READ); } public static void main(String[] args) throws IOException { new NIOClient("127.0.0.1", 9999); } }
package com.mzj.nio.java; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Copyright (C),HTF<br> * NIO服务端<br> * NIO:1.缓冲区、2.选择器、3.通道、4.SelectionKey(事件类型)<br> * 类似于AWT事件机制 * * @author muzhongjiang * @date 2014年8月20日 */ public class NIOServer { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); private final int BLOCK = 4096;// 缓冲区大小 private ByteBuffer sendBuf = ByteBuffer.allocate(BLOCK);// 数据接收缓冲区 private ByteBuffer receiveBuf = ByteBuffer.allocate(BLOCK);// 数据发送缓冲区 private Selector selector;// 通道选择器 /** * 构造方法: */ public NIOServer(int port) throws IOException { // 1.打开server端socket通道: ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 2.通道设置为非阻塞式: serverSocketChannel.configureBlocking(false); // 3.获得Socket,并绑定服务‘地址‘: ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(port)); // 4.获得通道选择器: selector = Selector.open(); /* * 5.在‘选择器‘上注册‘通道‘的‘事件(类型)‘<br> 一般ServerSocketChannel只注册accept事件,对于read和write事件是注册到accept的SocketChannel中的 */ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); LOG.info("开启服务端..."); // 6.打开监听 this.listener(); } /** * 监听器 * * @throws IOException */ private void listener() throws IOException { while (true) { // 1.‘选择器‘获得一组‘已被client选择的key‘: int size = selector.select(); LOG.info("服务端 size:【" + size + "】"); // 2.‘key‘处理器(事件处理器) this.keyProcessor(selector.selectedKeys()); } } /** * key处理器(事件处理器)<br> * 对不同的key(事件)进行不同的处理 * * @throws IOException */ private void keyProcessor(Set<SelectionKey> selectedKeys) throws IOException { SocketChannel client = null; Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isAcceptable()) { this.handAccept(selectionKey, client); } else if (selectionKey.isReadable()) { this.handRead(selectionKey, client); } else if (selectionKey.isWritable()) { this.handWrit(selectionKey, client); } } } private void handAccept(SelectionKey selectionKey, SocketChannel client) throws IOException { LOG.info("服务端 handAccept"); ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); // 1.获得client套接字通道: client = server.accept(); // 2.配置为非阻塞: client.configureBlocking(false); // 3.给客户端注册read的Key: client.register(selector, SelectionKey.OP_READ); } private void handRead(SelectionKey selectionKey, SocketChannel client) throws IOException { client = (SocketChannel) selectionKey.channel(); // 1.将缓冲区清空 receiveBuf.clear(); // 2.读取发送来的数据到缓冲区中 int count = client.read(receiveBuf); if (count > 0) { String receiveText = new String(receiveBuf.array(), 0, count); LOG.info("服务器端接受客户端数据:" + receiveText); // 3.client注册write key : client.register(selector, SelectionKey.OP_WRITE); } } private void handWrit(SelectionKey selectionKey, SocketChannel client) throws IOException { client = (SocketChannel) selectionKey.channel(); // 1.将缓冲区清空 sendBuf.clear(); // 2.向缓冲区中输入数据 String sendText = String.valueOf(new Date().getTime()); sendBuf.put(sendText.getBytes()); // 3.将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位 sendBuf.flip(); // 4.输出到通道 client.write(sendBuf); LOG.info("向客户端发送数据:" + sendText); // 5.client注册read key : client.register(selector, SelectionKey.OP_READ); } public static void main(String[] args) throws IOException { new NIOServer(9999); } }
时间: 2025-01-03 19:09:02