Mina源码研究

目录

1. NioSocketAcceptor初始化源码研究

1.1 类图

1.2 方法调用时序图

1.3 初始化NioSocketAcceptor

1.4 SimpleIoProcessorPool初始化分析

1.5 NioProcessor的源码

1.6 总结

2. NioSocketAcceptor bind方法源码研究

2.1 创建ServerSocket监听

2.1.1 时序图

2.1.2 bind方法

2.1.3 startupAcceptor方法

2.1.4 创建ServerSocket监听

2.2 accpet客户端连接请求

2.3 读写监听及处理

Mina版本为2.09

1. NioSocketAcceptor初始化源码研究

初始化服务端acceptor的代码如下:

IoAcceptor acceptor = new NioSocketAcceptor();

那么它到底做了些什么呢,我们一起来看看源代码

先贴出类图和类调用时序图,给大家看个大概:

1.1 类图

1.2 方法调用时序图

1.3 初始化NioSocketAcceptor

调用构造方法代码如下:

NioSocketAcceptor类

public NioSocketAcceptor() {

    super(new DefaultSocketSessionConfig(), NioProcessor.class);

    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);

}

注意参数NioProcessor.class,这将是后面processor池中对象的具体类型

继续调用父类AbstractPollingIoAcceptor的构造方法

代码编号1.2

protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
}

注意传参new SimpleIoProcessorPool<S>(processorClass),processorClass实际是NioProcessor.class

然后继续

private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
            boolean createdProcessor, SelectorProvider selectorProvider) {
        super(sessionConfig, executor);

        if (processor == null) {
            throw new IllegalArgumentException("processor");
        }

        //代码编号1.3:赋值给processor
        this.processor = processor;
        this.createdProcessor = createdProcessor;

        try {
            // Initialize the selector
            //代码编号1.4:初始化Selector
            init(selectorProvider);

            // The selector is now ready, we can switch the
            // flag to true so that incoming connection can be accepted
            selectable = true;
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeIoException("Failed to initialize.", e);
        } finally {
            if (!selectable) {
                try {
                    destroy();
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }
    }

注释:

1. 代码编码1.3:赋值给AbstractPollingIoAcceptor的processor,其实际类型为SimpleIoProcessorPool

那么接下来继续看代码编号1.4

@Override
    protected void init(SelectorProvider selectorProvider) throws Exception {
        this.selectorProvider = selectorProvider;

        if (selectorProvider == null) {
            selector = Selector.open();
        } else {
            selector = selectorProvider.openSelector();
        }
    }

这里初始化了selector,该selector用于注册客户端连接的事件

那么我们再画一个类图,分别看看processor和selector的位置:

1.4 SimpleIoProcessorPool初始化分析

从代码编号1.2红色部分点击进入

public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
        this(processorType, null, DEFAULT_SIZE, null);
}

继续进入

public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, SelectorProvider selectorProvider) {
        if (processorType == null) {
            throw new IllegalArgumentException("processorType");
        }

        if (size <= 0) {
            throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
        }

        // Create the executor if none is provided
        createdExecutor = (executor == null);

        if (createdExecutor) {
            this.executor = Executors.newCachedThreadPool();
            // Set a default reject handler
            ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        } else {
            this.executor = executor;
        }

        //代码编号1.2.0:IoProcessor池,注意这里的size为电脑cpu核数+1,后面为线程的最大数
        pool = new IoProcessor[size];

        boolean success = false;
        Constructor<? extends IoProcessor<S>> processorConstructor = null;
        boolean usesExecutorArg = true;

        try {
            // We create at least one processor
                // 代码编号1.2.1:初始化至少一个processor,这里的processorType为NioProcessor
            try {
                try {
                 processorConstructor = processorType.getConstructor(ExecutorService.class);
                    pool[0] = processorConstructor.newInstance(this.executor);
                } catch (NoSuchMethodException e1) {
                    // To the next step...
                    try {
                        if(selectorProvider==null) {
                           processorConstructor = processorType.getConstructor(Executor.class);
                            //代码编号1.2.2 Executor为线程池
                            pool[0] = processorConstructor.newInstance(this.executor);
                        } else {
                            processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
                            pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
                        }
                    } catch (NoSuchMethodException e2) {
                        // To the next step...
                        try {
                            processorConstructor = processorType.getConstructor();
                            usesExecutorArg = false;
                            pool[0] = processorConstructor.newInstance();
                        } catch (NoSuchMethodException e3) {
                            // To the next step...
                        }
                    }
                }
            } catch (RuntimeException re) {
                LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
                throw re;
            } catch (Exception e) {
                String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
                LOGGER.error(msg, e);
                throw new RuntimeIoException(msg, e);
            }

            if (processorConstructor == null) {
                // Raise an exception if no proper constructor is found.
                String msg = String.valueOf(processorType) + " must have a public constructor with one "
                        + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
                        + Executor.class.getSimpleName() + " parameter or a public default constructor.";
                LOGGER.error(msg);
                throw new IllegalArgumentException(msg);
            }

            // Constructor found now use it for all subsequent instantiations
            for (int i = 1; i < pool.length; i++) {
                try {
                    if (usesExecutorArg) {
                        if(selectorProvider==null) {
                            pool[i] = processorConstructor.newInstance(this.executor);
                        } else {
                            pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
                        }
                    } else {
                        pool[i] = processorConstructor.newInstance();
                    }
                } catch (Exception e) {
                    // Won‘t happen because it has been done previously
                }
            }

            success = true;
        } finally {
            if (!success) {
                dispose();
            }
        }
    }

