基于java.nio.channels的编程实践-II

介绍

为了提供并发处理效率,把用户的请求连接随机分配到线程池的线程进行处理,hbase也是采用同样的方式处理用户请求的

客户端代码可以参考:基于java.nio.channels的编程实践-I

代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class NIOSocketServer2 extends Thread {
	private static final Logger LOG = LoggerFactory
			.getLogger(NIOSocketServer2.class);
	private static final String CHARSET = "UTF-8";
	private static final int BUFFER_SIZE = 1024;
	private static final int FAIL_TRY_NUM = 3;

	private Selector selector;
	private ServerSocketChannel ssc;
	private static NIOSocketServer2 server;
	private Reader[] readers = null;
	private Random rand = new Random();
	private int readerSize = 1;

	/**
	 * 程序入口
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		server = new NIOSocketServer2();
		try {
			// server.setDaemon(true);
			server.initServer();
			server.start();
		} catch (Exception e) {
			e.printStackTrace();
			// 如果出现异常,则直接关闭客户端
			server.stopServer();
			System.exit(1);
		}
	}

	@Override
	public void run() {
		int failNum = 0;
		while (true) {
			try {
				int select = selector.select();
				if (select > 0) {
					Set<SelectionKey> keys = selector.selectedKeys();
					Iterator<SelectionKey> iter = keys.iterator();
					while (iter.hasNext()) {
						SelectionKey key = iter.next();
						if (key.isAcceptable()) {
							doAcceptable(key);
						}
						iter.remove();
					}
				}
			} catch (Exception e) {
				failNum++;
				if (failNum > FAIL_TRY_NUM) {
					server.stopServer();
				}
			}
		}

	}

	/**
	 * 初始化服务器端程序,开始监听端口
	 * 
	 * @throws IOException
	 */
	private void initServer() throws IOException {
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(2181));
		ssc.register(selector, SelectionKey.OP_ACCEPT);

		ExecutorService readPool = Executors.newFixedThreadPool(
				readerSize,
				new ThreadFactoryBuilder()
						.setNameFormat("RpcServer.reader=%d,port=" + 2181)
						.setDaemon(true).build());
		readers = new Reader[readerSize];
		for (int i = 0; i < readerSize; ++i) {
			Reader reader = new Reader();
			readers[i] = reader;
			readPool.execute(reader);
		}

	}

	/**
	 * 停止服务器
	 * 
	 * @throws IOException
	 */
	private void stopServer() {
		try {
			if (selector != null && selector.isOpen()) {
				selector.close();
			}
			if (ssc != null && ssc.isOpen()) {
				ssc.close();
			}
		} catch (IOException e) {
			LOG.info("关闭服务端失败:" + e.getMessage());
		}
	}

	Reader getReader() {
		return readers[rand.nextInt(readerSize) % readers.length];
	}

	/**
	 * 对新的客户端连接进行处理
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void doAcceptable(SelectionKey key) throws IOException {
		ServerSocketChannel server = (ServerSocketChannel) key.channel();

		SocketChannel channel;
		while ((channel = server.accept()) != null) {
			try {
				channel.configureBlocking(false);
				channel.socket().setTcpNoDelay(true);
				channel.socket().setKeepAlive(true);
			} catch (IOException ioe) {
				channel.close();
				throw ioe;
			}

			Reader reader = getReader();
			try {
				reader.startAdd();
				SelectionKey readKey = reader.registerChannel(channel);
			} finally {
				reader.finishAdd();
			}
		}

	}

	/**
	 * 已连接
	 * 
	 * @param key
	 */
	private void doConnectable(SelectionKey key) {
		LOG.info("connect is ok");
	}

	/**
	 * 写消息到客户端
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void doWriteMessage(SelectionKey key) throws Exception {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.wrap("server write msg to client"
				.getBytes(CHARSET));
		while (buffer.hasRemaining()) {
			sc.write(buffer);
		}
		TimeUnit.SECONDS.sleep(1);
	}

	/**
	 * @param key
	 * @throws IOException
	 */
	private void doReadMessage(SelectionKey key) throws Exception {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
		int read = sc.read(bb);
		while (read > 0) {
			bb.flip();
			byte[] barr = new byte[bb.limit()];
			bb.get(barr);
			LOG.info("server read msg from client:" + new String(barr, CHARSET));
			bb.clear();
			read = sc.read(bb);
		}
		TimeUnit.SECONDS.sleep(1);
	}

	private class Reader implements Runnable {
		private volatile boolean adding = false;
		private final Selector readSelector;

		Reader() throws IOException {
			this.readSelector = Selector.open();
		}

		public void run() {
			try {
				doRunLoop();
			} finally {
				try {
					readSelector.close();
				} catch (IOException ioe) {
					LOG.error(getName() + ": error closing read selector in "
							+ getName(), ioe);
				}
			}
		}

		private synchronized void doRunLoop() {
			while (true) {
				SelectionKey key = null;
				try {
					readSelector.select();
					while (adding) {
						this.wait(1000);
					}
					Iterator<SelectionKey> iter = readSelector.selectedKeys()
							.iterator();
					while (iter.hasNext()) {
						key = iter.next();
						iter.remove();
						if (key.isValid()) {
							if (key.isReadable()) {
								try {
									doReadMessage(key);
								} catch (Exception e) {
									e.printStackTrace();
								}
							}
							if (key.isWritable()) {
								try {
									doWriteMessage(key);
								} catch (Exception e) {
									e.printStackTrace();
								}
							}
						}
						key = null;
					}
				} catch (InterruptedException e) {
					if (true) { // unexpected -- log it
						LOG.info(getName() + ": unexpectedly interrupted: "
								+ StringUtils.stringifyException(e));
					}
				} catch (IOException ex) {
					LOG.error(getName() + ": error in Reader", ex);
				}
			}
		}

		/**
		 * This gets reader into the state that waits for the new channel to be
		 * registered with readSelector. If it was waiting in select() the
		 * thread will be woken up, otherwise whenever select() is called it
		 * will return even if there is nothing to read and wait in
		 * while(adding) for finishAdd call
		 */
		public void startAdd() {
			adding = true;
			readSelector.wakeup();
		}

		public synchronized SelectionKey registerChannel(SocketChannel channel)
				throws IOException {
			return channel.register(readSelector, SelectionKey.OP_READ
					| SelectionKey.OP_WRITE);
		}

		public synchronized void finishAdd() {
			adding = false;
			this.notify();
		}
	}
}
时间: 2024-10-07 17:46:17

