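Rocket is a lightweight, multithreaded, WSGI-compliant web server.
Rocket uses one thread to listen for connections. Each accepted connection is placed on a Queue, and worker threads take connections off the queue and handle them.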
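The hand-off described above can be sketched with the standard library alone. This is a minimal illustration of the listener/queue/worker pattern, not Rocket's code: the names `run_pipeline`, `listener`, `worker` and the `STOP` sentinel are made up for the example, and plain strings stand in for socket connections.

```python
import queue
import threading


def run_pipeline(connections, num_workers=3):
    """Pass items from one 'listener' thread to worker threads via a Queue."""
    active_queue = queue.Queue()
    results = []
    results_lock = threading.Lock()
    STOP = object()  # hypothetical sentinel telling a worker to exit

    def listener():
        # Stand-in for accept(): enqueue each incoming "connection".
        for conn in connections:
            active_queue.put(conn)
        # One sentinel per worker so that every worker shuts down.
        for _ in range(num_workers):
            active_queue.put(STOP)

    def worker():
        while True:
            conn = active_queue.get()  # blocks until an item is available
            if conn is STOP:
                break
            with results_lock:
                results.append("handled %s" % conn)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    threads.append(threading.Thread(target=listener))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(sorted(run_pipeline(["conn-1", "conn-2", "conn-3"])))
```

The queue decouples accepting from handling: the listener never waits for a request to finish, and idle workers simply block on `get()`.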
Rocket has the following attributes:
- method - A string value indicating the type of Worker to use to answer the requests received by Rocket. The default is wsgi and will invoke the WSGIWorker class for handling requests. Go to the Methods section to see all available methods.
- app_info - A dictionary that holds information that the Worker class specified in method will use for configuration. See the documentation in the Methods section for the Worker class you are using for details on what to put in this dictionary.
- min_threads - An integer number of minimum Worker threads to run. This number must be greater than 0. Rocket will always have at least min_threads number of threads running at a time unless it is in the process of shutting down.
- max_threads - An integer number of maximum Worker threads. This number must be either 0 or greater than min_threads. A max_threads of 0 (zero) means there is no maximum thread count: Rocket will continue generating threads so long as there are unanswered connections in the request queue. If the running environment limits how many threads a process can own, consider that in addition to max_threads there will also be a monitor thread and a listening thread running.
- queue_size - An integer number of connections allowed to be queued before Rocket accepts them. This number is passed to the listen() function in the operating system’s socket library. It defaults to None, which uses either the operating system’s maximum or 5 if the OS maximum is not discoverable.
- timeout - An integer number of seconds to listen to a connection for a new request before closing it. Defaults to 600.
- handle_signals - A boolean indicating whether or not Rocket should respond to UNIX-style process signals (if the platform supports signals). Defaults to True.
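The thread-count rules in the list above can be written down as a small check. This is a sketch under assumptions: `peak_thread_count` is a hypothetical helper, not part of Rocket, and Rocket itself may not validate its arguments this way. It also adds the monitor thread and the listening threads to the worker limit, which matters when the platform caps threads per process.

```python
def peak_thread_count(min_threads, max_threads, listeners=1):
    """Return the worst-case number of threads, or None if unbounded.

    Hypothetical helper: it only encodes the documented constraints
    (min_threads > 0; max_threads is 0 or greater than min_threads).
    """
    if min_threads <= 0:
        raise ValueError("min_threads must be greater than 0")
    if max_threads != 0 and max_threads <= min_threads:
        raise ValueError("max_threads must be 0 or greater than min_threads")
    if max_threads == 0:
        return None  # 0 means no upper limit on worker threads
    # Workers + one monitor thread + one thread per listening interface.
    return max_threads + 1 + listeners


if __name__ == "__main__":
    print(peak_thread_count(10, 20))  # 20 workers + 1 monitor + 1 listener
```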
min_threads must be a number greater than 0. Rocket always keeps at least min_threads threads running at any moment. The default value is 10.
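max_threads is the maximum number of threads that may run at the same time; by default there is no limit. On CPython, because of the GIL, a large number of threads is not a problem. On Jython, which has true multithreading, if the web application is CPU-bound it is advisable to set max_threads to 1.5 times the number of CPU cores, so that thread switching does not reduce efficiency.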
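The 1.5-threads-per-core rule of thumb can be computed as follows. `suggested_max_threads` is a hypothetical helper written for this note, and the factor 1.5 is simply the value recommended above.

```python
import os


def suggested_max_threads(cpu_count=None, factor=1.5):
    """Return a max_threads value of roughly `factor` threads per CPU core."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, int(cpu_count * factor))


if __name__ == "__main__":
    print(suggested_max_threads())
```

Because max_threads must be greater than min_threads, a small result (for example 6 on a 4-core machine) also requires lowering min_threads below its default of 10.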
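queue_size is the number of connections that may wait in the queue before being accepted. This value is normally determined by the operating system.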
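How the effective queue_size is chosen can be isolated into a few lines. The sketch below follows the logic of the constructor shown later in this post; the function name `resolve_queue_size` and the constant `DEFAULT_LISTEN_QUEUE_SIZE` are illustrative names, with 5 taken from the parameter description.

```python
import socket

DEFAULT_LISTEN_QUEUE_SIZE = 5  # fallback when the OS maximum is unknown


def resolve_queue_size(queue_size=None, max_threads=0):
    """Pick the backlog value that would be passed to listen()."""
    if not queue_size:
        # Prefer the OS maximum if the socket module exposes it.
        queue_size = getattr(socket, "SOMAXCONN", DEFAULT_LISTEN_QUEUE_SIZE)
    # With a bounded pool, never queue more connections than worker threads.
    if max_threads and queue_size > max_threads:
        queue_size = max_threads
    return queue_size


if __name__ == "__main__":
    print(resolve_queue_size())          # OS maximum (or 5)
    print(resolve_queue_size(50, 10))    # capped at max_threads
```

Note that the cap only applies when max_threads is non-zero, so an unbounded pool keeps the operating system's value.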
web2py uses Rocket as its web server. The initialization code is as follows:
    class Rocket(object):
        """The Rocket class is responsible for handling threads and accepting and
        dispatching connections."""

        def __init__(self,
                     interfaces=('127.0.0.1', 8000),
                     method='wsgi',
                     app_info=None,
                     min_threads=None,
                     max_threads=None,
                     queue_size=None,
                     timeout=600,
                     handle_signals=True):

            self.handle_signals = handle_signals
            self.startstop_lock = Lock()
            self.timeout = timeout

            if not isinstance(interfaces, list):
                self.interfaces = [interfaces]
            else:
                self.interfaces = interfaces

            if min_threads is None:
                min_threads = DEFAULTS['MIN_THREADS']

            if max_threads is None:
                max_threads = DEFAULTS['MAX_THREADS']

            if not queue_size:
                if hasattr(socket, 'SOMAXCONN'):
                    queue_size = socket.SOMAXCONN
                else:
                    queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']

            if max_threads and queue_size > max_threads:
                queue_size = max_threads

            if isinstance(app_info, dict):
                app_info['server_software'] = SERVER_SOFTWARE

            self.monitor_queue = Queue()
            self.active_queue = Queue()

            self._threadpool = ThreadPool(get_method(method),
                                          app_info=app_info,
                                          active_queue=self.active_queue,
                                          monitor_queue=self.monitor_queue,
                                          min_threads=min_threads,
                                          max_threads=max_threads)

            # Build our socket listeners
            self.listeners = [Listener(
                i, queue_size, self.active_queue) for i in self.interfaces]
            for ndx in range(len(self.listeners) - 1, 0, -1):
                if not self.listeners[ndx].ready:
                    del self.listeners[ndx]

            if not self.listeners:
                log.critical("No interfaces to listen on...closing.")
                sys.exit(1)

        def _sigterm(self, signum, frame):
            log.info('Received SIGTERM')
            self.stop()

        def _sighup(self, signum, frame):
            log.info('Received SIGHUP')
            self.restart()

        def start(self, background=False):
            log.info('Starting %s' % SERVER_SOFTWARE)

            self.startstop_lock.acquire()

            try:
                # Set up our shutdown signals
                if self.handle_signals:
                    try:
                        import signal
                        signal.signal(signal.SIGTERM, self._sigterm)
                        signal.signal(signal.SIGUSR1, self._sighup)
                    except:
                        log.debug('This platform does not support signals.')

                # Start our worker threads
                self._threadpool.start()

                # Start our monitor thread
                self._monitor = Monitor(self.monitor_queue,
                                        self.active_queue,
                                        self.timeout,
                                        self._threadpool)
                self._monitor.setDaemon(True)
                self._monitor.start()

                # I know that EXPR and A or B is bad but I'm keeping it for Py2.4
                # compatibility.
                str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')

                msg = 'Listening on sockets: '
                msg += ', '.join(
                    ['%s:%i%s' % str_extract(l) for l in self.listeners])
                log.info(msg)

                for l in self.listeners:
                    l.start()

            finally:
                self.startstop_lock.release()

            if background:
                return

            while self._monitor.isAlive():
                try:
                    time.sleep(THREAD_STOP_CHECK_INTERVAL)
                except KeyboardInterrupt:
                    # Capture a keyboard interrupt when running from a console
                    break
                except:
                    if self._monitor.isAlive():
                        log.error(traceback.format_exc())
                        continue

            return self.stop()

        def stop(self, stoplogging=False):
            log.info('Stopping %s' % SERVER_SOFTWARE)

            self.startstop_lock.acquire()

            try:
                # Stop listeners
                for l in self.listeners:
                    l.ready = False

                # Encourage a context switch
                time.sleep(0.01)

                for l in self.listeners:
                    if l.isAlive():
                        l.join()

                # Stop Monitor
                self._monitor.stop()
                if self._monitor.isAlive():
                    self._monitor.join()

                # Stop Worker threads
                self._threadpool.stop()

                if stoplogging:
                    logging.shutdown()
                    msg = ("Calling logging.shutdown() is now the responsibility "
                           "of the application developer. Please update your "
                           "applications to no longer call rocket.stop(True)")
                    try:
                        import warnings
                        raise warnings.DeprecationWarning(msg)
                    except ImportError:
                        raise RuntimeError(msg)

            finally:
                self.startstop_lock.release()

        def restart(self):
            self.stop()
            self.start()
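The start method performs the following steps:
1. Acquire the start/stop lock.
2. Register the signal handlers for termination (SIGTERM) and restart (SIGUSR1), if handle_signals is set.
3. Start the worker thread pool.
4. Create the monitor thread, passing it the thread pool, and start it as a daemon thread.
5. Start all listener threads.
6. Release the start/stop lock.
7. Unless background is true, enter a while loop that runs as long as the monitor thread is alive, sleeping 1 s (THREAD_STOP_CHECK_INTERVAL) on each pass, and call stop() when the loop ends.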
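The shape of this start sequence (lock, start threads, release, then block on the monitor) can be tried out in isolation. The class below is a toy stand-in, not Rocket's implementation: `MiniServer`, its `events` list and the `Event`-based monitor are assumptions made for the example, the worker pool and listeners are reduced to placeholders, and signal registration (step 2) is left out.

```python
import threading
import time


class MiniServer:
    """Toy model of the start() sequence; every name here is hypothetical."""

    def __init__(self, check_interval=0.01):
        self.startstop_lock = threading.Lock()
        self.check_interval = check_interval
        self._stop_event = threading.Event()
        self.events = []  # records the order of steps for inspection

    def _monitor_loop(self):
        # Stand-in monitor: it simply waits until asked to stop.
        self._stop_event.wait()

    def start(self, background=False):
        # Steps 1 and 6: the lock is held during startup and always released.
        with self.startstop_lock:
            self.events.append("workers started")    # step 3 (placeholder)
            self._monitor = threading.Thread(
                target=self._monitor_loop, daemon=True)
            self._monitor.start()                     # step 4
            self.events.append("monitor started")
            self.events.append("listeners started")  # step 5 (placeholder)
        if background:
            return
        # Step 7: block the calling thread while the monitor is alive.
        while self._monitor.is_alive():
            time.sleep(self.check_interval)
        self.stop()

    def stop(self):
        with self.startstop_lock:
            self._stop_event.set()
            self._monitor.join()
            self.events.append("stopped")


if __name__ == "__main__":
    server = MiniServer()
    server.start(background=True)  # returns immediately
    server.stop()
    print(server.events)
```

Releasing the lock before the blocking loop is what allows stop() to be called from another thread or a signal handler while start() is still waiting.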
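The stop method:
1. Acquire the start/stop lock.
2. Set the ready flag of every listener thread to False.
3. Wait for all listener threads to finish.
4. Stop the monitor thread and wait for it to finish.
5. Stop the worker thread pool.
6. If stoplogging is true, call logging.shutdown(); this path is deprecated and raises an error afterwards.
7. Release the start/stop lock.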
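Steps 2 and 3 rely on cooperative shutdown: each listener polls a flag and exits on its own, after which the caller joins it. A minimal sketch of that idea follows; `FlagListener` and `stop_listeners` are hypothetical names, and a short sleep stands in for a socket accept() with a timeout.

```python
import threading
import time


class FlagListener(threading.Thread):
    """Hypothetical listener that loops while its `ready` flag is true."""

    def __init__(self):
        super().__init__()
        self.ready = True

    def run(self):
        while self.ready:
            time.sleep(0.005)  # stand-in for accept() with a timeout


def stop_listeners(listeners):
    # Ask every listener to stop before waiting on any of them.
    for listener in listeners:
        listener.ready = False
    time.sleep(0.01)  # encourage a context switch
    for listener in listeners:
        if listener.is_alive():
            listener.join()


if __name__ == "__main__":
    pool = [FlagListener() for _ in range(3)]
    for listener in pool:
        listener.start()
    stop_listeners(pool)
    print(any(listener.is_alive() for listener in pool))
```

Clearing all flags first and joining afterwards lets the listeners wind down in parallel instead of one after another.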
Time: 2024-10-24 23:15:01