注意看代码编号1.2.0-1.2.2

1.5 NioProcessor的源码

public NioProcessor(Executor executor) {
        super(executor);

        try {
            // Open a new selector
            

selector =

 Selector.open();
        } catch (IOException e) {
            throw new RuntimeIoException("Failed to open a selector.", e);
        }
    }

NioProcessor也维护一个selector,用户监听读写事件

1.6 总结

经过分析,NioSocketAcceptor初始化做了如下事情:

1) 建立processor池SimpleIoProcessorPool,初始化池中的对象NioProcessor

2) 初始化NioSocketAcceptor的Selector,监听客户端连接事件

3) 初始化NioProcessor中的Selector,监听读写事件

2. NioSocketAcceptor bind方法源码研究

2.1 创建ServerSocket监听

2.1.1 时序图

2.1.2 bind方法

API使用示例:

public final void bind(SocketAddress localAddress) throws IOException {
   if (localAddress == null) {
       throw new IllegalArgumentException("localAddress");
   }

   List<SocketAddress> localAddresses = new ArrayList<SocketAddress>(1);
   localAddresses.add(localAddress);
   //代码编号2.1.2.1
   bind(localAddresses);
}

进入代码编号2.1.2.1: AbstractIoAcceptor类

public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
        if (isDisposing()) {
            throw new IllegalStateException("Already disposed.");
        }

        if (localAddresses == null) {
            throw new IllegalArgumentException("localAddresses");
        }

        List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();

        for (SocketAddress a : localAddresses) {
            checkAddressType(a);
            localAddressesCopy.add(a);
        }

        if (localAddressesCopy.isEmpty()) {
            throw new IllegalArgumentException("localAddresses is empty.");
        }

        boolean activate = false;
        synchronized (bindLock) {
            synchronized (boundAddresses) {
                if (boundAddresses.isEmpty()) {
                    activate = true;
                }
            }

            if (getHandler() == null) {
                throw new IllegalStateException("handler is not set.");
            }

            try {
                //代码编号2.1.2.2
                Set<SocketAddress> addresses = bindInternal(localAddressesCopy);

                synchronized (boundAddresses) {
                    boundAddresses.addAll(addresses);
                }
            } catch (IOException e) {
                throw e;
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e);
            }
        }

        if (activate) {
            getListeners().fireServiceActivated();
        }
    }

进入代码编号2.1.2.2,AbstractPollingIoAcceptor类

protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
        // Create a bind request as a Future operation. When the selector
        // have handled the registration, it will signal this future.
        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);

        // adds the Registration request to the queue for the Workers
        // to handle
        registerQueue.add(request);

        // creates the Acceptor instance and has the local
        // executor kick it off.
        //代码编号2.1.2.3
        startupAcceptor();

        // As we just started the acceptor, we have to unblock the select()
        // in order to process the bind request we just have added to the
        // registerQueue.
        try {
            lock.acquire();

            // Wait a bit to give a chance to the Acceptor thread to do the select()
            Thread.sleep(10);
            wakeup();
        } finally {
            lock.release();
        }

        // Now, we wait until this request is completed.
        request.awaitUninterruptibly();

        if (request.getException() != null) {
            throw request.getException();
        }

        // Update the local addresses.
        // setLocalAddresses() shouldn‘t be called from the worker thread
        // because of deadlock.
        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

        for (H handle : boundHandles.values()) {
            newLocalAddresses.add(localAddress(handle));
        }

        return newLocalAddresses;
    }

 

2.1.3 startupAcceptor方法

进入代码编号2.1.2.3 (AbstractPollingIoAcceptor)

private void startupAcceptor() throws InterruptedException {
        // If the acceptor is not ready, clear the queues
        // TODO : they should already be clean : do we have to do that ?
        if (!selectable) {
            registerQueue.clear();
            cancelQueue.clear();
        }

        // start the acceptor if not already started
        //当前线程中获取,为同步
        Acceptor acceptor = acceptorRef.get();

        if (acceptor == null) {
            lock.acquire();
            //代码编号2.1.3.1:建立一个acceptor,即服务端启动服务,acceptor为runnable
            acceptor = new Acceptor();

            if (acceptorRef.compareAndSet(null, acceptor)) {
                //代码编号2.1.3.2:开启线程
                executeWorker(acceptor);
            } else {
                lock.release();
            }
        }
    }

进入代码编号2.1.3.1 (AbstractPollingIoAcceptor.Acceptor)

private class Acceptor implements Runnable {
        public void run() {
            assert (acceptorRef.get() == this);

            int nHandles = 0;

            // Release the lock
            lock.release();

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woke up if some new connection
                    // have occurred, or if the selector has been explicitly
                    // woke up
                    //代码编号2.1.3.2:监听
                    int selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on
                    //代码编号2.1.3.3:将OP_ACCEPT事件注册到Selector
                    nHandles += registerHandles();

                    // Now, if the number of registred handles is 0, we can
                    // quit the loop: we don‘t have any socket listening
                    // for incoming connection.
                    if (nHandles == 0) {
                        acceptorRef.set(null);

                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                            assert (acceptorRef.get() != this);
                            break;
                        }

                        if (!acceptorRef.compareAndSet(null, this)) {
                            assert (acceptorRef.get() != this);
                            break;
                        }

                        assert (acceptorRef.get() == this);
                    }

                    if (selected > 0) {
                        // We have some connection request, let‘s process
                        // them here.
                        //代码编号2.1.3.4:处理OP_ACCEPT事件,即处理客户端连接请求
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        disposalFuture.setDone();
                    }
                }
            }
        }
2.1.4 创建ServerSocket监听

进入代码代码编号2.1.3.3AbstractPollingIoAcceptor类),将OP_ACCEPT事件注册到Selector

private int registerHandles() {
        for (;;) {
            // The register queue contains the list of services to manage
            // in this acceptor.
            AcceptorOperationFuture future = registerQueue.poll();

            if (future == null) {
                return 0;
            }

            // We create a temporary map to store the bound handles,
            // as we may have to remove them all if there is an exception
            // during the sockets opening.
            Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
            List<SocketAddress> localAddresses = future.getLocalAddresses();

            try {
                // Process all the addresses
                for (SocketAddress a : localAddresses) {
                    //代码编号2.1.4.1:开启通道
                    H handle = open(a);
                    newHandles.put(localAddress(handle), handle);
                }

                // Everything went ok, we can now update the map storing
                // all the bound sockets.
                boundHandles.putAll(newHandles);

                // and notify.
                future.setDone();
                return newHandles.size();
            } catch (Exception e) {
                // We store the exception in the future
                future.setException(e);
            } finally {
                // Roll back if failed to bind all addresses.
                if (future.getException() != null) {
                    for (H handle : newHandles.values()) {
                        try {
                            close(handle);
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        }
                    }

                    // TODO : add some comment : what is the wakeup() waking up ?
                    wakeup();
                }
            }
        }
    }

