基于事件的 NIO 多线程服务器--转载

JDK1.4 的 NIO 有效解决了原有流式 IO 存在的线程开销的问题,在 NIO 中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个 CPU 的处理能力和处理中的等待时间,达到提高服务能力的目的。

多线程的引入,容易为本来就略显复杂的 NIO 代码进一步降低可读性和可维护性。引入良好的设计模型,将不仅带来高性能、高可靠的代码,也将带来一个惬意的开发过程。

线程模型

NIO 的选择器采用了多路复用(Multiplexing)技术,可在一个选择器上处理多个套接字, 通过获取读写通道来进行 IO 操作。由于网络带宽等原因,在通道的读、写操作中是容易出现等待的, 所以在读、写操作中引入多线程,对性能提高明显,而且可以提高客户端的感知服务质量。所以本文的模型将主要通过使用读、写线程池 来提高与客户端的数据交换能力。

如下图所示,服务端接受客户端请求后,控制线程将该请求的读通道交给读线程池,由读线程池分配线程完成对客户端数据的读取操作;当读线程完成读操作后,将数据返回控制线程,进行服务端的业务处理;完成 业务处理后,将需回应给客户端的数据和写通道提交给写线程池,由写线程完成向客户端发送回应数据的操作。

(NIO 多线程服务器模型)

同时整个服务端的流程处理,建立于事件机制上。在 [接受连接->读->业务处理->写 >关闭连接 ]这个 过程中,触发器将触发相应事件,由事件处理器对相应事件分别响应,完成服务器端的业务处理。 
下面我们就来详细看一下这个模型的各个组成部分。

回页首

相关事件定义 在这个模型中,我们定义了一些基本的事件:

(1)onAccept:当服务端收到客户端连接请求时,触发该事件。通过该事件我们可以知道有新的客户端呼入。该事件可用来控制服务端的负载。例如,服务器可设定同时只为一定数量客户端提供服务,当同时请求数超出数量时,可在响应该事件时直接抛出异常,以拒绝新的连接。

(2)onAccepted:当客户端请求被服务器接受后触发该事件。该事件表明一个新的客户端与服务器正式建立连接。

(3)onRead:当客户端发来数据,并已被服务器控制线程正确读取时,触发该事件 。该事件通知各事件处理器可以对客户端发来的数据进行实际处理了。需要注意的是,在本模型中,客户端的数据读取是由控制线程交由读线程完成的,事件处理器不需要在该事件中进行专门的读操作,而只需将控制线程传来的数据进行直接处理即可。

(4)onWrite:当客户端可以开始接受服务端发送数据时触发该事件,通过该事件,我们可以向客户端发送回应数据。 在本模型中,事件处理器只需要在该事件中设置

(5)onClosed:当客户端与服务器断开连接时触发该事件。

(6)onError:当客户端与服务器从连接开始到最后断开连接期间发生错误时触发该事件。通过该事件我们可以知道有什么错误发生。

回页首

事件回调机制的实现

在这个模型中,事件采用广播方式,也就是所有在册的事件处理器都能获得事件通知。这样可以将不同性质的业务处理,分别用不同的处理器实现,使每个处理器的业务功能尽可能单一。 
如下图:整个事件模型由监听器、事件适配器、事件触发器、事件处理器组成。

