《用Java写一个通用的服务器程序》03 处理新socket

在讲监听器时说过处理的新的socket要尽快返回,监听器调用的是ClientFactory的createPhysicalConnection方法,那么就来看这个方法:

    public boolean createPhysicalConnection(PushClientSocket socket,
            boolean isObserver, ListenerOptions listenerOptions) {
        PhysicalConnectionPool thePhysicalConnectionPool =
            serverImpl.getPhysicalConnectionPool();
        IOQueue<PhysicalConnection> ioQueue = serverImpl.getIOQueue();

        // 内置了一个PhysicalConnection的对象池,这样可以避免每次都要
        // 创建PhysicalConnection对象,可以加快处理速度
        PhysicalConnection connection =
            thePhysicalConnectionPool.borrowObject();
        // 把PhysicalConnection对象和socket对象关联起来
        connection.reset(socket, isObserver, listenerOptions);

        // 初始化协议,分配buffer,用来缓存解析请求时的数据
        if (!connection.setUpProtocolContexts()) {
            thePhysicalConnectionPool.returnObject(connection);
            return false;
        }

        Debug.debug("Physical Connection Created for client from: " +
                socket.getIP());

        // 把连接注册到I/O队列中,这样就可以监听请求
        if (!ioQueue.addSocketContext(socket, connection)) {
            thePhysicalConnectionPool.returnObject(connection);
            //leave socket close to acceptor
            return false;
        }

        Debug.debug("Queue adds client from: " + socket.getIP());

        // 把创建的PhysicalConnection加入pending队列中,此时连接
        // 还不算是真正的已连接状态,要等到第一个请求到达并正确
        // 处理之后才会是已连接状态,并且会创建一个LogicalConnection
        // 和这个PhysicalConnection相关联
        addPhysicalConnection(connection);

        // 初始化PhysicalConnection
        serverImpl.getDispatcher().handleInitialize(connection);

        return true;
    }

ClientFactory是PhysicalConnection的管理程序,这个方法的的作用就是创建PhysicalConnection和新的socket相关联,并且把PhysicalConnection加入请求监听的I/O队列。因此来说说IOQueue。

IOQueue本身是一个接口:

public interface IOQueue<T> {

    public boolean create();

    public void free();

    // 从队列中获取事件,默认实现是带有阻塞超时的,即当没有事件
    // 时会阻塞一段时间,超时就会返回null
    public IOEvent<T> getQueuedEvent(boolean isInputEvents);

    // 注册连接,context是关联对象,类似于附件
    public boolean addSocketContext(PushClientSocket socket, T context);

    // 取消注册
    public void deleteSocketContext(PushClientSocket socket);

    // IOQueue的事件监听是一次性,这是为了防止事件在没有被处理之前,这个事件
    // 再次被捕捉到(Java的read/write事件都是这样),因此这个方法会在事件
    // 被处理后调用,再次注册。
    public boolean rearmSocketForWrite(PushClientSocket socket, T context);

    // Read事件代表的就是从客户端有数据过来
    public boolean rearmSocketForRead(PushClientSocket socket, T context);
}

IOQueueImpl是IOQueue的Java NIO版本的实现。IOQueueImpl会内置一个独立线程以及一个Selector,这里关于注册有一点需要说明:

PushClientSocketImpl的registerSelector方法用于注册socket,这里需要调用wakeup方法。因为如果独立线程会调用Selector的select方法等待新的数据,这个时候直接

调用register方法会被阻塞,因此需要先调用wakeup唤醒selector。

    public SelectionKey registerSelector(Selector selector, int ops,
            Object attachment) throws IOException {
        selector.wakeup(); // To prevent block when calling register method
        return channel.register(selector, ops, attachment);
    }

