介绍
为了提高并发处理效率,服务端把用户的请求连接随机分配给线程池中的线程进行处理;HBase 也是采用同样的方式处理用户请求的。
客户端代码可以参考:基于java.nio.channels的编程实践-I
代码
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class NIOSocketServer2 extends Thread { private static final Logger LOG = LoggerFactory .getLogger(NIOSocketServer2.class); private static final String CHARSET = "UTF-8"; private static final int BUFFER_SIZE = 1024; private static final int FAIL_TRY_NUM = 3; private Selector selector; private ServerSocketChannel ssc; private static NIOSocketServer2 server; private Reader[] readers = null; private Random rand = new Random(); private int readerSize = 1; /** * 程序入口 * * @param args */ public static void main(String[] args) { server = new NIOSocketServer2(); try { // server.setDaemon(true); server.initServer(); server.start(); } catch (Exception e) { e.printStackTrace(); // 如果出现异常,则直接关闭客户端 server.stopServer(); System.exit(1); } } @Override public void run() { int failNum = 0; while (true) { try { int select = selector.select(); if (select > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { doAcceptable(key); } iter.remove(); } } } catch (Exception e) { failNum++; if (failNum > FAIL_TRY_NUM) { server.stopServer(); } } } } /** * 初始化服务器端程序,开始监听端口 * * @throws IOException */ private void initServer() throws IOException { selector = Selector.open(); ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new 
InetSocketAddress(2181)); ssc.register(selector, SelectionKey.OP_ACCEPT); ExecutorService readPool = Executors.newFixedThreadPool( readerSize, new ThreadFactoryBuilder() .setNameFormat("RpcServer.reader=%d,port=" + 2181) .setDaemon(true).build()); readers = new Reader[readerSize]; for (int i = 0; i < readerSize; ++i) { Reader reader = new Reader(); readers[i] = reader; readPool.execute(reader); } } /** * 停止服务器 * * @throws IOException */ private void stopServer() { try { if (selector != null && selector.isOpen()) { selector.close(); } if (ssc != null && ssc.isOpen()) { ssc.close(); } } catch (IOException e) { LOG.info("关闭服务端失败:" + e.getMessage()); } } Reader getReader() { return readers[rand.nextInt(readerSize) % readers.length]; } /** * 对新的客户端连接进行处理 * * @param key * @throws IOException */ private void doAcceptable(SelectionKey key) throws IOException { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { try { channel.configureBlocking(false); channel.socket().setTcpNoDelay(true); channel.socket().setKeepAlive(true); } catch (IOException ioe) { channel.close(); throw ioe; } Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); } finally { reader.finishAdd(); } } } /** * 已连接 * * @param key */ private void doConnectable(SelectionKey key) { LOG.info("connect is ok"); } /** * 写消息到客户端 * * @param key * @throws IOException */ private void doWriteMessage(SelectionKey key) throws Exception { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.wrap("server write msg to client" .getBytes(CHARSET)); while (buffer.hasRemaining()) { sc.write(buffer); } TimeUnit.SECONDS.sleep(1); } /** * @param key * @throws IOException */ private void doReadMessage(SelectionKey key) throws Exception { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE); int read = sc.read(bb); while 
(read > 0) { bb.flip(); byte[] barr = new byte[bb.limit()]; bb.get(barr); LOG.info("server read msg from client:" + new String(barr, CHARSET)); bb.clear(); read = sc.read(bb); } TimeUnit.SECONDS.sleep(1); } private class Reader implements Runnable { private volatile boolean adding = false; private final Selector readSelector; Reader() throws IOException { this.readSelector = Selector.open(); } public void run() { try { doRunLoop(); } finally { try { readSelector.close(); } catch (IOException ioe) { LOG.error(getName() + ": error closing read selector in " + getName(), ioe); } } } private synchronized void doRunLoop() { while (true) { SelectionKey key = null; try { readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys() .iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { try { doReadMessage(key); } catch (Exception e) { e.printStackTrace(); } } if (key.isWritable()) { try { doWriteMessage(key); } catch (Exception e) { e.printStackTrace(); } } } key = null; } } catch (InterruptedException e) { if (true) { // unexpected -- log it LOG.info(getName() + ": unexpectedly interrupted: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error(getName() + ": error in Reader", ex); } } } /** * This gets reader into the state that waits for the new channel to be * registered with readSelector. If it was waiting in select() the * thread will be woken up, otherwise whenever select() is called it * will return even if there is nothing to read and wait in * while(adding) for finishAdd call */ public void startAdd() { adding = true; readSelector.wakeup(); } public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } public synchronized void finishAdd() { adding = false; this.notify(); } } }
时间: 2024-10-07 17:46:17