剑指架构师系列-tomcat6通过伪异步实现connector

首先在StandardService中start接收请求的线程,如下:

 synchronized (connectors) {
            for (int i = 0; i < connectors.length; i++) {
                try {
                    ((Lifecycle) connectors[i]).start();
                } catch (Exception e) {
                    log.error(sm.getString("standardService.connector.startFailed",connectors[i]), e);
                }
            }
        }

然后进入Connector,在这个类中调用了org.apache.coyote.http11.Http11Protocol类

protocolHandler.start();

在Http11Protocol类中又调用了org.apache.tomcat.util.net.JIoEndpoint类

endpoint.start();

下面看一下JIoEndpoint类中的start源代码,如下:

public void start() throws Exception {
		// Initialize socket if not done before
		if (!initialized) {
			init();
		}
		if (!running) {
			running = true;
			paused = false;

			// Create worker collection
			if (executor == null) {
				workers = new WorkerStack(maxThreads); // maxThreads值为200,可同时处理200个请求
			}

			// Start acceptor threads
			for (int i = 0; i < acceptorThreadCount; i++) {  // acceptorThreadCount值为1,只有一个接收请求的线程
				Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
				acceptorThread.setPriority(threadPriority);
				acceptorThread.setDaemon(daemon);
				acceptorThread.start();
			}
		}
	}

WorkerStack类使用了定长的数组来方便存取worker,也就是真正处理请求的线程。重点看一下Acceptor这个Runnable类的实现。

/**
	 * Server socket acceptor thread.
	 */
	protected class Acceptor implements Runnable {

		/**
		 * The background thread that listens for incoming TCP/IP connections
		 * and hands them off to an appropriate processor.
		 */
		public void run() {

			// Loop until we receive a shutdown command
			while (running) {

				// Loop if endpoint is paused
				while (paused) {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						// Ignore
					}
				}

				// Accept the next incoming connection from the server socket
				try {
					Socket socket = serverSocketFactory.acceptSocket(serverSocket);  // 阻塞接收socket请求
					serverSocketFactory.initSocket(socket);
					// Hand this socket off to an appropriate processor
					if (!processSocket(socket)) {   // 处理socket请求
						// Close socket right away
						try {
							socket.close();
						} catch (IOException e) {
							// Ignore
						}
					}
				} catch (IOException x) {
					if (running)
						log.error(sm.getString("endpoint.accept.fail"), x);
				} catch (Throwable t) {
					log.error(sm.getString("endpoint.accept.fail"), t);
				}
				// The processor will recycle itself when it finishes ???
			}
		}// end run

	}

最重要的就是processSocket()方法了,看下源代码:

protected boolean processSocket(Socket socket) {
		try {
			if (executor == null) {  // 默认情况
				getWorkerThread().assign(socket);
			} else {  // 用户自己指定了执行任务的线程池
				executor.execute(new SocketProcessor(socket));
			}
		} catch (Throwable t) {
			// This means we got an OOM or similar creating a thread, or that
			// the pool and its queue are full
			log.error(sm.getString("endpoint.process.fail"), t);
			return false;
		}
		return true;
	}

在默认情况下,首先要get到一个Worker的Thread,然后才能assign任务。

看一下getWorkerThread()这条逻辑:

/**
	 * Return a new worker thread, and block while to worker is available.
	 */
	protected Worker getWorkerThread() {
		// Allocate a new worker thread
		synchronized (workers) {   // 获取workers锁
			Worker workerThread;
			while ((workerThread = createWorkerThread()) == null) {
				try {
					workers.wait();  // 没有可用线程时释放workers锁,等待notify
				} catch (InterruptedException e) {
					// Ignore
				}
			}
			return workerThread;
		}
	}

代码要通过createWorkerThread()方法来获取一个workerThread,阅读如下代码就可以知道,这个方法有可能返回null。这样这个线程就需要让锁等待了,直到有线程notify。想一下就知道,肯定是分配出去执行任务的线程执行完成后,就可以notify接口请求的线程。接收请求的线程继续while循环,直到获取到一个workerThread为止。

createWorkerThread()方法源代码:

