tomcat服务器解析(二) --- Endpoint

Endpoint主要用来提供基础的网络I/O服务,封装了网络通讯相关的细节。在AbstractProtocol中对Endpoint有这样一段注释

/**

* Endpoint that provides low - level
network I/O - must be matched to the

* ProtocolHandler implementation (ProtocolHandler using BIO, requires BIO

* Endpoint etc.).

*/

protected AbstractEndpoint<S>
endpoint = null ;

先对AbstractEndpoint(org.apache.tomcat.util.net.AbstractEndpoint)类做了解。

【AbstractEndpoint的线程池】

AbstractEndpoint有一个Executor的属性,是它所用的线程池。这个线程池可以是外界指定的,也可以是由AbstractEndpoint自己创建的。通过属性internalExecutor来标识使用的是外部的线程池,还是有Endpoint自己创建的线程池。

可以由外部调用显式指定endpoint使用的线程池

/**

* External Executor based thread pool.

*/

private Executor executor = null;

public void setExecutor(Executor
executor) {

this.executor =
executor;

this.internalExecutor =
(executor==null);

}

public Executor
getExecutor() { return executor ;
}

在当调用者没有显式指定所用线程池时,会创建一个自己所用的线程池,创建方法如下。

public void createExecutor ()
{

internalExecutor = true ;

TaskQueue taskqueue = new TaskQueue();

TaskThreadFactory tf = new TaskThreadFactory(getName()
+ "-exec-" , daemon ,
getThreadPriority());

executor = new ThreadPoolExecutor(getMinSpareThreads(),
getMaxThreads(), 60, TimeUnit.SECONDS ,taskqueue,
tf);

taskqueue.setParent( (ThreadPoolExecutor) executor);

}

【AbstractEndpoint的Acceptor】

在AbstractEndpoint中定义了Acceptor类(实现了Runnable接口),同时定义了acceptors属,主要用于接收网络请求。

/**

* Threads used to accept new connections and pass them to worker threads.

*/

protected Acceptor []
acceptors;

启动acceptors时,并没有使用前面提到过的线程池,而是生成了新的守护线程(getDaemon方法,默认返回true),来运行。但,具体在acceptors中线程的执行体,则交由具体的子类负责实现(貌似template-method模式是各种框架的基础配置),通过重写抽象方法createAcceptor来完成。

protected final void startAcceptorThreads()
{

int count
= getAcceptorThreadCount();

acceptors = new Acceptor [count];

for (int i
= 0; i < count; i++) {

acceptors[i]
= createAcceptor();

String threadName = getName() + "-Acceptor-" +
i;

acceptors[i].setThreadName(threadName);

Thread t = new Thread(acceptors [i],
threadName);

t.setPriority(getAcceptorThreadPriority());

t.setDaemon(getDaemon());

t.start();

}

}

AbstractEndpoint框架主要定义了一些基本的属性,同时规定了生命周期的调用顺序。

Endpoint的初始化和启动,主要执行具体子类的所实现的startInternal方法来完成。

public final void init() throws Exception
{

if (bindOnInit )
{

bind();

bindState =
BindState.BOUND_ON_INIT;

}

}

public final void start() throws Exception
{

if (bindState ==
BindState.UNBOUND) {

bind();

bindState =
BindState.BOUND_ON_START;

}

startInternal();

}

在Http11NioProtocol的构造函数中指定的是使用NioEndpoint实例,因此这里通过分析AbstractEndpoint的子类NioEndpoint来做进一步的了解。

这里主要关注bind和startInternal两个函数

【NioEndpoint】

bind操作的主体部分的代码如下

@Override

public void bind() throws Exception
{

serverSock = ServerSocketChannel.open();

socketProperties.setProperties(serverSock.socket());

InetSocketAddress addr = (getAddress()!= null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));

serverSock.socket().bind(addr,getBacklog());

serverSock.configureBlocking( true); //mimic
APR behavior

serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

// 这里省却了一些与配置设置和ssl相关的代码

selectorPool.open();

}

从代码上来看,bind操作主要是做一些配置参数的计算,以及开始对socket的监听

startInternal操作的代码如下

/**

* Start the NIO endpoint, creating acceptor, poller threads.

*/

@Override

public void startInternal() throws Exception
{

if (!running)
{

running = true;

paused = false;

processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

socketProperties.getProcessorCache());

keyCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

socketProperties.getKeyCache());

eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

socketProperties.getEventCache());

nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

socketProperties.getBufferPool());