(事件模型)

  1. 监听器(Serverlistener):这是一个事件接口,定义需监听的服务器事件,如果您需要定义更多的事件,可在这里进行扩展。

    	 public interface Serverlistener {
       public void onError(String error);
       public void onAccept() throws Exception;
       public void onAccepted(Request request) throws Exception;
       public void onRead(Request request) throws Exception;
       public void onWrite(Request request, Response response) throws Exception;
       public void onClosed(Request request) throws Exception;
     }
  2. 事件适配器(EventAdapter):对 Serverlistener 接口实现一个适配器 (EventAdapter),这样的好处是最终的事件处理器可以只处理所关心的事件。

     public abstract class EventAdapter implements Serverlistener {
        public EventAdapter() {
        }
        public void onError(String error) {}
        public void onAccept() throws Exception {}
        public void onAccepted(Request request)  throws Exception {}
        public void onRead(Request request)  throws Exception {}
        public void onWrite(Request request, Response response)  throws Exception {}
        public void onClosed(Request request)  throws Exception {}
     }
  3. 事件触发器(Notifier):用于在适当的时候通过触发服务器事件,通知在册的事件处理器对事件做出响应。触发器以 Singleton 模式实现,统一控制整个服务器端的事件,避免造成混乱。

    public class Notifier {
        private static Arraylist listeners = null;
        private static Notifier instance = null;
        private Notifier() {
            listeners = new Arraylist();
        }
        /**
         * 获取事件触发器
         * @return 返回事件触发器
         */
        public static synchronized Notifier getNotifier() {
            if (instance == null) {
                instance = new Notifier();
                return instance;
            }
            else return instance;
        }
        /**
         * 添加事件监听器
         * @param l 监听器
         */
        public void addlistener(Serverlistener l) {
            synchronized (listeners) {
                if (!listeners.contains(l))
                    listeners.add(l);
            }
        }
        public void fireOnAccept() throws Exception {
            for (int i = listeners.size() - 1; i >= 0; i--)
                ( (Serverlistener) listeners.get(i)).onAccept();
        }
        ....// other fire method
     }
  4. 事件处理器(Handler):继承事件适配器,对感兴趣的事件进行响应处理,实现业务处理。以下是一个简单的事件处理器实现,它响应 onRead 事件,在终端打印出从客户端读取的数据。

     public class ServerHandler extends EventAdapter {
        public ServerHandler() {
        }
        public void onRead(Request request) throws Exception {
            System.out.println("Received: " + new String(data));
        }
     }
  5. 事件处理器的注册。为了能让事件处理器获得服务线程的事件通知,事件处理器需在触发器中注册。

     ServerHandler handler = new ServerHandler();
     Notifier.addlistener(handler);

回页首

实现 NIO 多线程服务器

NIO 多线程服务器主要由主控服务线程、读线程和写线程组成。