protected Worker createWorkerThread() {

		synchronized (workers) {
			if (workers.size() > 0) { // 通过WorkerStack提供的方法来操作Worker
				curThreadsBusy++;
				return workers.pop();
			}
			if ((maxThreads > 0) && (curThreads < maxThreads)) { // 保证不能大于指定的最大线程数
				curThreadsBusy++;
				if (curThreadsBusy == maxThreads) {
					log.info(sm.getString("endpoint.info.maxThreads", Integer.toString(maxThreads), address,Integer.toString(port)));
				}
				return (newWorkerThread());
			} else {
				if (maxThreads < 0) {  // maxThreads小于0时会无限制的new WorkerThread,表示不限制
					curThreadsBusy++;
					return (newWorkerThread());
				} else {  // 当curThreads等于maxThreads或者大于maxThreads且maxThreads大于0的情况
					return (null);
				}
			}
		}

	}  

recycleWorkerThread()方法源代码:

protected void recycleWorkerThread(Worker workerThread) {
		synchronized (workers) {
			workers.push(workerThread);
			curThreadsBusy--;
			workers.notify();
		}
	}

这个方法被谁调用了呢?当然是被执行任何的线程调用了。  

下面来看一下最重要的Worker类中非常重要的几个方法,如下:

protected class Worker implements Runnable {

		protected Thread thread = null;
		protected boolean available = false;  // available初始化为false
		protected Socket socket = null;

		/**
		 * The background thread that listens for incoming TCP/IP connections
		 * and hands them off to an appropriate processor.
		 */
		public void run() {

			// Process requests until we receive a shutdown signal
			while (running) {

				// Wait for the next socket to be assigned
				Socket socket = await(); // 1
				if (socket == null)
					continue;

				// Process the request from this socket
				if (!setSocketOptions(socket) || !handler.process(socket)) {
					// Close socket
					try {
						socket.close();
					} catch (IOException e) {
					}
				}
				// Finish up this request
				socket = null;
				recycleWorkerThread(this);
			}

		}

		/**
		 * Start the background processing thread.
		 */
		public void start() {
			thread = new Thread(this);
			thread.setName(getName() + "-" + (++curThreads));
			thread.setDaemon(true);
			thread.start();
		}

	}

这个线程在assign任务之前是start的,看一下run()方法中的第1步调用了await()方法,在await()方法中由于available值默认为false,所以进入了while循环后让出了线程锁并等待assign()方法notifyAll()。

/**
		 * Await a newly assigned Socket from our Connector, or
		 * null if we are supposed to shut down.
		 */
		private synchronized Socket await() {

			// Wait for the Connector to provide a new Socket
			while (!available) {
				try {
					wait();
				} catch (InterruptedException e) {
				}
			}

			// Notify the Connector that we have received this Socket
			Socket socket = this.socket;
			available = false;
			notifyAll();

			return (socket);

		}

当我们assign任务后,调用的assign()方法如下:

/**
		 * Process an incoming TCP/IP connection on the specified socket. Any
		 * exception that occurs during processing must be logged and swallowed.
		 * NOTE: This method is called from our Connector‘s thread. We
		 * must assign it to our own thread so that multiple simultaneous
		 * requests can be handled.
		 */
		synchronized void assign(Socket socket) {

			// Wait for the Processor to get the previous Socket
			while (available) {
				try {
					wait();
				} catch (InterruptedException e) {
				}
			}

			// Store the newly available Socket and notify our thread
			this.socket = socket;
			available = true;
			notifyAll();

		}

没有进入while循环,置available为true后notifyAll()。这样await()方法就跳出循环并置available为false后返回一个局部变量socket(为什么要返回一个局部变量socket呢?),这样run()方法就可以开始往下走了,完成后调用recycleWorkerThread()方法进行线程回收。 

这个run()方法再次进入while循环,调用await()方法后,由于await()方法在之前跳出循环时将available设置为false,所以就进入了让锁等待,等待请求线程调用assign()方法指定任务,这样就回到了开始叙述的地方了。

为什么在await()方法中使用局部变量socket呢?

摘自深入剖析Tomcat:因为使用局部变量可以在当前Socket对象处理完之前,继续接收下一个Socket对象。 

个人认为是怕在run()方法运行的过程中其它线程调用这个Worker对象的assign()方法,毕竟这个对象的引用是可以被其它线程获取到的。为什么可以调用assign()方法重新指定呢?因为run()方法没有加synchronized关键字,所以不能与assign()方法互斥访问socket资源。还是为了安全性吧。

  

  

  

  

  

  

 

