如前文所述,Arbiter是gunicorn master进程的核心。Arbiter主要负责管理worker进程,包括启动、监控、杀掉Worker进程;同时,Arbiter在某些信号发生的时候还可以热更新(reload)App应用,或者在线升级gunicorn。Arbiter的核心代码在一个文件里面,代码量也不大,源码在此:https://github.com/benoitc/gunicorn。
Arbiter主要有以下方法:
setup:
处理配置项,最重要的是worker数量和worker工作模型
init_signal:
注册信号处理函数
handle_xxx:
各个信号具体的处理函数
kill_worker,kill_workers:
向worker进程发信号
spawn_worker, spawn_workers:
fork出新的worker进程
murder_workers:
杀掉一段时间内未响应的worker进程
manage_workers:
根据配置文件的worker数量,以及当前active的worker数量,决定是要fork还是kill worker进程
reexec:
接收到信号SIGUSR2调用,在线升级gunicorn
reload:
接收到信号SIGHUP调用,会根据新的配置新启动worker进程,并杀掉之前的worker进程
sleep:
在没有信号处理的时候,利用select的timeout进行sleep,可被唤醒
wakeup:
通过向管道写消息,唤醒进程
run:
主循环
Arbiter真正被其他代码(Application)调用的函数只有__init__和run方法,在一句代码里:
Arbiter(self).run()
上面代码中的self即为Application实例,其中__init__调用setup进行配置项设置。下面是run方法伪代码
def run() self.init_signal() self.LISTENERS = create_sockets(self.cfg, self.log) self.manage_workers() while True: if no signal in SIG_QUEUE self.sleep() else: handle_signal()
关于fork子进程
fork子进程的代码在 spawn_worker, 源码如下:
1 def spawn_worker(self): 2 self.worker_age += 1 3 worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS, 4 self.app, self.timeout / 2.0, 5 self.cfg, self.log) 6 self.cfg.pre_fork(self, worker) 7 pid = os.fork() 8 if pid != 0: 9 self.WORKERS[pid] = worker 10 return pid 11 12 # Process Child 13 worker_pid = os.getpid() 14 try: 15 util._setproctitle("worker [%s]" % self.proc_name) 16 self.log.info("Booting worker with pid: %s", worker_pid) 17 self.cfg.post_fork(self, worker) 18 worker.init_process() 19 sys.exit(0) 20 except SystemExit: 21 raise 22 except AppImportError as e: 23 self.log.debug("Exception while loading the application", 24 exc_info=True) 25 print("%s" % e, file=sys.stderr) 26 sys.stderr.flush() 27 sys.exit(self.APP_LOAD_ERROR) 28 except: 29 self.log.exception("Exception in worker process"), 30 if not worker.booted: 31 sys.exit(self.WORKER_BOOT_ERROR) 32 sys.exit(-1) 33 finally: 34 self.log.info("Worker exiting (pid: %s)", worker_pid) 35 try: 36 worker.tmp.close() 37 self.cfg.worker_exit(self, worker) 38 except: 39 self.log.warning("Exception during worker exit:\n%s", 40 traceback.format_exc())
Arbiter.spawn_worker
主要流程:
(1)加载worker_class并实例化(默认为同步模型 SyncWorker)
(2)父进程(master进程)fork之后return,之后的逻辑都在子进程中运行
(3)调用worker.init_process 进入循环,worker的所有工作都在这个循环中
(4)循环结束之后,调用sys.exit(0)
(5)最后,在finally中,记录worker进程的退出
下面是我自己写的一点代码,把主要的fork流程简化了一下
1 # prefork.py 2 import sys 3 import socket 4 import select 5 import os 6 import time 7 8 def do_sub_process(): 9 pid = os.fork() 10 if pid < 0: 11 print ‘fork error‘ 12 sys.exit(-1) 13 elif pid > 0: 14 print ‘fork sub process %d‘ % pid 15 return 16 17 # must be child process 18 time.sleep(1) 19 print ‘sub process will exit‘, os.getpid(), os.getppid() 20 sys.exit(0) 21 22 def main(): 23 sub_num = 2 24 for i in range(sub_num): 25 do_sub_process() 26 time.sleep(10) 27 print ‘main process will exit‘, os.getpid() 28 29 if __name__ == ‘__main__‘: 30 main()
在测试环境下输出:
fork sub process 9601
fork sub process 9602
sub process will exit 9601 9600
sub process will exit 9602 9600
main process will exit 9600
需要注意的是第20行调用了sys.exit, 保证子进程的结束,否则会继续main函数中for循环,以及之后的逻辑。注释掉第19行重新运行,看输出就明白了。
关于kill子进程
master进程要kill worker进程就很简单了,直接发信号,源码如下:
1 def kill_worker(self, pid, sig): 2 """ 3 Kill a worker 4 5 :attr pid: int, worker pid 6 :attr sig: `signal.SIG*` value 7 """ 8 try: 9 os.kill(pid, sig) 10 except OSError as e: 11 if e.errno == errno.ESRCH: 12 try: 13 worker = self.WORKERS.pop(pid) 14 worker.tmp.close() 15 self.cfg.worker_exit(self, worker) 16 return 17 except (KeyError, OSError): 18 return 19 raise
关于sleep与wakeup
我们再来看看Arbiter的sleep和wakeup。Arbiter在没有信号需要处理的时候会"sleep",当然,不是真正调用time.sleep,否则信号来了也不能第一时间处理。这里得实现比较巧妙,利用了管道和select的timeout。看代码就知道了
def sleep(self): """ Sleep until PIPE is readable or we timeout. A readable PIPE means a signal occurred. """ ready = select.select([self.PIPE[0]], [], [], 1.0) # self.PIPE = os.pipe() if not ready[0]: return while os.read(self.PIPE[0], 1): pass
代码里面的注释写得非常清楚,要么PIPE可读立即返回,要么等待超时。管道可读是因为有信号发生。这里看看pipe函数
os.
pipe
()-
Create a pipe. Return a pair of file descriptors(r,w)
usable for reading and writing, respectively.
那我们看一下什么时候管道可读:肯定是往管道写入的东西,这就是wakeup函数的功能
def wakeup(self): """ Wake up the arbiter by writing to the PIPE """ os.write(self.PIPE[1], b‘.‘)
最后附上Arbiter的信号处理:
- QUIT, INT: Quick shutdown
- TERM: Graceful shutdown. Waits for workers to finish their current requests up to the graceful timeout.
- HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the
--preload
option), Gunicorn will also load the new version. - TTIN: Increment the number of processes by one
- TTOU: Decrement the number of processes by one
- USR1: Reopen the log files
- USR2: Upgrade the Gunicorn on the fly. A separate TERM signal should be used to kill the old process. This signal can also be used to use the new versions of pre-loaded applications.
- WINCH: Gracefully shutdown the worker processes when Gunicorn is daemonized.
reference:
http://docs.gunicorn.org/en/stable/
http://docs.gunicorn.org/en/stable/signals.html