实现一个非阻塞IO的服务器

先来实现一个简单的服务器,这个服务器简单的回送任何客户端的输入

EchoServer.java

package server;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * This program implements a simple server that listens to port 8189 and echoes back all client input
 * @author zhangchen
 *
 */
public class EchoServer {

    public static void main(String[] args) throws IOException
    {
        //establish server socket
        try(ServerSocket s=new ServerSocket(8189))
        {
            //wait for client connection
            try(Socket incoming=s.accept())
            {
                InputStream inStream=incoming.getInputStream();
                OutputStream outStream=incoming.getOutputStream();

                try(Scanner in=new Scanner(inStream))
                {
                    PrintWriter out=new PrintWriter(outStream,true);
                    out.println("hello,enter bye to exit");

                    //echo client input
                    boolean done=false;
                    while(!done&&in.hasNextLine())
                    {
                        String line=in.nextLine();
                        out.println("Echo: "+line);
                        if(line.trim().equalsIgnoreCase("BYE")) done=true;
                    }

                }
            }
        }
    }

}

可以在命令行中使用telnet localhost 8189来连接这个服务器

这个服务器有一个问题,那就是不能同时为多个客户端服务。自然而然的,我们想到了使用多线程,下面实现一个多线程的服务器,功能同上:

ThreadedEchoServer.java

package threaded;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * This program implementsa multithreaded server that listens to port 8189 and echoes back
 * all client input.
 * @author zhangchen
 *
 */
public class ThreadedEchoServer {
    public static void main(String[] args)
    {
        try
        {
            int i=1;
            ServerSocket s=new ServerSocket(8189);
            while(true)
            {
                Socket incoming=s.accept();
                System.out.println("Spawning"+i);
                Runnable r=new ThreadEchoHandler(incoming);
                Thread t=new Thread(r);
                t.start();
                i++;
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();

        }
    }

}

/**
 * This class handles the client input for one server socket connection
 * @author zhangchen
 *
 */
class ThreadEchoHandler implements Runnable
{

    private Socket incoming;

    /**
     * Constructs a handler
     * @param i the incoming socket
     */
    public ThreadEchoHandler(Socket i) {
        // TODO Auto-generated constructor stub
        incoming=i;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        try
        {
            try
            {
                InputStream inStream=incoming.getInputStream();
                OutputStream outStream=incoming.getOutputStream();

                Scanner in=new Scanner(inStream);
                PrintWriter out=new PrintWriter(outStream,true);

                out.println("hello gougou");

                //echo client input
                boolean done=false;
                while(!done&&in.hasNextLine())
                {
                    String line=in.nextLine();
                    out.println("Echo: "+line);
                    if(line.trim().equalsIgnoreCase("BYE")) done=true;
                }
            }
            finally
            {
                incoming.close();
            }
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
    }
    }

每当一个新的客户端连接到服务器,服务器都建立一个新的线程来为这个客户端服务。

在这个程序中,我们为每个连接生成一个独立的线程。这种方法不能满足高性能服务器的要求,为了实现更高的吞吐量,我们利用java.nio包中的特性来实现一个非阻塞IO的服务器,这个服务器的功能是将客户端输入的文件路径对应文件的内容发送给客户端:

NonBlockingServer.java


import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;
import java.lang.*;

public class NonBlockingServer
{
    public Selector sel = null;
    public ServerSocketChannel server = null;
    public SocketChannel socket = null;
    public int port = 4900;
    String result = null;

    public NonBlockingServer()
    {
        System.out.println("Inside default ctor");
    }

    public NonBlockingServer(int port)
    {
        System.out.println("Inside the other ctor");
        port = port;
    }

    //初始化
    public void initializeOperations() throws IOException,UnknownHostException
    {
        System.out.println("Inside initialization");
        sel = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false); //将通道的阻塞模式设置为非阻塞模式
        InetAddress ia = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(ia,port);
        server.socket().bind(isa);
    }

    public void startServer() throws IOException
    {
        System.out.println("Inside startserver");
        initializeOperations();
        System.out.println("Abt to block on select()");

        //Registers this channel with the given selector, returning a selection key.
        //A channel should be registered according to the events it will handle.
        //for instance, a channel that accepts incoming connection should be registered as follow
        SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT ); 