进入代码代码编号2.1.4.1AbstractPollingIoAcceptor类)

protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
        // Creates the listening ServerSocket
        //创建ServerSocket监听
        ServerSocketChannel channel = null;

        if (selectorProvider != null) {
            channel = selectorProvider.openServerSocketChannel();
        } else {
            //开启通道
            channel = ServerSocketChannel.open();
        }

        boolean success = false;

        try {
            // This is a non blocking socket channel
            // 设置为非阻塞
            channel.configureBlocking(false);

            // Configure the server socket,
            ServerSocket socket = channel.socket();

            // Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());

            // and bind.
            try {
                socket.bind(localAddress, getBacklog());
            } catch (IOException ioe) {
                // Add some info regarding the address we try to bind to the
                // message
                String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                        + ioe.getMessage();
                Exception e = new IOException(newMessage);
                e.initCause(ioe.getCause());

                // And close the channel
                channel.close();

                throw e;
            }

            // Register the channel within the selector for ACCEPT event
            //注册OP_ACCEPT监听事件
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
        } finally {
            if (!success) {
                close(channel);
            }
        }
        return channel;
    }

2.2 accpet客户端连接请求

时序图:

进入代码编号2.1.3.4AbstractPollingIoAcceptor. Acceptor类)

private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                //代码2.4.0:accept客户端连接,创建Session
                S session = accept(processor, handle);

                if (session == null) {
                    continue;
                }
                //代码2.4.1:初始化Session
                initSession(session, null, null);

                // add the session to the SocketIoProcessor
                //代码2.4.2:
                session.getProcessor().add(session);
            }
        }

进入代码2.4.0NioSocketAcceptor类)

protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

        SelectionKey key = null;

        if (handle != null) {
            key = handle.keyFor(selector);
        }

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        //代码2.4.3 accept客户端连接
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }
        //代码2.4.4: 创建Session并返回
        return new NioSocketSession(this, processor, ch);
    }

回到processHandles方法,进入代码2.4.2 SimpleIoProcessor.add(S session);

public final void add(S session) {
        //代码2.4.5: getProcessor 返回NioProcessor,并将session加入NioProcessor
        getProcessor(session).add(session);
    }

代码2.4.5进入getProcessor方法(SimpleIoProcessor)

private IoProcessor<S> getProcessor(S session) {
        IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);

        if (processor == null) {
            if (disposed || disposing) {
                throw new IllegalStateException("A disposed processor cannot be accessed.");
            }

            processor = pool[Math.abs((int) session.getId()) % pool.length];

            if (processor == null) {
                throw new IllegalStateException("A disposed processor cannot be accessed.");
            }

            session.setAttributeIfAbsent(PROCESSOR, processor);
        }

        return processor;
    }

该段代码的作用

根据session从processor池中获取一个processor

不同的session可能对应同一个processor

进入代码2.4.5: AbstractPollingIoProcessor.add(S session)

public final void add(S session) {
        if (disposed || disposing) {
            throw new IllegalStateException("Already disposed.");
        }

        // Adds the session to the newSession queue and starts the worker
        //将session加入newSession队列
        newSessions.add(session);
        //代码2.4.6: 开启processor线程
        startupProcessor();
    }

我们现在再看一下NioProcessor的类结构图

开启processor线程

进入代码2.4.6AbstractPollingIoProcessor类