(线程模型)

  1. 主控服务线程(Server):主控线程将创建读、写线程池,实现监听、接受客户端请求,同时将读、写通道提交由相应的读线程(Reader)和写服务线程 (Writer) ,由读写线程分别完成对客户端数据的读取和对客户端的回应操作。

    public class Server implements Runnable {
        ....
        private static int MAX_THREADS = 4;
        public Server(int port) throws Exception {
            ....
            // 创建无阻塞网络套接
            selector = Selector.open();
            sschannel = ServerSocketChannel.open();
            sschannel.configureBlocking(false);
            address = new InetSocketAddress(port);
            ServerSocket ss = sschannel.socket();
            ss.bind(address);
            sschannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        public void run() {
            System.out.println("Server started ...");
            System.out.println("Server listening on port: " + port);
            // 监听
            while (true) {
                try {
                    int num = 0;
                    num = selector.select();
                    if (num > 0) {
                        Set selectedKeys = selector.selectedKeys();
                        Iterator it = selectedKeys.iterator();
                        while (it.hasNext()) {
                            SelectionKey key = (SelectionKey) it.next();
                            it.remove();
                            // 处理 IO 事件
                            if ( (key.readyOps() & SelectionKey.OP_ACCEPT) ==
    							SelectionKey.OP_ACCEPT) {
                               // Accept the new connection
                               ServerSocketChannel ssc =
    							   (ServerSocketChannel) key.channel();
                               notifier.fireOnAccept();
                               SocketChannel sc = ssc.accept();
                               sc.configureBlocking(false);
                               // 触发接受连接事件
                               Request request = new Request(sc);
                               notifier.fireOnAccepted(request); 
    
                               // 注册读操作 , 以进行下一步的读操作
                               sc.register(selector,  SelectionKey.OP_READ, request);
                           }
                           else if ( (key.readyOps() & SelectionKey.OP_READ) ==
    						   SelectionKey.OP_READ ) {
    						   // 提交读服务线程读取客户端数据
                               Reader.processRequest(key);
                               key.cancel();
                           }
                           else if ( (key.readyOps() & SelectionKey.OP_WRITE) ==
    						   SelectionKey.OP_WRITE ) {
    						   // 提交写服务线程向客户端发送回应数据
                               Writer.processRequest(key);
                               key.cancel();
                           }
                        }
                    }
                    else {
                        addRegister();  // 在 Selector 中注册新的写通道
                    }
                }
                catch (Exception e) {
                    notifier.fireOnError("Error occured in Server: " + e.getMessage());
                    continue;
                }
            }
        }
        ....
    }
  2. 读线程(Reader):使用线程池技术,通过多个线程读取客户端数据,以充分利用网络数据传输的时间,提高读取效率。

    public class Reader extends Thread {
        public void run() {
            while (true) {
                try {
                    SelectionKey key;
                    synchronized (pool) {
                        while (pool.isEmpty()) {
                            pool.wait();
                        }
                        key = (SelectionKey) pool.remove(0);
                    }
                    // 读取客户端数据,并触发 onRead 事件
                    read(key);
                }
                catch (Exception e) {
                    continue;
                }
            }
        }
        ....
     }
  3. 写线程(Writer):和读操作一样,使用线程池,负责将服务器端的数据发送回客户端。

     public final class Writer extends Thread {
        public void run() {
            while (true) {
                try {
                    SelectionKey key;
                    synchronized (pool) {
                        while (pool.isEmpty()) {
                            pool.wait();
                        }
                        key = (SelectionKey) pool.remove(0);
                    }
                    // 向客户端发送数据,然后关闭连接,并分别触发 onWrite,onClosed 事件
                    write(key);
                }
                catch (Exception e) {
                    continue;
                }
            }
        }
        ....
     }

回页首

具体应用

NIO 多线程模型的实现告一段落,现在我们可以暂且将 NIO 的各个 API 和烦琐的调用方法抛于脑后,专心于我们的实际应用中。 
我们用一个简单的 TimeServer(时间查询服务器)来看看该模型能带来多么简洁的开发方式。 
在这个 TimeServer 中,将提供两种语言(中文、英文)的时间查询服务。我们将读取客户端的查询命令(GB/EN),并回应相应语言格式的当前时间。在应答客户的请求的同时,服务器将进行日志记录。做为示例,对日志记录,我们只是简单地将客户端的访问时间和 IP 地址输出到服务器的终端上。

  1. 实现时间查询服务的事件处理器(TimeHandler):

    public class TimeHandler extends EventAdapter {
        public TimeHandler() {
        }
        public void onWrite(Request request, Response response) throws Exception {
            String command = new String(request.getDataInput());
            String time = null;
            Date date = new Date();
            // 判断查询命令
            if (command.equals("GB")) {
                // 中文格式
                DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
                    DateFormat.FulL, Locale.CHINA);
                time = cnDate.format(date);
            }
            else {
                // 英文格式
                DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
                    DateFormat.FulL, Locale.US);
                time = enDate.format(date);
            }
            response.send(time.getBytes());
        }
     }
  2. 实现日志记录服务的事件处理器(LogHandler):

    public class LogHandler extends EventAdapter {
        public LogHandler() {
        }
        public void onClosed(Request request) throws Exception {
            String log = new Date().toString() + " from " + request.getAddress()
    			.toString();
            System.out.println(log);
        }
        public void onError(String error) {
            System.out.println("Error: " + error);
        }
    }
  3. 启动程序:

    public class Start {
        public static void main(String[] args) {
            try {
                LogHandler loger = new LogHandler();
                TimeHandler timer = new TimeHandler();
                Notifier notifier = Notifier.getNotifier();
                notifier.addlistener(loger);
                notifier.addlistener(timer);
                System.out.println("Server starting ...");
                Server server = new Server(5100);
                Thread tServer = new Thread(server);
                tServer.start();
            }
            catch (Exception e) {
                System.out.println("Server error: " + e.getMessage());
                System.exit(-1);
            }
        }
     }

回页首

小结

通过例子我们可以看到,基于事件回调的 NIO 多线程服务器模型,提供了清晰直观的实现方式,可让开发者从 NIO 及多线程的技术细节中摆脱出来,集中精力关注具体的业务实现。

原文:http://www.ibm.com/developerworks/cn/java/l-niosvr/

基于事件的 NIO 多线程服务器--转载

时间: 2024-10-10 05:39:24

基于事件的 NIO 多线程服务器--转载的相关文章

基于TCP协议的多线程服务器

功能:客户端发送消息给服务端,服务端回显消息给客户端. server端: Client端: 测试:

[转载] Comet:基于 HTTP 长连接的“服务器推”技术