接着说说独立线程监听事件,因为OP_WRITE的特殊性,这里只监听OP_READ事件。

    private void pollEvents(boolean isOutPoll) {
        Selector selector;
        BlockingQueue<SelectionKey> queue;
        if (isOutPoll) {
            return;
        } else {
            selector = this.inPollSelector;
            queue = this.inPollQueue;
        }

        List<SelectionKey> cache = new LinkedList<SelectionKey>();

        while (isPolling) {
            try {
                selector.select();
            } catch (IOException e) {
                continue;
            }

            // 这里调用yield释放控制权是为了刚刚提到的register方法能被顺利执行
            Thread.yield();

            // Add into cache (Add into the blocking queue directly
            // may block so that the selector cannot release the selection
            // key in time)
            if (selector.isOpen()) {
                for (SelectionKey key : selector.selectedKeys()) {
                    // 前面提到监听事件是一次性的,因此这里取消监听
                    // 后面再调用rearm方法重新注册
                    key.cancel();
                    cache.add(key);
                }

                // Clear the keys
                selector.selectedKeys().clear();

                // 因为使用了限定长度的BlockingQueue,可能因为队列已满导致阻塞
                // 因此先把事件转移到缓存中,释放Selector
                queue.addAll(cache);
                cache.clear();
            } else {
                break; // selector closed
            }
        }
    }