private void startupProcessor() {
           //代码2.4.7:线程安全处理
        Processor processor = processorRef.get();

        if (processor == null) {
              //代码2.4.8:如果该对象没有绑定Processor,则新建一个
            processor = new Processor();
            //线程安全处理
            if (processorRef.compareAndSet(null, processor)) {
                 //代码2.4.9:开启Processor线程
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }

        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
        wakeup();
    }

2.3 读写监听及处理

时序图:

进入代码2.4.8AbstractPollingIoProcessor.Processor

private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);

            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle session when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    long t0 = System.currentTimeMillis();
                    //代码2.3.0:监听读写事件
                       int selected = select(SELECT_TIMEOUT);
                    long t1 = System.currentTimeMillis();
                    long delta = (t1 - t0);

                 //代码2.3.1:此段代码不清楚
                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                        // Last chance : the select() may have been
                        // interrupted because we have had an closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");

                            // we can reselect immediately
                            // set back the flag to false
                            wakeupCalled.getAndSet(false);

                            continue;
                        } else {
                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                            // Ok, we are hit by the nasty epoll
                            // spinning.
                            // Basically, there is a race condition
                            // which causes a closing file descriptor not to be
                            // considered as available as a selected channel, but
                            // it stopped the select. The next time we will
                            // call select(), it will exit immediately for the same
                            // reason, and do so forever, consuming 100%
                            // CPU.
                            // We have to destroy the selector, and
                            // register all the socket on a new one.
                            registerNewSelector();
                        }

                        // Set back the flag to false
                        wakeupCalled.getAndSet(false);

                        // and continue the loop
                        continue;
                    }

                    // Manage newly created session first
                    nSessions += handleNewSessions();

                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        //代码2.3.2:处理读写
                        process();
                    }

                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);

                    // And manage removed sessions
                    nSessions -= removeSessions();

                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);

                    // Get a chance to exit the infinite loop if there are no
                    // more sessions on this Processor
                    if (nSessions == 0) {
                        processorRef.set(null);

                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            // newSessions.add() precedes startupProcessor
                            assert (processorRef.get() != this);
                            break;
                        }

                        assert (processorRef.get() != this);

                        if (!processorRef.compareAndSet(null, this)) {
                            // startupProcessor won race, so must exit processor
                            assert (processorRef.get() != this);
                            break;
                        }

                        assert (processorRef.get() == this);
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<S> i = allSessions(); i.hasNext();) {
                            scheduleRemove(i.next());
                        }

                        wakeup();
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    // But first, dump a stack trace
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (disposing) {
                        doDispose();
                    }
                }
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                disposalFuture.setValue(true);
            }
        }
    }

进入代码2.3.2AbstractPollingIoProcessor类)

private void process() throws Exception {
        for (Iterator<S> i = selectedSessions(); i.hasNext();) {
            S session = i.next();
            //代码2.3.3:
            process(session);
            i.remove();
        }
}

进入代码2.3.3AbstractPollingIoProcessor类)

该段代码处理读写

private void process(S session) {
        // Process Reads
        //代码2.3.4:处理读
        if (isReadable(session) && !session.isReadSuspended()) {
            read(session);
        }

        // Process writes
       //代码2.3.5:处理写
        if (isWritable(session) && !session.isWriteSuspended()) {
            // add the session to the queue, if it‘s not already there
            if (session.setScheduledForFlush(true)) {
                flushingSessions.add(session);
            }
        }
    }

2.4 读Process

时间: 2024-10-10 07:26:12

Mina源码研究的相关文章

Chrome自带恐龙小游戏的源码研究(完)