转载自http://www.ibm.com/developerworks/cn/web/wa-lo-comet/ “服务器推”技术的应用 传统模式的 Web 系统以客户端发出请求.服务器端响应的方式工作.这种方式并不能满足很多现实应用的需求,譬如: 监控系统:后台硬件热插拔.LED.温度.电压发生变化: 即时通信系统:其它用户登录.发送信息: 即时报价系统:后台数据库内容发生变化: 这些应用都需要服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求.“服务器推”技术在现实应用中有一些解决

转载:Comet:基于 HTTP 长连接的“服务器推”技术

转自:http://www.ibm.com/developerworks/cn/web/wa-lo-comet/ 很多应用譬如监控.即时通信.即时报价系统都需要将后台发生的变化实时传送到客户端而无须客户端不停地刷新.发送请求.本文首先介绍.比较了常用的“服务器推”方案,着重介绍了 Comet - 使用 HTTP 长连接.无须浏览器安装插件的两种“服务器推”方案:基于 AJAX 的长轮询方式:基于 iframe 及 htmlfile 的流方式.最后分析了开发 Comet 应用需要注意的一些问题,以

Linux 下基于多线程服务器/客服端聊天程序源码

Linux 下基于多线程服务器/客服端聊天程序,采用阻塞的socket技术,和多线程技术实现. 客服端程序:client.c #include<stdio.h> #include<stdlib.h> #include<string.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/ip.h>

基于多进程和基于多线程服务器的优缺点及nginx服务器的启动过程

基于多进程服务器的优点: 1.由操作系统进行调度,运行比较稳定强壮 2.能够方便地通过操作系统进行监控和管理 例如对每个进程的内存变化状况,甚至某个进程处理什么web请求进行监控.同时可以通过给进程发送信号量,实现对应用的各种管理 3.隔离性好 一个进程出现问题只有杀掉它重启就可以,不影响整体服务的可用性 很容易实现在线热部署和无缝升级 不需要考虑线程安全问题 4.充分利用多核cpu,实现并行处理 基于多进程服务器的缺点: 1.内存消耗比较大,每个进程都独立加载完整的应用环境 2.cpu消耗偏高

基于QT的多线程服务器

// thread.cpp #include "thread.h" Thread::Thread(int socketDescriptor, QObject *parent) : QThread(parent) { m_socketDescriptor = socketDescriptor; } Thread::~Thread() { } void Thread::run() { m_tcpSocket = new QTcpSocket; if (!m_tcpSocket->se

key-value 多线程服务器的Linux C++实现

项目需求 总体思路 网络通信 字符解析 数据存储与查询 1 存储管理 2 数据查询 多线程 待改进 GitHub源码 项目需求 设计一个基于Socket或基于HTTP的服务器,服务内容是提供一种简单的key/value映射关系的管 理与查询 下面的所有操作都是通过结构体Node来传递的: struct Node { char key[KEY_SIZE]; char value[VALUE_SIZE]; }; 本场景中需要client和server两个程序 client端只有两种操作: int A

Comet:基于 HTTP 长连接的“服务器推”技术

“服务器推”技术的应用 传统模式的 Web 系统以客户端发出请求.服务器端响应的方式工作.这种方式并不能满足很多现实应用的需求,譬如: 监控系统:后台硬件热插拔.LED.温度.电压发生变化: 即时通信系统:其它用户登录.发送信息: 即时报价系统:后台数据库内容发生变化: 这些应用都需要服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求.“服务器推”技术在现实应用中有一些解决方案,本文将这些解决方案分为两类:一类需要在浏览器端安装插件,基于套接口传送信息,或是使用 RMI.CORBA 进

MSDN搬运 之 [基于事件的异步模式]

基于事件的异步模式概述 那些同时执行多项任务.但仍能响应用户交互的应用程序通常需要实施一种使用多线程的设计方案.System.Threading 命名空间提供了创建高性能多线程应用程序所必需的所有工具,但要想有效地使用这些工具,需要有丰富的使用多线程软件工程的经验.对于相对简单的多线程应用程序,BackgroundWorker 组件提供了一个简单的解决方案.对于更复杂的异步应用程序,请考虑实现一个符合基于事件的异步模式的类. 基于事件的异步模式具有多线程应用程序的优点,同时隐匿了多线程设计中固有