基于NIO的Client/Server程序实践

转自    http://blog.csdn.net/zhangzhaokun/article/details/6612833

本意是想看明白Zookeeper内部的代码是怎么玩的,在琢磨一段时间之后,发现还是自己先独立写一个基于NIO的C/S模式程序,看看有哪些细微之处要注意,再来跟进ZK的细节比较靠谱一些,于是乎就自己练手写了如下这段代码 ,权当预热下使用NIO来编写网络程序这一知识点了,在这里记述这段代码的目的无非是加深下自己的印象,并且后续还可以有思索和改进的空间。

基本功能:服务器端不停的向Client发送“how are you?”,客户端不停的接收该消息,接收完消息后则向服务器端发送问候语“Server,how are you?”

(1)服务器端程序

package com.hadoop.nio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author Administrator
 *
 */
public class NIOSocketServer extends Thread {

    private Selector selector;

    private ServerSocketChannel ssc;

    /**
     * @param args
     */
    public static void main(String[] args) {
        NIOSocketServer server = new NIOSocketServer();
        try {
            // server.setDaemon(true);
            server.initServer();
            server.start();
        } catch (Exception e) {
            e.printStackTrace();
            server.stopServer();
        }
    }

    public void run() {
        while (true) {
            try {
                int select = selector.select();
                if (select > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isAcceptable()) {
                            doAcceptable(key);
                        }
                        if (key.isWritable()) {
                            doWriteMessage(key);
                        }
                        if (key.isReadable()) {
                            doReadMessage(key);
                        }
                        if (key.isConnectable()) {
                            doConnectable(key);
                        }
                        iter.remove();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 初始化服务器端程序,开始监听端口
     *
     * @throws IOException
     * @throws ClosedChannelException
     */
    private void initServer() throws IOException, ClosedChannelException {
        selector = Selector.open();

        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(2181));
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * 停止服务器端
     */
    private void stopServer() {
        try {
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
            if (ssc != null && ssc.isOpen()) {
                ssc.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 对新的客户端连接进行处理
     *
     * @param key
     * @throws IOException
     * @throws ClosedChannelException
     */
    private void doAcceptable(SelectionKey key) throws IOException,
            ClosedChannelException {
        System.out.println("is acceptable");
        ServerSocketChannel tempSsc = (ServerSocketChannel) key.channel();
        SocketChannel ss = tempSsc.accept();
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    /**
     * 写消息到客户端
     *
     * @param key
     * @throws IOException
     * @throws UnsupportedEncodingException
     */
    private void doWriteMessage(SelectionKey key) throws IOException,
            UnsupportedEncodingException {
        System.out.println("is writable");
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.wrap("how are you?".getBytes("UTF-8"));
        while (buffer.hasRemaining()) {
            sc.write(buffer);
        }
        // sk.interestOps(SelectionKey.OP_READ);
    }

    /**
     * 读取客户端传递过来的消息
     *
     * @param key
     * @throws IOException
     * @throws UnsupportedEncodingException
     */
    private void doReadMessage(SelectionKey key) throws IOException,
            UnsupportedEncodingException {
        System.out.println("is readable");
        SocketChannel sc = (SocketChannel) key.channel();

        ByteBuffer bb = ByteBuffer.allocate(8);
        System.out.println("receive from clint:");
        int read = sc.read(bb);
        while (read > 0) {
            bb.flip();

            byte[] barr = new byte[bb.limit()];
            bb.get(barr);

            System.out.print(new String(barr, "UTF-8"));
            bb.clear();

            read = sc.read(bb);
        }
        System.out.println("");
        // sk.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * 已连接
     *
     * @param key
     */
    private void doConnectable(SelectionKey key) {
        System.out.println("is connectalbe");
    }
}
(2)客户端程序
/**
 *
 */
package com.hadoop.nio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author Administrator
 *
 */
public class NIOSocketClient extends Thread {
    private SocketChannel socketChannel;
    private Selector selector;

    /**
     * @param args
     */
    public static void main(String[] args) {
        NIOSocketClient client = new NIOSocketClient();
        try {
            client.initClient();
            client.start();
            // client.setDaemon(true);
        } catch (Exception e) {
            e.printStackTrace();
            client.stopServer();
        }
    }

    public void run() {
        while (true) {
            try {
                // 写消息到服务器端
                writeMessage();

                int select = selector.select();
                if (select > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey sk = iter.next();
                        if (sk.isReadable()) {
                            readMessage(sk);
                        }
                        iter.remove();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void readMessage(SelectionKey sk) throws IOException,
            UnsupportedEncodingException {
        SocketChannel curSc = (SocketChannel) sk.channel();
        ByteBuffer buffer = ByteBuffer.allocate(8);
        while (curSc.read(buffer) > 0) {
            buffer.flip();
            System.out.println("Receive from server:"
                    + new String(buffer.array(), "UTF-8"));
            buffer.clear();
        }
    }

    public void writeMessage() throws IOException {
        try {
            String ss = "Server,how are you?";
            ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes("UTF-8"));
            while (buffer.hasRemaining()) {
                System.out.println("buffer.hasRemaining() is true.");
                socketChannel.write(buffer);
            }
        } catch (IOException e) {
            if (socketChannel.isOpen()) {
                socketChannel.close();
            }
            e.printStackTrace();
        }
    }

    public void initClient() throws IOException, ClosedChannelException {
        InetSocketAddress addr = new InetSocketAddress(2181);
        socketChannel = SocketChannel.open();

        selector = Selector.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);

        // 连接到server
        socketChannel.connect(addr);

        while (!socketChannel.finishConnect()) {
            System.out.println("check finish connection");
        }
    }

    /**
     * 停止客户端
     */
    private void stopServer() {
        try {
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
            if (socketChannel != null && socketChannel.isOpen()) {
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

(3)需要改进的点

1、客户端与服务器端的IO异常等没有处理,程序逻辑上不健壮

2、对于序列化与反序列化后传递的消息,发送端把消息发送完毕,接收端把消息接收完毕如何界定?这里的完毕,如果是不加分析的接收和发送完成那是比较简单的,复杂之处在于如何去判定大数据量的情况下,按照对应的协议把它解析出来?需要一个可扩展的数据协议,后续我会发明一个轮子来解决如下几个问题:A,按照制定分隔符号来解析数据 B:按照【消息长度】【消息内容】这种变长消息来解析数据 C:Java对象的序列化与反序列化

3、监听和处理OP_ACCEPT、OP_READ、OP_WRITE事件的线程数量如何分配才能完成高效的协同,可以根据业务需要调配比率,实现最佳性能?

4、监听OP_WRITE操作比较麻烦,只要写缓冲区没有满则一直可以写,需要频繁的删除和注册OP_WRITE事件,极为影响性能,如何优化?

5、在多个连接同时连接到服务端的时候,一个Selector是否够用,一个Selector能够管理多少Channel呢?另外从Channel中读取消息、向Channel中写消息是否就一定要用SubReactor来处理吗?用WorkThread来处理有什么不好?

6、TCP协议麻烦之处在于接收/发送数据并不能保证有序,在客户端同步等待服务端响应的时候,如何将服务端的异步处理转化为对应于客户端的同步处理?当然客户端也是可以同步或者异步的来访问服务端的。

7、如何避免业务程序的数据与NIO的缓冲区之间相互拷贝,降低性能?怎样才能把一个ByteBuffer的内容从签到后都用一个呢,避免Copy来降低性能。

这些问题的提出,我结合不少网上的资料,也参照了一下Mina、Netty的原理介绍之流的文章,但是并没有去阅读这两个框架的源代码,我寻思着,把这些问题先从理论上解决掉,再自行用代码去实践,把上述的例子给丰满起来,基本解决这些问题之后,再去研读和比较Mina、Netty的具体实现,这样一趟下来就能对Java+NIO+Channel的编程有一个整体的认识了。好几年来,我都想做这个事情,可是一直由于各种原因停滞了,这次务必要实践起来,在这个博文里面提出来这个观点,就在于我想以此宣示来告诉自己务必要一直走到底。

至此,我认为学习一个框架或者是一个知识点的最好办法就是去实践,去以自己的想法发明一个轮子,再模拟场景进行测试,之后再看看开源产品中那些大师是如何实现的,两项对比之下,自己的不足之处就找出来了,也知道该怎么改进了。

				
时间: 2025-01-04 02:13:06

基于NIO的Client/Server程序实践的相关文章

开源一个基于nio的java网络程序

因为最近要从公司离职,害怕用nio写的网络程序没有人能看懂(或许是因为写的不好吧),就调整成了mina(这样大家接触起来非常方便,即使没有socket基础,用起来也不难),所以之前基于nio写的网络程序就开放出来好了! 写的比较挫,大家见谅! 首先是PollServer类,主要处理select,做网络事件的监听和基于FutureTask的数据发送,代码如下: package gs.gate; import gs.gate.handle.ClientHandle; import java.util

《Python Network Programming Cookbook》读书笔记1---套接字, IPv4, 简单的Client/Server程序

这一部分主要介绍python中socket模块的相关内容,socket即套接字. socket是使用TCP/IP协议的应用程序通常采用的应用编程接口,它位于运输层和应用层之间,起源于UNIX,由于遵从UNIX“一切皆文件的”思想故socket可看作一种特殊的文件,对其的操作基本可以视为读写I/O.打开.关闭.关于套接字的基本概念@吴秦的Linux Socket编程(不限Linux)写的很详细,大家可以参考. 在下面列出的各个部分中我将先贴出代码,然后对其进行解释. 通过python3获得本机名和

基于I/O的Server/Client实现

在前面的文章中讲了基于NIO实现的Server/Client.本文就讲讲基于同步堵塞式I/O实现的Server/Client好与前面的NIO中的Server/Client进行对照. 网络编程中须要解决的两个主要问题: 1.怎样准确的定位网络上的一台或多台主机. 2.找到主机后怎样可靠高效的进行传输数据. 而解决这两个问题的主要方式就是非常好的运用TCP/IP协议.所以我们所做的网络编程都是基于TCP/IP来实现的. 基于Socket的java网络编程的通信过程: server:使用ServerS

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty 转自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ netty是基于NIO实现的异步事件驱动的网络编程框架,学完NIO以后,应该看看netty的实现,netty框架涉及的内容特别多,这里只介绍netty的基本使用和实现原理,更多扩展的内容将在以后推出. 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎

SQL Server虚拟化系列(3)&mdash;&mdash;构建理想的基于VMware的SQL Server虚拟机

虚拟化变得越来越常见,并且在不了解虚拟化如何工作的情况下,DBA在尝试解决性能问题时会出现盲点,例如减少资源争用或改进备份和恢复操作等. 在本文中我们将主要讲述为您的SQL Server工作负载构建理想的基于VMware的虚拟机.我们的下一篇文章将介绍怎么样在Hyper-V上构建对应的SQL Server虚拟化环境. 现在,作为DBA,您可能没有访问权限来创建用于SQL Server的新虚拟机.这些操作可以交给您的VM管理员,他们将为您部署合适的VM环境. 以下详细信息适用于在Windows S

JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

阻塞IO与非阻塞IO 通常情况下的Socket都是阻塞式的, 程序的输入输出都会让当前线程进入阻塞状态, 因此服务器需要为每一个客户端都创建一个线程. 从JAVA1.4开始引入了NIO API, NIO可以实现非阻塞IO, 这样就可以使用一个线程处理所有的客户请求. 基于NIO的非阻塞Socket通信 服务器将用来监听客户端请求的channel注册到selector上,启动一个线程,使用selector的select()获取求情的客户端的channel数量, 当监听到有客户端请求时,就通过Sel

基于.NET平台的分布式应用程序的研究

摘 要:.NET框架是Microsoft用于生成分布式Web应用程序和Web服务的下一代平台.概述了用于生成分布式应用程序的.NET框架的基本原理.重点讲述了.NET框架的基础:公共语言运行时(CLR)和微软中间语言(MSIL),以及.NET革命的核心:Web Service.最后结合具体步骤,给出了利用.NET平台建造具有良好可伸缩性的分布式应用程序的过程. 关键词:.NET;分布式计算; Web服务; CLR; MSIL       The Research of Distributed A

Java基础知识强化之网络编程笔记24:Android网络通信之 AndroidAsync(基于nio的异步通信库)

1. AndroidAsync   AndroidAsync 是一个基于nio的异步socket ,http(客户端服务器端),websocket,socket.io库,AndroidAsync 是一个底层的网络协议库,如果你想要一个容易使用,高级的,http请求库,请使用Ion(它是基于AndroidAsync 的),正常来说开发者更倾向于使用  Ion. 如果你需要一个未被封装的Android的raw Socket, HTTP client/server, WebSocket, and So

基于NIO的Socket通信

一.NIO模式的基本原理: 服务端: 首先,服务端打开一个通道(ServerSocketChannel),并向通道中注册一个通道调度器(Selector):然后向通道调度器注册感兴趣的事件SelectionKey(如:OP_ACCEPT),接着就可以使用通道调度器(Selector)轮询通道(ServerSocketChannel)上注册的事件,并进行相应的处理. 客户端: 客户端在请求与服务端通信时,也可以像服务器端一样注册感兴趣的事件(比服务端少了SelectionKey.OP_ACCEPT