时间: 2024-10-05 19:59:09

剑指架构师系列-tomcat6通过伪异步实现connector的相关文章

剑指架构师系列-tomcat6通过IO复用实现connector

由于tomcat6的配置文件如下: <Connector port="80" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" URIEncoding="UTF-8" useBodyEncodingForURI="true" enableLookups="false" re

剑指架构师系列-tomcat6的connector

首先在StandardService中start接收请求的线程,如下: synchronized (connectors) { for (int i = 0; i < connectors.length; i++) { try { ((Lifecycle) connectors[i]).start(); } catch (Exception e) { log.error(sm.getString("standardService.connector.startFailed",co

剑指架构师系列-持续集成之Maven+Nexus+Jenkins+git+Spring boot

1.Nexus与Maven 先说一下这个Maven是什么呢?大家都知道,Java社区发展的非常强大,封装各种功能的Jar包满天飞,那么如何才能方便的引入我们项目,为我所用呢?答案就是Maven,只需要粘贴个Jar包的地址,Maven就会自动到网上查找引入到你的项目中.不过首先你的下载个Maven,然后指定一下 当下来的包包(jar)放到哪里. 我的版本是apache-maven-3.2.1,找到conf里面的配置文件 settings.xml,瞅瞅有没有 <localRepository>E:

剑指架构师系列-spring boot的logback日志记录

Spring Boot集成了Logback日志系统. Logback的核心对象主要有3个:Logger.Appender.Layout 1.Logback Logger:日志的记录器 主要用于存放日志对象,也可以定义日志类型.级别. 级别:ERROR.WARE.INFO.DEBUG和TRACE.没有FATAL,归纳到了ERROR级别里.ERROR.WARN and INFO level messages are logged by default. 在Spring Boot中,最好定义为logb

剑指架构师系列-MySQL的安装及主从同步

1.安装数据库 wget http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm rpm -ivh mysql-community-release-el7-5.noarch.rpm yum install mysql-community-server 安装时使用root用户权限.安装成功后即可进行启动: /bin/systemctl restart mysqld.service 修改MySQL数据库root用户的密码,如

剑指架构师系列-Linux下的调优

1.I/O调优 CentOS下的iostat命令输出如下: $iostat -d -k 1 2 # 查看TPS和吞吐量 参数 -d 表示,显示设备(磁盘)使用状态:-k某些使用block为单位的列强制使用Kilobytes为单位:1 10表示,数据显示每隔1秒刷新一次,共显示2次. tps:该设备每秒的传输次数,也就是一次I/O请求.多个逻辑请求可能会被合并为"一次I/O请求"."一次传输"请求的大小是未知的. kB_read/s:每秒从设备读取的数据量:kB_wr

剑指架构师系列-Redis集群部署

初步搭建Redis集群 克隆已经安装Redis的虚拟机,我们使用这两个虚拟机中的Redis来搭建集群. master:192.168.2.129 端口:7001 slave:192.168.2.132 端口:7002 sentinel:192.168.2.129 端口:26379 来说一下这个sentinel,sentinel是一个管理redis实例的工具,它可以实现对redis的监控.通知.自动故障转移.sentinel不断的检测redis实例是否可以正常工作,通过API向其他程序报告redi

剑指架构师系列-持续集成之Maven实现项目的编译、发布和部署

Maven组织项目进行编译.部署 Maven项目基本的结构说明如下: mazhi  // 控制所有荐的编译.部署.发布 mazhi-app-parent  // 项目的父项目,有一些公共的设置可以被子项目继承 mazhi-core  // 基础服务项目,例如公共类等 mazhi-xxx 其中mazhi和mazhi-app-parent是pom格式,而mazhi-core是jar格式,还可以是 war等格式. 我们以新建mazhi和mazhi-core项目为例说明一下. 新建Maven-proje

剑指架构师系列-ActiveMQ队列的使用

安装ActiveMQ只需要下载包后解压,然后就可以启动与关闭ActiveMQ了,如下: ./activemq start ./activemq stop 访问管理页面: http://10.10.20.20:8161/admin 用户名和密码默认为:admin/admin spring.activemq.broker-url -- 指定ActiveMQ broker的URL,默认自动生成. spring.activemq.in-memory -- 是否是内存模式,默认为true. spring.