// Create worker collection

if (
getExecutor() == null ) {

createExecutor();

}

initializeConnectionLatch();

// Start poller threads

pollers = new Poller[getPollerThreadCount()];

for (int i=0;
i<pollers.length; i++) {

pollers[i] = new Poller();

Thread pollerThread = new Thread(pollers[i],
getName() + "-ClientPoller-" +i);

pollerThread.setPriority(threadPriority);

pollerThread.setDaemon( true);

pollerThread.start();

}

startAcceptorThreads();

}

}

在startInternal中,初始化线程池,创建和启动网络数据接收线程组,创建和启动poller线程组。

在startInternal中,会检查是否指定要使用外部的线程池,如果没有指定外部线程池,Endpoint就会创建一个内部的线程池。但这里面没有与线程的调度和使用相关的代码

【线程池的调用时机】

与Acceptor和Poller相关?

在NioEndpoint中对Acceptor的功能说明如下,acceptor主要监听网络连接并且进行任务分发的后台线程。

// ---------------------------------------------------
Acceptor Inner Class

/**

* The background thread that listens for incoming TCP/IP connections and

* hands them off to an appropriate processor.

*/

protected class Acceptor extends AbstractEndpoint.Acceptor
{

Acceptor负责接收网络请求,建立连接。连接建立之后,将这个socket连接交给Poller。由Poller来负责执行数据的读取和业务执行。

从代码上看,Acceptor的负责控制底层同时连接的socket数目,它的任务在把建立建立之后socket交给Poller之后就结束了。

运行的主体部分在重写的run方法中,将建立连接之后的socket交给Poller的工作在setSocketOptions方法中实现。

setSocketOptions方法是专门处理特定socket连接的方法,将一个SocketChannel对象包装成一个NioChannel之后,注册到Poller中。

/**

* Process the specified connection.

*/

protected boolean setSocketOptions(SocketChannel
socket) {

// Process the connection

try {

//disable blocking, APR style, we are
gonna be polling it

socket.configureBlocking( false);

Socket sock = socket.socket();

socketProperties.setProperties(sock);

NioChannel channel = nioChannels.pop();

// 这里省略了设置channel属性的一些语句

getPoller0().register(channel);

catch (Throwable
t) {

// 省略异常处理的相关代码

return false ;

}

return true ;

}

getPoller0是从在startInternal方法中初始化的pollsers数组中取一个poller。

然后通过Poller对象的register方法把这个channel注册到此Poller对象上。

pollers数组的大小是根据当前的运行环境计算出来的,无法通过配置修改。

【Poller是什么】

Poller是实现了Runnable接口的,在NioEndpoint的时候,会初始化pollers数组,同时启动pollers数组中的线程,让pollers开始工作。

每个Poller都有一个自己的Selector对象,在Poller的构造函数中,通过调用Selector.open方法生成,虽然看上去这很像是一个单例模式,但实际上没法返回的都是一个全新的对象(可能与jdk的底层实现有关,目前从Oracle提供的jdk的试验来看,两次调用返回的是不同的对象)。

在Poller重写的run方法中,会首先根据当前endpoint的状态来选择操作。

如果endpoint被暂停,让Poller线程进行休眠,直到暂停被解除

// Loop if endpoint is paused

while (paused
&& (!close) ) {

try {

Thread.sleep(100);

catch (InterruptedException
e) {

// Ignore

}

}

endpoint是暂停状态,但没有被关闭,暂停状态是有可能恢复的,所以让poller休眠等待即可

如果endpoint被关闭,那就处理完已有的数据,这个Poller打开的selector。结束poller线程的执行。通过break跳出run方法体的while(true)循环。

// Time to terminate?

if (close)
{

events();

timeout(0, false);

try {

selector.close();

catch (IOException
ioe) {

log.error(sm.getString(

"endpoint.nio.selectorCloseFail" ),
ioe);

}

break;

else {

hasEvents = events();

}

如果endpoint是正常工作状态,处理已有的数据。通过events方法来处理当前Poller中已有的事件(数据)。同时使用selector.select或者selectNow来获取这个Poller上

状态已经OK的渠道,并进行数据处理。

if (
!close ) {

if (wakeupCounter.getAndSet(-1)
> 0) {

//if we are here,
means we have other stuff to do

//do a non blocking
select

keyCount = selector.selectNow();

else {

keyCount = selector.select(selectorTimeout);

}

wakeupCounter.set(0);

}

if (close)
{

events();

timeout(0, false);

try {

selector.close();

catch (IOException
ioe) {

log.error(sm.getString(

"endpoint.nio.selectorCloseFail" ),
ioe);

}

break;

}

正常状态下的数据处理,通过processKey来实现。获取对应的渠道的key,然后调用processKey方法

Iterator<SelectionKey> iterator =keyCount
> 0 ? selector.selectedKeys().iterator() : null;

// Walk through the collection
of ready keys and dispatch

// any active event.

while (iterator
!= null && iterator.hasNext()) {

SelectionKey sk = iterator.next();

KeyAttachment attachment = (KeyAttachment)sk.attachment();

// Attachment may be null
if another thread has called

// cancelledKey()

if (attachment
== null) {

iterator.remove();

else {

attachment.access();

iterator.remove();

processKey(sk, attachment);

}

}

processKey的主要工作是调用NioEndpoint的processSocket来实现socket的读写。

                         if ( isWorkerAvailable()
) {

unreg(sk, attachment, sk.readyOps());

boolean closeSocket
false;

// Read goes before
write

if (sk.isReadable())
{

if (!processSocket(attachment,
SocketStatus.OPEN_READ, true )) {

closeSocket = true;

}

}

if (!closeSocket
&& sk.isWritable()) {

if (!processSocket(attachment,
SocketStatus.OPEN_WRITE, true )) {

closeSocket = true;

}

}

if (closeSocket)
{

cancelledKey(sk,SocketStatus.DISCONNECT);

}

else {

result = false;

}

根据socket的状态,来进行调用,于是这里又转到了NioEndpoint上。

这里绕回到NioEndpoint上的processSocket,才发现对前面提到过的线程池的使用,

attachment.setCometNotify(false); //will
get reset upon next reg

SocketProcessor sc = processorCache.pop();

if (
sc == null ) sc = new SocketProcessor(attachment,
status);

else sc.reset(attachment,
status);

Executor executor = getExecutor();

if (dispatch
&& executor != null) {

executor.execute(sc);

else {

sc.run();

}

这里通过getExecutor来获取可用的线程池。任务被封装成SocketProcessor对象,在成功获取线程池后,则通过线程池来进行socket数据数据的读写操作。在此就使用到了启动tomcat时所配置的线程池了。

在此对NioEndpoint的架构做个总结

Acceptor:

负责监听并接收socket连接建立。由Acceptor来控制与服务端建立连接的客户端socket数目。具体的数目为一个服务可配置项,可以在启动服务时指定

Poller:

负责处理已建立连接的socket,将channel封装后,提交至线程池(Executors)来处理。Poller线程的数目与运行时环境有关,通过计算得出,不可配置。

Executors:

处理socket请求的线程池。线程池中线程的数目可在启动服务时配置。

[Poller中Selector的注册]

在Poller中,对selector的使用上,只看到通过selector.select活selector.selectNow来获取对应的渠道。但在java的nio中,一个渠道必须要先在selector上注册后,才能被

selector获取到。那么,各个channel是何时再selector上注册的呢?

答案在PollerEvent上。

前面提到过Acceptor的主要工作是把建立好连接的socket注册到Poller上,通过register上实现。

Poller的register把建立好连接的socket封装成一个PollerEvent对象,然后放入这个Poller所维护的事件队列中。

Poller内部所维护的事件队列,定义如下   private final SynchronizedQueue<PollerEvent>
events = new SynchronizedQueue<>();

在Poller的run方法中,通过events方法,来处理已有的事件。

public boolean events()
{

boolean result
false;

PollerEvent pe = null;

while (
(pe = events.poll()) != null ) {

result = true;

try {

pe.run();

pe.reset();

if (running
&& !paused) {

eventCache.push(pe);

}

catch (
Throwable x ) {

log.error( "",x);

}

}

return result;

}

events是一个PollerEvent类型队列。events方法中有一个while循环,取出队列中的每一个PollerEvent对象,然后执行它。

PollerEvent实现了Runnable接口,在其run方法中,完成了channel对selector的注册

        if ( interestOps == OP_REGISTER
) {

try {

socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);

catch (Exception
x) {

log.error( "",
x);

}

else {

socket.getPoller()返回这个channel所注册的Poller对象。getSelector()返回这个Poller对象的selector。注册之后,当这个socket的channel有数据到达,便能通过selector.select活selector.selectNow被返回,放入到Executor中进行处理。

时间: 2024-12-05 07:47:48

tomcat服务器解析(二) --- Endpoint的相关文章

Tomcat架构解析(二)-----Connector、Tomcat启动过程以及Server的创建过程

Connector用于跟客户端建立连接,获取客户端的Socket,交由Container处理.需要解决的问题有监听.协议以及处理器映射等等. 一.Connector设计   Connector要实现的主要功能如下: 设计图如下: 1.ProtocolHandler Connector中的ProtocolHandler用于处理不同的通信协议,Tomcat主要支持HTTP.AJP协议,并且支持BIO.NIO.APR等I/O方式.ProtocolHandler中使用AbstractEndpoint启动

tomcat服务器解析(四) ---- 组成模块分解

前面部分梳理了tomcat服务器处理http请求的一个流程,这里进行内容总结,梳理下在tomcat服务器实现中的,各种功能模块. [Endpoint] Endpoint是基础的网络设施,通过Endpoint来实现网络连接和控制,它是服务器对外I/O操作的接入点.主要任务是管理对外的socket连接,同时将建立好的socket连接交到合适的工作线程中去. content:org.apache.tomcat.util.net.AbstractEndpoint,org.apache.tomcat.ut

tomcat服务器解析(六)-- Acceptor

Acceptor负责用来管理连接到tomcat服务器的数量,来看看Acceptor在tomcat服务器中的应用,是如何实现连接管理的,socket连接建立成功之后,是如何实现内容的读写的(读写是交由Poller机制去完成). 先准备一点java nio中实现socket连接所需的基础知识:SocketChannel和ServerSocketChannel SocketChannel和ServerSocketChannel的概念与基础的阻塞式的java 网络编程中的socket和serversoc

tomcat服务器解析

tomcat服务器软件官方下载地址: 1.首先说明web服务器软件的作用吧! web服务器软件就是把把本地的资源共享给外部访问 2.为什么选用tomcat服务器软件? 因为tomcat服务器是阿帕奇组织的产品,他免费开源很适合web入门学习使用,而且tomcat服务器软件完全用java编写. 我使用的是tomcat9.0.13版本,给大家介绍一下文件的情况吧 bin目录:存放tomcat的命令也就是一些timcat的软件比如(启动服务器,和关闭服务器...) conf: 存放tomcat的配置信

tomcat服务器解析(七)-- Processor&amp;Endpoint&amp;ProtocolHandler

请求到达Poller处理,最终是由Processor来进行处理,为了说明这中间过程所涉及的部分,先整理下在tomcat服务的各个组成部分:ProtocolHandler.Endpoint.Endpoint.Handler.Processor 它们之间的引用关系如下 [ProtocolHandler]     <---------   Connector org.apache.coyote.ProtocolHandler org.apache.coyote.AbstractProtocol org

tomcat服务器解析(三)---- Handler for Endpoint

前面的分析到,请求最终被封装成了一个SocketProcessor对象,放在Executors线程池中去执行.这些都还只是在tomcat内部的socket的处理层面上,那请求最终是如何被转到开发人员所写的servlet上的? NioEndpoint.SocketProcessor所做的工作在私有方法doRun中                 if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the re

tomcat服务器解析(一)

httpservlet自己并不能独立运行,需要依赖于一个web容器才能够运行.维基百科中对httpservlet和web容器的关系做了简要说明 A web container (also known as a servlet container) is essentially the component of a web server that interacts with the servlets. The web container is responsible for managing t

tomcat服务器解析(五)-- Poller

在前面的分析中介绍过,Acceptor的作用是控制与tomcat建立连接的数量,但Acceptor只负责建立连接.socket内容的读写是通过Poller来实现的. Poller使用java nio来实现连接的管理. 关于nio,主要需要明确三个概念:Channel.Selector和SelectionKey. 在这里的使用上,它们之间的关系可以简单这样理解,Channel必须注册到Selector上才能用于接收socket数据,在Selector上有数据到达的Channel可以用Selecti

tomcat原理解析(二):整体架构

一 整体结构 前面tomcat实现原理(一)里面描述了整个tomcat接受一个http请求的简单处理,这里面我们讲下整个tomcat的架构,以便对整体结构有宏观的了解.tomat里面由很多个容器结合在一起,主要有server,service,context,host,engine,wrapper,connector这7个容器来组装.当然了tomcat里面还有其它容器这里就不一一列举,因为我只看重点的.这7个容器存着父子关系,即可以通过当前容器找自己的父容器和自己的子容器.说到这我画了一个简单的结