gunicorn Arbiter 源码解析

前文所述,Arbiter是gunicorn master进程的核心。Arbiter主要负责管理worker进程,包括启动、监控、杀掉Worker进程;同时,Arbiter在某些信号发生的时候还可以热更新(reload)App应用,或者在线升级gunicorn。Arbiter的核心代码在一个文件里面,代码量也不大,源码在此:https://github.com/benoitc/gunicorn

  

Arbiter主要有以下方法:

setup:

处理配置项,最重要的是worker数量和worker工作模型

init_signal

注册信号处理函数

handle_xxx:

各个信号具体的处理函数

kill_worker,kill_workers:

向worker进程发信号

spawn_worker, spawn_workers:

fork出新的worker进程

murder_workers:

杀掉一段时间内未响应的worker进程

manage_workers:

根据配置文件的worker数量,以及当前active的worker数量,决定是要fork还是kill worker进程

reexec

接收到信号SIGUSR2调用,在线升级gunicorn

reload:

接收到信号SIGHUP调用,会根据新的配置新启动worker进程,并杀掉之前的worker进程

sleep

在没有信号处理的时候,利用select的timeout进行sleep,可被唤醒

wakeup

通过向管道写消息,唤醒进程

run

主循环

  Arbiter真正被其他代码(Application)调用的函数只有__init__和run方法,在一句代码里:

Arbiter(self).run()

  上面代码中的self即为Application实例,其中__init__调用setup进行配置项设置。下面是run方法伪代码

def run()
    self.init_signal()
    self.LISTENERS = create_sockets(self.cfg, self.log)
    self.manage_workers()    while True:        if no signal in SIG_QUEUE
            self.sleep()        else:
            handle_signal()

 关于fork子进程

  fork子进程的代码在 spawn_worker, 源码如下:

  

 Arbiter.spawn_worker

  主要流程:

(1)加载worker_class并实例化(默认为同步模型 SyncWorker)

(2)父进程(master进程)fork之后return,之后的逻辑都在子进程中运行

(3)调用worker.init_process 进入循环,worker的所有工作都在这个循环中

(4)循环结束之后,调用sys.exit(0)

(5)最后,在finally中,记录worker进程的退出

下面是我自己写的一点代码,把主要的fork流程简化了一下

 1 # prefork.py 2 import sys 3 import socket 4 import select 5 import os 6 import time 7   8 def do_sub_process(): 9     pid = os.fork()10     if pid < 0:11         print ‘fork error‘12         sys.exit(-1)13     elif pid > 0:14         print ‘fork sub process %d‘  % pid15         return16  17     # must be child process18     time.sleep(1)19     print ‘sub process will exit‘, os.getpid(), os.getppid()20     sys.exit(0)21  22 def main():23     sub_num = 224     for i in range(sub_num):25         do_sub_process()26     time.sleep(10)27     print ‘main process will exit‘, os.getpid()28  29 if __name__ == ‘__main__‘:30     main()

在测试环境下输出:

  fork sub process 9601

  fork sub process 9602

  sub process will exit 9601 9600

  sub process will exit 9602 9600

  main process will exit 9600

  需要注意的是第20行调用了sys.exit, 保证子进程的结束,否则会继续main函数中for循环,以及之后的逻辑。注释掉第19行重新运行,看输出就明白了。

关于kill子进程

  master进程要kill worker进程就很简单了,直接发信号,源码如下:

  

 1     def kill_worker(self, pid, sig): 2         """\ 3         Kill a worker 4  5         :attr pid: int, worker pid 6         :attr sig: `signal.SIG*` value 7          """ 8         try: 9             os.kill(pid, sig)10         except OSError as e:11             if e.errno == errno.ESRCH:12                 try:13                     worker = self.WORKERS.pop(pid)14                     worker.tmp.close()15                     self.cfg.worker_exit(self, worker)16                     return17                 except (KeyError, OSError):18                     return19             raise

关于sleep与wakeup

  我们再来看看Arbiter的sleep和wakeup。Arbiter在没有信号需要处理的时候会"sleep",当然,不是真正调用time.sleep,否则信号来了也不能第一时间处理。这里得实现比较巧妙,利用了管道和select的timeout。看代码就知道了

        def sleep(self):        """        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.        """
            ready = select.select([self.PIPE[0]], [], [], 1.0) # self.PIPE = os.pipe()
            if not ready[0]: 
                return
            while os.read(self.PIPE[0], 1):                pass

  代码里面的注释写得非常清楚,要么PIPE可读立即返回,要么等待超时。管道可读是因为有信号发生。这里看看pipe函数

  •   os.pipe()
  • Create a pipe. Return a pair of file descriptors (r,w) usable for reading and writing, respectively.

  那我们看一下什么时候管道可读:肯定是往管道写入的东西,这就是wakeup函数的功能

        def wakeup(self):            """
            Wake up the arbiter by writing to the PIPE            """
            os.write(self.PIPE[1], b‘.‘)