在上一篇<Chrome自带恐龙小游戏的源码研究(七)>中研究了恐龙与障碍物的碰撞检测,这一篇主要研究组成游戏的其它要素. 游戏分数记录 如图所示,分数及最高分记录显示在游戏界面的右上角,每达到100分就会出现闪烁特效,游戏第一次gameover时显示历史最高分.分数记录器由DistanceMeter构造函数实现,以下是它的全部代码: 1 DistanceMeter.dimensions = { 2 WIDTH: 10, //每个字符的宽度 3 HEIGHT: 13, //每个字符的高 4 DE

Chrome自带恐龙小游戏的源码研究(七)

在上一篇<Chrome自带恐龙小游戏的源码研究(六)>中研究了恐龙的跳跃过程,这一篇研究恐龙与障碍物之间的碰撞检测. 碰撞盒子 游戏中采用的是矩形(非旋转矩形)碰撞.这类碰撞优点是计算比较简单,缺点是对不规则物体的检测不够精确.如果不做更为精细的处理,结果会像下图: 如图所示,两个盒子虽然有重叠部分,但实际情况是恐龙和仙人掌之间并未发生碰撞.为了解决这个问题,需要建立多个碰撞盒子: 不过这样还是有问题,观察图片,恐龙和仙人掌都有四个碰撞盒子,如果每次Game Loop里都对这些盒子进行碰撞检测

Redis源码研究—哈希表

Redis源码研究-哈希表 Category: NoSQL数据库 View: 10,980 Author: Dong 作者:Dong | 新浪微博:西成懂 | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明 网址:http://dongxicheng.org/nosql/redis-code-hashtable/ 本博客的文章集合:http://dongxicheng.org/recommend/ 本博客微信公共账号:hadoop123(微信号为:hadoop-123),分享

NIO框架之MINA源码解析(五):NIO超级陷阱和使用同步IO与MINA通信

1.NIO超级陷阱 之所以说NIO超级陷阱,就是因为我在本系列开头的那句话,因为使用缺陷导致客户业务系统瘫痪.当然,我对这个问题进行了很深的追踪,包括对MINA源码的深入了解,但其实之所以会出现这个问题,它的根不是MINA的原因,而是JDK底层的问题. JDK底层在实现nio时,为了能够唤醒等待在io上的线程,在windows平台使用了两个端口建立连接发消息实现.看如下代码: [java] view plain copy print? public class NIOTest { @Test p

Chrome自带恐龙小游戏的源码研究(五)

在上一篇<Chrome自带恐龙小游戏的源码研究(四)>中实现了障碍物的绘制及移动,从这一篇开始主要研究恐龙的绘制及一系列键盘动作的实现. 会眨眼睛的恐龙 在游戏开始前的待机界面,如果仔细观察会发现恐龙会时不时地眨眼睛.这是通过交替绘制这两个图像实现的: 可以通过一张图片来了解这个过程: 为实现图片的切换,需要一个计时器timer,并且需要知道两张图片切换的时间间隔msPerFrame.当计时器timer的时间大于切换的时间间隔msPerFrame时,将图片切换到下一张,到达最后一张时又从第一张

NIO byteBUffer 讲解 及Mina 源码分析

1.传统的socket: 阻塞式通信模式 tcp连接: 与服务器连接时 .必须等到连接成功后 才返回 . udp连接: 客户端发送数据 ,必须等到发送成功后返回 . 每建立一个 Scoket连接时, 同事创建一个新线程对该 Socket进行单独通信(采用阻塞式通信 ) 这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果 对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况 2.1NIO 设计背后的基石:反应器模式,用于事

live555源码研究(四)------UserAuthenticationDatabase类

一.UserAuthenticationDatabase类作用 1,用户/密码管理 2,鉴权管理 二.类UserAuthenticationDatabase继承关系图                         live555源码研究(四)------UserAuthenticationDatabase类,布布扣,bubuko.com

Chrome自带恐龙小游戏的源码研究(六)

在上一篇<Chrome自带恐龙小游戏的源码研究(五)>中实现了眨眼睛的恐龙,这一篇主要研究恐龙的跳跃. 恐龙的跳跃 游戏通过敲击键盘的Spacebar或者Up来实现恐龙的跳跃.先用一张图来表示整个跳跃的过程: 首先规定向下为正方向,即重力加速度(g)为正,起跳的速度(v)为负,恐龙距离画布上方的距离为yPos: 每一帧动画中,速度都会与重力加速度相加得到新的速度,再用新的速度与yPos相加得到新的yPos,改变恐龙的位置为新的yPos,表现出来为yPos不断减小: 当恐龙升至最高点,此时速度为

LIVE555源码研究之四:MediaServer (一)

从本篇文章开始我们将从简单服务器程序作为突破点,深入研究LIVE555源码. 从前面的文章我们知道,任何一个基于LIVE555库实现的程序都需要实现自己的环境类和调度类.这里,服务器程序就使用了BasicEnvironment库中实现的简单环境类和简单调度类.说它简单,是因为该环境类仅仅实现了将错误信息输出到控制台.而调度类仅仅通过select模型实现socket的读写. 下面我们来看下简单环境类BasicEnvironment和简单调度类BasicTaskScheduler是如何实现的. 打开