        while (acceptKey.selector().select() > 0 )
        {   

            Set readyKeys = sel.selectedKeys();
            Iterator it = readyKeys.iterator();

            while (it.hasNext()) {
                SelectionKey key = (SelectionKey)it.next();
                it.remove();//After a key is processed, it is removed from the list of ready keys

                if (key.isAcceptable()) {
                    System.out.println("Key is Acceptable");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    socket = (SocketChannel) ssc.accept();
                    socket.configureBlocking(false);

                    //this "another" key binds with a channel responsible for read and write -by zc
                    SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                if (key.isReadable()) {
                    System.out.println("Key is readable");
                    String ret = readMessage(key);//从key对应的SocketChannel中读出字符串
                    if (ret.length() > 0) {
                        writeMessage(socket,ret);//将ret指向的文件中的内容写入socket指向的SocketChannel
                    }
                }
                if (key.isWritable()) {
                    System.out.println("THe key is writable");
                    String ret = readMessage(key);//从key对应的SocketChannel中读出字符串
                    socket = (SocketChannel)key.channel();
                    if (result.length() > 0 ) {
                        writeMessage(socket,ret);//将ret指向的文件中的内容写入socket指向的SocketChannel
                    }
                }
            }
        }
    }

    //将ret指向的文件中的内容写入socket指向的SocketChannel
    public void writeMessage(SocketChannel socket,String ret)
    {
        System.out.println("Inside the loop");

        if (ret.equals("quit") || ret.equals("shutdown")) {
            return;
        }
        File file = new File(ret);
        try
        {

            RandomAccessFile rdm = new RandomAccessFile(file,"r");
            FileChannel fc = rdm.getChannel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            fc.read(buffer);
            buffer.flip();

            Charset set = Charset.forName("us-ascii");
            CharsetDecoder dec = set.newDecoder();
            CharBuffer charBuf = dec.decode(buffer);
            System.out.println(charBuf.toString());
            buffer = ByteBuffer.wrap((charBuf.toString()).getBytes());
            int nBytes = socket.write(buffer);
            System.out.println("nBytes = "+nBytes);
                result = null;
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }

    }

    //从key对应的SocketChannel中读出字符串
    public String readMessage(SelectionKey key)
    {
        int nBytes = 0;
        socket = (SocketChannel)key.channel();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try
        {
            nBytes = socket.read(buf);
            buf.flip();
            Charset charset = Charset.forName("us-ascii");
            CharsetDecoder decoder = charset.newDecoder();
            CharBuffer charBuffer = decoder.decode(buf);
            result = charBuffer.toString();

        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        return result;
    }

    public static void main(String args[])
    {
        NonBlockingServer nb = new NonBlockingServer();
        try
        {
            nb.startServer();
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(-1);
        }

    }
}

Client.java


import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.io.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.lang.*;

public class Client
{
    public SocketChannel client = null;
    public InetSocketAddress isa = null;
    public RecvThread rt = null;

    public Client()
    {
    }