顺便说说Demultiplexor获取事件调用的getQueuedEvent方法,这里使用BlockingQueue来实现阻塞等待:

    public IOEvent<PhysicalConnection> getQueuedEvent(boolean isInputEvents) {
        final IOEventType type;
        final BlockingQueue<SelectionKey> pollQueue;

        if (isInputEvents) {
            type = IOEventType.read;
            pollQueue = inPollQueue;
        } else {
            type = null;
            pollQueue = null;
        }

        if (pollQueue == null) {
            return null;
        }

        try {
            // 设置1秒的超时,这样后面关闭时清空I/O队列的时候不会导致
            // Demultiplexor一直被阻塞
            SelectionKey key = pollQueue.poll(1000L, TimeUnit.MILLISECONDS);
            if (key != null) {
                if (key.attachment() instanceof PhysicalConnection) {
                    return new IOEvent<PhysicalConnection>(type,
                            (PhysicalConnection)(key.attachment()));
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        }

        return null;
    }

关于新socket的处理就说这么多吧。

时间: 2024-10-07 12:15:41

《用Java写一个通用的服务器程序》03 处理新socket的相关文章

《用Java写一个通用的服务器程序》01 综述

最近一两年用C++写了好几个基于TCP通信类型程序,都是写一个小型的服务器,监听请求,解析自定义的协议,处理请求,返回结果.每次写新程序时都把老代码拿来,修改一下协议解析部分和业务处理部分,然后就一个新的程序就诞生了.如此这般做了几回,就萌生了一个想法:是不是可以做一个通用的服务器程序,每次只要实现很少的代码就可以构建的一个新的服务器程序? 巧的是在用C++写代码的时候,我刚好碰到过一个叫做Push Framework的开源项目(在这里可以找到:www.pushframework.com),就是

《用Java写一个通用的服务器程序》02 监听器

在一个服务器程序中,监听器的作用类似于公司前台,起引导作用,因此监听器花在每个新连接上的时间应该尽可能短,这样才能保证最快响应. 回到编程本身来说: 1. 监听器最好由单独的线程运行 2. 监听器在接到新的连接之后,处理连接的方法需要尽快返回 在Java Push Framework中,因为需要同时监听普通客户端和服务器监视服务的客户端,所以定义两种监听器:Acceptor和MonitorAcceptor. 由于两者的关于监听部分的逻辑是相同的,因此首先定义了抽象类Listener来实现了监视器

五:用JAVA写一个阿里云VPC Open API调用程序

用JAVA写一个阿里云VPC Open API调用程序 摘要:用JAVA拼出来Open API的URL 引言 VPC提供了丰富的API接口,让网络工程是可以通过API调用的方式管理网络资源.用程序和软件管理自动化管理网络资源是一件显著提升运维效率和网络生产力的事情.产品经理教你写代码系列文章的目标是不懂代码的网络工程师能一步一步的学会用API管理网络. 另外通过文章标题大家也可以看出来,产品经理教你写代码肯定是一个业余班,里面的代码很多写的都不规范,可能也有很多Bug.专业选手可以参考的有限,请

用JAVA写一个视频播放器

前言 跳过废话,直接看正文 当年入坑java是因为它的跨平台优势.那时我认为,"编写一次,处处运行."这听上去多么牛逼,应该是所有语言发展的终极之道,java势必会一统天下. 然而事实证明,那时的我还是太年轻. 正所谓鱼和熊掌不可兼得,若要享受跨平台带来的方便,便不可避免地要接受性能上的不足.事实上,java一直在致力于提高虚拟机的性能(JIT等技术),但面对对实时计算性能要求很高或涉及到用硬件优化的任务(视频的硬件编码.解码)时,仍远远比不上c或c++.因此,很少能够看到有人用jav

用java写一个远程视频监控系统,实时监控(类似直播)我想用RPT协议,不知道怎么把RPT协议集成到项目中

我最近在用java写一个远程视频监控系统,实时监控(类似直播)我想用RPT协议,不知道怎么把RPT协议集成到项目中,第一次写项目,写过这类项目的多多提意见,哪方面的意见都行,有代码或者demo的求赏给我,谢谢

DuiVision开发教程(2)-如何写一个简单的界面程序

基于DuiVision界面库开发的界面程序主要包括如下几部分内容: 1.资源定义,包括图片资源.各个窗口界面的xml定义文件 2.事件处理类代码,用于处理界面响应消息 3.其他业务逻辑代码 下面举例说明如何写一个简单的界面程序. 第一步:使用VC向导创建一个有两个tab页面的DuiVision工程 向导生成的解决方案文件如下: 默认有两个工程,分别是DuiVision库和应用程序工程.自动生成的代码目录中bin目录下的内容那个如下,bkimg目录存放窗口背景图片,skins目录存放图片资源,xm

用JAVA写一个函数,功能如下: 任意给定一组数, 找出任意数相加之后的结果为35(任意设定)的情况

用JAVA写一个函数.功能如下:任意给定一组数,例如{12,60,-8,99,15,35,17,18},找出任意数相加之后的结果为35(任意设定)的情况. 可以递归算法来解: package test1; import java.util.Arrays; public class demo { public static void main(String[] args) { String str = "12,60,-8,99,15,35,17,18,8,10,11,12"; int s

感觉Java写一个窗口真心简单,很易上手

上学期学习了Java ,感觉Java写一个窗口真心简单,很易上手,也就难怪很多开发人员选择Java作为自己的开发编程语言.但是由于自身对windows的热爱,让我觉得c.c++语言才是我亲睐的编程语言,虽然难度大些,但是这才能体现能力所在.其实之前一直想自学一下win32,但是由于时间的显示和种种原因而耽搁了,于是今年暑假决心深入学习win32. 在学习过程中呢,我会在此留下自己的学习心得,当做自己的笔记.初学者可以借鉴,高手可以多多指教,呵呵…… 好了,今天开始做第一课的笔记吧: 学习Win3

一个误解: 单个服务器程序可承受最大连接数“理论”上是“65535”

from:http://www.cnblogs.com/tianzhiliang/archive/2011/06/13/2079564.html 一个误解: 单个服务器程序可承受最大连接数“理论”上是“65535” 2011-06-13 11:47 by 田志良, 5321 阅读, 8 评论, 收藏, 编辑 请注意,这里有两个词分别被我标记上了引号,一个是“理论”,一个是“65535”.强调“理论”这个词,是想特别明确误解者的意思:就是说,这个值是不可能被打破的,是铁板钉丁的.而65535这个数