最后附上Arbiter的信号处理

  • QUITINT: Quick shutdown
  • TERM: Graceful shutdown. Waits for workers to finish their current requests up to the graceful timeout.
  • HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the --preloadoption), Gunicorn will also load the new version.
  • TTIN: Increment the number of processes by one
  • TTOU: Decrement the number of processes by one
  • USR1: Reopen the log files
  • USR2: Upgrade the Gunicorn on the fly. A separate TERM signal should be used to kill the old process. This signal can also be used to use the new versions of pre-loaded applications.
  • WINCH: Gracefully shutdown the worker processes when Gunicorn is daemonized.
时间: 2024-11-10 02:46:56

gunicorn Arbiter 源码解析的相关文章

gunicorn syncworker 源码解析

gunicorn支持不同的worker类型,同步或者异步,异步的话包括基于gevent.基于eventlet.基于Aiohttp(python版本需要大于3.3),也有多线程的版本.下面是gunicorn当前版本(19.6.0)支持的Worker类型: sync eventlet - Requires eventlet >= 0.9.7 gevent - Requires gevent >= 0.13 tornado - Requires tornado >= 0.2 gthread -

ChrisRenke/DrawerArrowDrawable源码解析

转载请注明出处http://blog.csdn.net/crazy__chen/article/details/46334843 源码下载地址http://download.csdn.net/detail/kangaroo835127729/8765757 这次解析的控件DrawerArrowDrawable是一款侧拉抽屉效果的控件,在很多应用上我们都可以看到(例如知乎),控件的github地址为https://github.com/ChrisRenke/DrawerArrowDrawable

五.jQuery源码解析之jQuery.extend(),jQuery.fn.extend()

给jQuery做过扩展或者制作过jQuery插件的人这两个方法东西可能不陌生.jQuery.extend([deep],target,object1,,object2...[objectN]) jQuery.fn.extend([deep],target,object1,,object2...[objectN])这两个属性都是用于合并两个或多个对象的属性到target对象.deep是布尔值,表示是否进行深度合并,默认是false,不执行深度合并.通过这种方式可以在jQuery或jQuery.fn

eclipse中导入jdk源码、SpringMVC注解@RequestParam、SpringMVC文件上传源码解析、ajax上传excel文件

eclipse中导入jdk源码:http://blog.csdn.net/evolly/article/details/18403321, http://www.codingwhy.com/view/799.html. ------------------------------- SpringMVC注解@RequestParam:http://825635381.iteye.com/blog/2196911. --------------------------- SpringMVC文件上传源

String源码解析(一)

本篇文章内的方法介绍,在方法的上面的注释讲解的很清楚,这里只阐述一些要点. Java中的String类的定义如下: 1 public final class String 2 implements java.io.Serializable, Comparable<String>, CharSequence { ...} 可以看到,String是final的,而且继承了Serializable.Comparable和CharSequence接口. 正是因为这个特性,字符串对象可以被共享,例如下面

Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent 1.1 LifecycleAware @[email protected] interface LifecycleAware {  public void s

Spring源码解析-applicationContext

Demo uml类图 ApplicationContext ApplicationListener 源码解析 主流程 obtainFreshBeanFactory prepareBeanFactory invokeBeanFactoryPostProcessors registerBeanPostProcessors registerListeners finishRefresh 总结 在已经有BeanFactory可以完成Ioc功能情况下,spring又提供了ApplicationContex

socketserver源码解析和协程版socketserver

来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力 client import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print('receive:',data.decode()) inp = input('please input:') sk

Handler机制(四)---Handler源码解析

Handler的主要用途有两个:(1).在将来的某个时刻执行消息或一个runnable,(2)把消息发送到消息队列. 主要依靠post(Runnable).postAtTime(Runnable, long).postDelayed(Runnable, long).sendEmptyMessage(int).sendMessage(Message).sendMessageAtTime(Message).sendMessageDelayed(Message, long)这些方法来来完成消息调度.p