    public void makeConnection()
    {
        int result = 0;
        try
        {

            client = SocketChannel.open();
                isa = new InetSocketAddress("zhangchen-PC",4900);
            client.connect(isa);
            client.configureBlocking(false);
            receiveMessage();
        }
        catch(UnknownHostException e)
        {
            e.printStackTrace();
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        while ((result = sendMessage()) != -1)
        {
        }

        try
        {
            client.close();
            System.exit(0);
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
    }

    public int sendMessage()
    {
        System.out.println("Inside SendMessage");
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String msg = null;
        ByteBuffer bytebuf = ByteBuffer.allocate(1024);
        int nBytes = 0;
        try
        {
            msg = in.readLine();
            System.out.println("msg is "+msg);
            bytebuf = ByteBuffer.wrap(msg.getBytes());
            nBytes = client.write(bytebuf);
            System.out.println("nBytes is "+nBytes);
            if (msg.equals("quit") || msg.equals("shutdown")) {
                System.out.println("time to stop the client");
                interruptThread();
                try
                {
                    Thread.sleep(5000);
                }
                catch(Exception e)
                {
                    e.printStackTrace();
                }
                client.close();
                return -1;
            }

        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        System.out.println("Wrote "+nBytes +" bytes to the server");
        return nBytes;
    }

    public void receiveMessage()
    {
        rt = new RecvThread("Receive THread",client);
        rt.start();

    }

    public void interruptThread()
    {
        rt.val = false;
    }

    public static void main(String args[])
    {
        Client cl = new Client();
        cl.makeConnection();
    }

    public class RecvThread extends Thread
    {
        public SocketChannel sc = null;
        public boolean val = true;

        public RecvThread(String str,SocketChannel client)
        {
            super(str);
            sc = client;
        }

        public void run() {

            System.out.println("Inside receivemsg");
            int nBytes = 0;
            ByteBuffer buf = ByteBuffer.allocate(2048);
            try
            {
                while (val)
                {
                    while ( (nBytes = nBytes = client.read(buf)) > 0){
                        buf.flip();
                        //Flips this buffer.
                        //The limit is set to the current position and then the position is set to zero.
                        //If the mark is defined then it is discarded. 

                        Charset charset = Charset.forName("us-ascii");
                        CharsetDecoder decoder = charset.newDecoder();
                        CharBuffer charBuffer = decoder.decode(buf);
                        String result = charBuffer.toString();
                                    System.out.println(result);
                        buf.flip();

                    }
                }

            }
            catch(IOException e)
            {
                e.printStackTrace();

            }

        }
    }
}

运行结果如图:

时间: 2024-08-08 10:00:20

实现一个非阻塞IO的服务器的相关文章

使用OTP原理构建一个非阻塞的TCP服务器(转)

经测试可用! 原文地址:http://www.iucai.com/?paged=8 Erlang OTP设计原理已经被shiningray兄翻译透了.请参见.http://erlang.shiningray.cn/otp-design-principles/index.html 这里翻译了一篇余锋老大和lzy.je老大推荐的文章,闲话不说,奉上. 使用OTP原理构建一个非阻塞的TCP服务器 原文网址:(打不开的同学请自觉FQ) http://www.trapexit.org.nyud.net:8

Netty:一个非阻塞的客户端/服务器框架

Netty:一个非阻塞的客户端/服务器框架 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs Netty是一个异步事件驱动的网络应用框架,为Java网络应用的开发带来了一些新活力.Netty由协议服务器和客户端所组成,可用于快速开发可维护的高性能软件.Netty应用框架及其工具简化了网络编程,而且由Netty社区进行维护. Netty还被归类为NIO客户端/服务器框架,用它能够快速.简易地开发网络应用,使得TCP和UDP套接字服务器的网络编程得以简化和

使用OTP原则构建一个非阻塞的TCP服务器

http://erlangcentral.org/wiki/index.php/Building_a_Non-blocking_TCP_server_using_OTP_principles CONTENTS [hide] 1 Author 2 Overview 3 Server Design 4 Application and Supervisor behaviours 5 Listener Process 6 Client Socket Handling Process 7 Applicat

Linux非阻塞IO(五)使用poll实现非阻塞的回射服务器客户端

前面几节我们讨论了非阻塞IO的基本概念.Buffer的设计以及非阻塞connect的实现,现在我们使用它们来完成客户端的编写. 我们在http://www.cnblogs.com/inevermore/p/4049165.html中提出过,客户端需要监听stdin.stdout和sockfd. 这里需要注意的是 只有缓冲区可写的时候,才去监听sockfd和stdin的读事件. 过去在阻塞IO中,我们总是监听sockfd的读事件,因为每当sockfd可读,我们就去调用用户的回调函数处理read事件

非阻塞IO服务器模型

我们来考虑一个情形,你跟千千万万个玩家是魔兽世界的超级粉丝,每周末准时组团打boss.每当周末游戏服务器就亚历山大,因为起码几十万用户同时在线.如果用我们的多线程阻塞服务器作为游戏服务器是否可行呢?先分析游戏服务器有哪些特点: ①  网络游戏并非像网页一样,打开一旦下载完就可以关闭连接结束.网游必须是有一个持久有状态的连接,每一个客户端都需要跟服务器存在一个持久的连接,以便快速及时发送消息.而随着并发用户数量的增加,多线程阻塞服务器不可能为每一个客户端分配一个线程. ②  跟一般的应用服务器不同

Java中的阻塞和非阻塞IO包各自的优劣思考

NIO 设计背后的基石:反应器模式,用于事件多路分离和分派的体系结构模式. 反应器(Reactor):用于事件多路分离和分派的体系结构模式 通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞 与非阻塞 .所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时, 如果当时没有东西可读,或者暂时不可写, 程序就进入等待 状态, 直到有东西可读或者可写为止.而对于非阻塞状态, 如果没有东西可读, 或者不可写, 读写函数马上返回, 而不会等待 . 一种常用做法是:每建立一个Socket

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念                                                          我们通俗一点讲: Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写.如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就

异步非阻塞IO的Python Web框架--Tornado

Tornado的全称是Torado Web Server,从名字上就可知它可用作Web服务器,但同时它也是一个Python Web的开发框架.最初是在FriendFeed公司的网站上使用,FaceBook收购之后便进行了开源. 作为Web框架,是一个轻量级的Web框架,类似于另一个Python web 框架Web.py,其拥有异步非阻塞IO的处理方式. 作为Web服务器,Tornado有较为出色的抗负载能力,官方用nginx反向代理的方式部署Tornado和其它Python web应用框架进行对

[Z] linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO

原文链接:http://blog.csdn.net/colzer/article/details/8169075 IO概念 Linux的内核将所有外部设备都可以看做一个文件来操作.那么我们对与外部设备的操作都可以看做对文件进行操作.我们对一个文件的读写,都通过调用内核提供的系统调用:内核给我们返回一个file descriptor(fd,文件描述符).而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符).描述符就是一个数字,指向内核中一个结构体(文件路径,数据