基于java.nio.channels的编程实践-II的相关文章

基于java.nio.channels的编程实践

服务端代码 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketCh

基于Java NIO的Socket通信

Java NIO模式的Socket通信,是一种同步非阻塞IO设计模式,它为Reactor模式实现提供了基础. 下面看看,Java实现的一个服务端和客户端通信的例子. NIO模式的基本原理描述如下: 服务端打开一个通道(ServerSocketChannel),并向通道中注册一个选择器(Selector),这个选择器是与一些感兴趣的操作的标识(SelectionKey,即通过这个标识可以定位到具体的操作,从而进行响应的处理)相关联的,然后基于选择器(Selector)轮询通道(ServerSock

Netty 中 IOException: Connection reset by peer 与 java.nio.channels.ClosedChannelException: null

最近发现系统中出现了很多 IOException: Connection reset by peer 与 ClosedChannelException: null 深入看了看代码, 做了些测试, 发现 Connection reset 会在客户端不知道 channel 被关闭的情况下, 触发了 eventloop 的 unsafe.read() 操作抛出 而 ClosedChannelException 一般是由 Netty 主动抛出的, 在 AbstractChannel 以及 SSLHand

基于Java NIO的多人在线聊天工具源码实现(登录,单聊,群聊)

近来在学习Java NIO网络开发知识,写了一个基于Java NIO的多人在线聊天工具练练手.源码公开在Coding上: https://coding.net/u/hust_wsh/p/MyChat/git ,开发环境是Ubuntu14.04+Eclipse Mars+JDK1.8. 要想编写一个基于Java NIO的多人在线聊天工具,我总结需要以下几方面的地址:客户端服务器模型,Java NIO中的Selector,SocketChannel,ByteBuffer,Collections以及序

Java 连接Kafka报错java.nio.channels.ClosedChannelExcep

Java 客户端连接Kafka报如下错误 bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic [2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,po

Java线程与并发编程实践----同步器(交换器、信号量)

一.交换器 交换器提供了一个线程之间能够交换对象的同步点.每个线程都会往这个 交换器的exchange()方法传入一些对象,匹配伙伴线程,同时接受伙伴对象作为返 回值.java.util.conurrent.Exchange<V>实现了交换器. 下面是一个代码小实例: import java.util.concurrent.Exchanger;   import java.util.concurrent.ExecutorService;   import java.util.concurren

Java线程与并发编程实践----锁框架

Java.util.concurrent.locks包提供了一个包含多种接口和类的框架,它 针对条件进行加锁和等待.不同于对象的内置加锁同步以及java.lang.Object的等 待/通知机制,包含锁框架的并发工具类通过轮询锁.显示等待及其它方式改善这种 机制. 锁框架包含了经常使用的锁.重入锁.条件.读写锁以及冲入读写锁等类别. 一.锁(Lock) Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作.此实 现允许更灵活的结构,可以具有差别很大的属性,可以

java.nio.channels.IllegalBlockingModeException

报错信息如下: Exception in thread "main" java.nio.channels.IllegalBlockingModeException at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:201) at qinfeng.zheng.nio.NIOServer.main(NIOServer.java:43) 意思是非法的阻塞模式.

基于java的简单Socket编程

1TCP协议与UDP协议     1.1 TCP               TCP是(Tranfer Control Protocol)的简称,是一种面向连接的保证可靠传输的协议.通过TCP协议传输,得到的是一个顺序的无差错的数据流.发送方和接收方的成对的两个socket之间必须建立连接,当一个socket(通常都是server socket)等待建立连接时,另一个socket可以要求进行连接,一旦这两个socket连接起来,它们就可以进行双向数据传输,双方都可以进行发送或接收操作. TCP的