通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait

http://blog.chinaunix.net/uid-23504396-id-4661783.html

执行subprocess的时候,执行不是问题
最麻烦的是获取进程执行后的回显来确认是否正确执行,还不能阻塞
还要获取进程执行后的返回状态确认进程是否正确结束,也不能阻塞

分开解决这个问题
我们先解决第一个问题,获取回显

一般获取回显,代码都是如下写法

点击(此处)折叠或打开

  1. sub_process = subprocess.Popen(command, stdin = subprocess.PIPE,stdout = subprocess.PIPE,stderr = subprocess.PIPE, shell = True)

为了搞清楚subprocess是怎么获取子进程stdout的,我们首先看看 subprocess.PIPE是什么
进入代码里可以看见subprocess.PIPE 直接是个int -1
再看看网上一般获取subprocess回显的代码

点击(此处)折叠或打开

  1. lines = sub_process.stdout.readline()

subprocess.PIPE是-1,为什么Popen这个类的stdout变成了什么对象,可以用readline方法呢
打印type可以知道Popen对象的stdout的类型是file,我们看看subprocess里做了什么操作。
我们看看Popen的init方法(python 2.7.8)

stdout传入_get_handles函数准换出(p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite)

点击(此处)折叠或打开

  1. (p2cread, p2cwrite,
  2. c2pread, c2pwrite,
  3. errread, errwrite) = self._get_handles(stdin, stdout, stderr)

p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite  传入_execute_child中,这个函数看名字就知道是真正的执行函数

点击(此处)折叠或打开

  1. self._execute_child(args, executable, preexec_fn, close_fds,
  2. cwd, env, universal_newlines,
  3. startupinfo, creationflags, shell,
  4. p2cread, p2cwrite,
  5. c2pread, c2pwrite,
  6. errread, errwrite)

p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite传入执行函数后,stdout等通过fdopen函数转换问file对象

点击(此处)折叠或打开

  1. if p2cwrite is not None:
  2. self.stdin = os.fdopen(p2cwrite, ‘wb‘, bufsize)
  3. if c2pread is not None:
  4. if universal_newlines:
  5. self.stdout = os.fdopen(c2pread, ‘rU‘, bufsize)
  6. else:
  7. self.stdout = os.fdopen(c2pread, ‘rb‘, bufsize)
  8. if errread is not None:
  9. if universal_newlines:
  10. self.stderr = os.fdopen(errread, ‘rU‘, bufsize)
  11. else:
  12. self.stderr = os.fdopen(errread, ‘rb‘, bufsize)

我们先看看_get_handles方法,部分代码如下

点击(此处)折叠或打开

  1. def _get_handles(self, stdin, stdout, stderr):
  2. """Construct and return tuple with IO objects:
  3. p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
  4. """
  5. p2cread, p2cwrite = None, None
  6. c2pread, c2pwrite = None, None
  7. errread, errwrite = None, None
  8. if stdin is None:
  9. pass
  10. elif stdin == PIPE:
  11. p2cread, p2cwrite = self.pipe_cloexec()
  12. elif isinstance(stdin, int):
  13. p2cread = stdin
  14. else:
  15. # Assuming file-like object
  16. p2cread = stdin.fileno()

再跟踪进去看pipe_cloexec

点击(此处)折叠或打开

  1. def pipe_cloexec(self):
  2. """Create a pipe with FDs set CLOEXEC."""
  3. # Pipes‘ FDs are set CLOEXEC by default because we don‘t want them
  4. # to be inherited by other subprocesses: the CLOEXEC flag is removed
  5. # from the child is FDs by _dup2(), between fork() and exec().
  6. # This is not atomic: we would need the pipe2() syscall for that.
  7. r, w = os.pipe()
  8. self._set_cloexec_flag(r)
  9. self._set_cloexec_flag(w)
  10. return r, w

可以知道,当stdout赋值为subprocess.PIPE(即-1)时,subprocess内部通过os.pipe()创建一个管道,并返回管道的读,写文件描述符

点击(此处)折叠或打开

  1. os.pipe()
  2. Create a pipe. Return a pair of file descriptors (r, w) usable for reading and writing, respectively.

_set_cloexec_flag函数暂时不用详细看了,只是通过fcntl设置下文件做控制。

所以从这里我可以看出stdout等传入subprocess.PIPE后,这个值只是作为一个判断值,判断为此值以后,内部通过os.piep()用作输入输出传送。
由于subprocess内部创建的pipe()大小不可控,所以推举做法是使用StringIO创建一个内存文件对象,并传入这个对象的fileno,参考文章
http://backend.blog.163.com/blog/static/2022941262014016710912/

现在就剩下单问题就是,这个管道如何获得子进程的输入输出的呢,这就要看_execute_child里是怎么做的了
具体说明我直接在下面源代码里注释说明,最后再做总结

点击(此处)折叠或打开

  1. def _execute_child(self, args, executable, preexec_fn, close_fds,
  2. cwd, env, universal_newlines,
  3. startupinfo, creationflags, shell,
  4. p2cread, p2cwrite,
  5. c2pread, c2pwrite,
  6. errread, errwrite):
  7. """Execute program (POSIX version)"""
  8. if isinstance(args, types.StringTypes):
  9. args = [args]
  10. else:
  11. args = list(args)
  12. if shell:
  13. args = ["/bin/sh", "-c"] + args
  14. if executable:
  15. args[0] = executable
  16. if executable is None:
  17. executable = args[0]
  18. #这里又创建了一个管道,这个管道只用来获取自进程try后except出来的内容,不是获取stderr
  19. errpipe_read, errpipe_write = self.pipe_cloexec()
  20. try:
  21. try:
  22. gc_was_enabled = gc.isenabled()
  23. #这里关闭了gc回收,防止对象被回收,这里值得学习。
  24. gc.disable()
  25. try:
  26. self.pid = os.fork()
  27. except:
  28. if gc_was_enabled:
  29. gc.enable()
  30. raise
  31. self._child_created = True
  32. if self.pid == 0:
  33. #如果pid为0,表示自己是子进程,执行下面代码(父进程获取到的是子进程的PID,不执行此代码)
  34. #父子进程pipe()通信原理——利用pipe()建立起来的无名文件(无路径名)。只用该系统调用所返回的文件描述符来标识该文件.
  35. #只有调用pipe()的进程及其子孙进程才能识别此文件描述符,才能利用该文件(管道)进行通信。当这些进程不再使用此管道时,核心收回其索引结点。
  36. #如果Pope对象初始化的时候,stdin stdout stderr都用subprocess.PIPE的话,那么fork前会创建3个管道,并传入对应的文件描述符进来
  37. try:
  38. #关闭从父进程复制过来的的不需要的管道的一端
  39. if p2cwrite is not None:
  40. os.close(p2cwrite)
  41. if c2pread is not None:
  42. os.close(c2pread)
  43. if errread is not None:
  44. os.close(errread)
  45. os.close(errpipe_read)
  46. #下面都是做了一些文件描述符复制操作,反正通过下面的代码将子进程的输出传到父进程
  47. #那些描述符复制操作基本就相当于把子进程的stdout、stdin、stderr的fd绑定的父进程传过来的文件描述符上
  48. # When duping fds, if there arises a situation
  49. # where one of the fds is either 0, 1 or 2, it
  50. # is possible that it is overwritten (#12607).
  51. if c2pwrite == 0:
  52. c2pwrite = os.dup(c2pwrite)
  53. if errwrite == 0 or errwrite == 1:
  54. errwrite = os.dup(errwrite)
  55. # Dup fds for child
  56. def _dup2(a, b):
  57. # dup2() removes the CLOEXEC flag but
  58. # we must do it ourselves if dup2()
  59. # would be a no-op (issue #10806).
  60. if a == b:
  61. self._set_cloexec_flag(a, False)
  62. elif a is not None:
  63. os.dup2(a, b)
  64. _dup2(p2cread, 0)
  65. _dup2(c2pwrite, 1)
  66. _dup2(errwrite, 2)
  67. #2.7才有的写法,2.6这样写报错,2.7大概这样写比list里找快一点,所以用了dict
  68. #如果管道文件描述符大于2的话,关闭从主进程赋值过来的管道的一端,
  69. closed = { None }
  70. for fd in [p2cread, c2pwrite, errwrite]:
  71. if fd not in closed and fd > 2:
  72. os.close(fd)
  73. closed.add(fd)
  74. #这里控制关闭前面用来保存except输出的管道
  75. if close_fds:
  76. self._close_fds(but=errpipe_write)
  77. #切换下执行目录防止运行出错,这里也值得学习!
  78. if cwd is not None:
  79. os.chdir(cwd)
  80. if preexec_fn:
  81. preexec_fn()
  82. #可以看到,最终是通过execvp/execvpe来执行系统命令的
  83. if env is None:
  84. os.execvp(executable, args)
  85. else:
  86. os.execvpe(executable, args, env)
  87. except:
  88. exc_type, exc_value, tb = sys.exc_info()
  89. # Save the traceback and attach it to the exception object
  90. exc_lines = traceback.format_exception(exc_type,
  91. exc_value,
  92. tb)
  93. exc_value.child_traceback = ‘‘.join(exc_lines)
  94. #子进程将错误信息写入接受except的管道的写端
  95. os.write(errpipe_write, pickle.dumps(exc_value))
  96. #这里退出子进程
  97. os._exit(255)
  98. #父进程启动自进程后,重新打开gc回收
  99. if gc_was_enabled:
  100. gc.enable()
  101. finally:
  102. #父关闭保存子进程except输出的管道的写端
  103. os.close(errpipe_write)
  104. #父进程也关闭不需要使用的管道的一端
  105. if p2cread is not None and p2cwrite is not None:
  106. os.close(p2cread)
  107. if c2pwrite is not None and c2pread is not None:
  108. os.close(c2pwrite)
  109. if errwrite is not None and errread is not None:
  110. os.close(errwrite)
  111. #通过获取except输出的管道的读端获取最大1M的数据
  112. data = _eintr_retry_call(os.read, errpipe_read, 1048576)
  113. finally:
  114. #父关闭保存子进程except输出的管道的读端
  115. os.close(errpipe_read)
  116. #如果有子进程except输出,抛出自定义错误,init函数那边会try到并做相应处理
  117. if data != "":
  118. try:
  119. _eintr_retry_call(os.waitpid, self.pid, 0)
  120. except OSError as e:
  121. if e.errno != errno.ECHILD:
  122. raise
  123. child_exception = pickle.loads(data)
  124. raise child_exception

下面我们总结下,创建Popen对象时,我们传入subprocess.PIPE。
内部通过os.pipe()创建1-3个管道
生成的子进程复制了这些管道的文件描述符,子进程内部将自己的输出绑定到这写管道上
父进程通过os.fdopen将管道的文件描述符打开为file对象
并赋值给self.stdin  self.stdout stderr

因为是file对象,我们就可以直接通过read、readline、readlines等方法获取回显的字符串了
但是由于file对象的read、readline、readlines方法都是阻塞的,那么我们可以这样。
新建立一个线程去读取,并把读出来的内容塞入一个列表,每次我们主进程都去读取这个列表的最后一列
线程中读取后写入列表的延迟 需要大于主进程读取列表最后一列的延迟,以免判断内容还没被主进程读取已经进入下一列

读取子进程回显函数

点击(此处)折叠或打开

  1. def stdout_theard(end_mark,cur_stdout,stdout_lock,string_list):
  2. #用户获取subprocess的stdout输出的线程,防止阻塞
  3. #cur_stdout是一个file对象,end_mark是个随机字符串,获取到这个字符串表明结束
  4. #先暂停0.01秒
  5. time.sleep(0.01)
  6. for i in range(3000):
  7. try:
  8. out_put = cur_stdout.readline()
  9. if not out_put:
  10. #添加结束标记
  11. stdout_lock.acquire()
  12. string_list.append(end_mark)
  13. stdout_lock.release()
  14. break
  15. if out_put == end_mark:
  16. #out put正好和end_mark相等的特殊情况
  17. continue
  18. #外部获取到指定内容会清理string_list列表,所以要加锁
  19. stdout_lock.acquire()
  20. string_list.append(out_put.rstrip().lstrip())
  21. stdout_lock.release()
  22. time.sleep(0.03)
  23. except:
  24. print ‘wtffff!!!!!!tuichule !!‘
  25. break

主进程中启动线程

点击(此处)折叠或打开

  1. stdout_list = []
  2. stdout_lock = threading.Lock()
  3. end_mark = ‘end9c2nfxz‘
  4. cur_stdout_thread = threading.Thread(target=stdout_theard, args=(end_mark,sub_process.stdout,stdout_lock,stdout_list))
  5. cur_stdout_thread.setDaemon(‘True‘)
  6. cur_stdout_thread.start()

主进程中判断子进程回显内容是否正确
我的例子是的作用是  erl进程里输入command_reload_list里的所有命令,并判断并记录每个命令执行后是否有ok_str返回

点击(此处)折叠或打开

  1. for command_reload_dict in command_reload_list:
  2. sub_process.stdin.write(command_reload_dict[‘com‘] + ‘\r\n‘)
  3. #每个命令执行后通过线程修改的str list的最后一个元素来获取取回显的最后一行
  4. #得到返回值等于ok_str的为正确,延迟0.2后退出并清理回显,否则总共等待300*0.01秒
  5. ok_str = ‘load module %s true‘ % command_reload_dict[‘mod‘]
  6. for i in xrange(300):
  7. if len(stdout_list)>0:
  8. #获得正确的返回,退出
  9. if stdout_list[-1] == ok_str:
  10. #记录当前模块热更成功
  11. command_reload_dict[‘res‘] = ‘ok‘
  12. break
  13. if stdout_list[-1] == end_mark:
  14. #遇到end_mark 说明读线程已经结束,说明有错,直接退出
  15. return_value[‘msg‘] += ‘reload mod process has been exit in [%s]‘ % command_reload_dict[‘mod‘]
  16. return return_value
  17. break
  18. time.sleep(0.01)
  19. #清除上个reload命令产生的回显
  20. stdout_lock.acquire()
  21. del stdout_list[:]
  22. stdout_lock.release()
  23. #子进程输入退出命令
  24. sub_process.stdin.write(‘q().\r\n‘)
  25. #等待tmp erl 进程退出
  26. for i in xrange(300):
  27. if len(stdout_list)>0:
  28. if stdout_list[-1] == end_mark:
  29. break
  30. time.sleep(0.01)

=======================================第二个问题的分割线=========================================
进程执行后的返回状态确认进程是否正确结束,不能阻塞
之前我有接触过这个问题的,当时还没细看subprocess源码
http://blog.chinaunix.net/uid-23504396-id-4471612.html

我现在的写法

点击(此处)折叠或打开

  1. if stop_process.poll() is None:
  2. try:
  3. if stop_process.stdout:
  4. stop_process.stdout.close()
  5. if stop_process.stderr:
  6. stop_process.stderr.close()
  7. stop_process.terminate()
  8. time.sleep(0.5)
  9. if stop_process.poll() is None:
  10. stop_process.kill()
  11. time.sleep(0.2)
  12. if stop_process.poll() is None:
  13. print ‘wtf!!!!‘
  14. else:
  15. stop_process.wait()
  16. else:
  17. stop_process.wait()
  18. except:
  19. print ‘wtf?‘

上面代码我一直有个疑问,poll()之后如果有问题进程还没结束怎么办?
因为sub_process.wait()是阻塞的,所以我在poll以后直接sub_process.wait()是不是也会被卡住?
subprocess的wati到底调用了什么?

当然我也可以像获取回显那样,启一个线程,主进程通过一个可以指定次数的循环来获取wait返回。
不过这样做太绕了,所以我们直接进代码看,把wait彻底搞明白

点击(此处)折叠或打开

  1. def poll(self):
  2. return self._internal_poll()

点击(此处)折叠或打开

  1. def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
  2. _WNOHANG=os.WNOHANG, _os_error=os.error, _ECHILD=errno.ECHILD):
  3. """Check if child process has terminated. Returns returncode
  4. attribute.
  5. This method is called by __del__, so it cannot reference anything
  6. outside of the local scope (nor can any methods it calls).
  7. """
  8. if self.returncode is None:
  9. try:
  10. pid, sts = _waitpid(self.pid, _WNOHANG)
  11. if pid == self.pid:
  12. self._handle_exitstatus(sts)
  13. except _os_error as e:
  14. if _deadstate is not None:
  15. self.returncode = _deadstate
  16. if e.errno == _ECHILD:
  17. # This happens if SIGCLD is set to be ignored or
  18. # waiting for child processes has otherwise been
  19. # disabled for our process. This child is dead, we
  20. # can not get the status.
  21. # http://bugs.python.org/issue15756
  22. self.returncode = 0
  23. return self.returncode

再看看wait的代码

点击(此处)折叠或打开

  1. def wait(self):
  2. """Wait for child process to terminate. Returns returncode
  3. attribute."""
  4. while self.returncode is None:
  5. try:
  6. pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0)
  7. except OSError as e:
  8. if e.errno != errno.ECHILD:
  9. raise
  10. # This happens if SIGCLD is set to be ignored or waiting
  11. # for child processes has otherwise been disabled for our
  12. # process. This child is dead, we can not get the status.
  13. pid = self.pid
  14. sts = 0
  15. # Check the pid and loop as waitpid has been known to return
  16. # 0 even without WNOHANG in odd situations. issue14396.
  17. if pid == self.pid:
  18. self._handle_exitstatus(sts)
  19. return self.returncode

看到这里就明白了,poll和wait最终调用的是os.waitpid,但是poll是非阻塞的wait是阻塞的.....
我们看看python的文档

点击(此处)折叠或打开

  1. os.waitpid(pid, options)
  2. The details of this function differ on Unix and Windows.
  3. On Unix: Wait for completion of a child process given by process id pid, and return a tuple containing its process id and exit status indication (encoded as for wait()). The semantics of the call are affected by the value of the integer options, which should be 0 for normal operation.
  4. os.WNOHANG
  5. The option for waitpid() to return immediately if no child process status is available immediately. The function returns (0, 0) in this case.

所以,发送kill信号后,pool()后就不需要wait了

原文地址:https://www.cnblogs.com/linkenpark/p/8979161.html

时间: 2024-10-11 20:22:47

通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait的相关文章

《python解释器源码剖析》第17章--python的内存管理与垃圾回收

17.0 序 内存管理,对于python这样的动态语言是至关重要的一部分,它在很大程度上决定了python的执行效率,因为在python的运行中会创建和销毁大量的对象,这些都设计内存的管理.同理python还提供了了内存的垃圾回收(GC,garbage collection),将开发者从繁琐的手动维护内存的工作中解放出来.这一章我们就来分析python的GC是如何实现的. 17.1 内存管理架构 在python中内存管理机制是分层次的,我们可以看成有四层,0 1 2 3.在最底层,也就是第0层是

[Android阅读代码]android-async-http源码学习一

android-async-http 下载地址 一个比较常用的Http请求库,基于org.apache.http对http操作进行封装. 特点: 1.每一个HTTP请求发生在UI线程之外,Client通过回调处理HTTP请求的结果,使得Client代码逻辑清晰 2.每一个请求使用线程池管理执行 3.支持gzip , cookie等功能 4.支持自动重试连接功能 [Android阅读代码]android-async-http源码学习一,布布扣,bubuko.com

一个python游戏源码

#finalPyPong.py import pygame,sys class MyBallClass(pygame.sprite.Sprite): def __init__(self,image_file,speed,location=[0,0]): pygame.sprite.Sprite.__init__(self) self.image = pygame.image.load(image_file) self.rect = self.image.get_rect() self.rect.

【转】linux环境下python的源码安装

[转载: http://www.cnblogs.com/yuechaotian/archive/2013/06/03/3115482.html] [问题] 在编译Sequoiadb的Python驱动源码的过程中,出现:Python.h不存在的问题.经求证,Python.h文件只在python的源码安装中存在,而在一般的二进制安装中则不存在. 1. 下载python2.7.5,保存到 /data/qtongmon/software wget https://www.python.org/ftp/p

daily news新闻阅读客户端应用源码(兼容iPhone和iPad)

daily news新闻阅读客户端应用源码(兼容iPhone和iPad),也是一款兼容性较好的应用,可以支iphone和ipad的阅读阅读器源码,设计风格和排列效果很不错,现在做新闻资讯客户端的朋友可以参考一下吧. 源码下载: http://code.662p.com/view/6384.html 详细说明:http://ios.662p.com/thread-1526-1-1.html

读取本地HTML的小说阅读器应用源码项目

该源码是一个不错的读取本地HTML的小说阅读器,读取本地HTML的小说阅读器,并且源码也比较简单的,非常适合我们的新手朋友拿来学习,有兴趣的朋友研究下. 源码下载: http://code.662p.com/view/10134.html 详细说明:http://android.662p.com/thread-6191-1-1.html

如何阅读Android系统源码-收藏必备

对于任何一个对Android开发感兴趣的人而言,对于android系统的学习必不可少.而学习系统最佳的方法就如linus所言:"RTFSC"(Read The Fucking Source Code).下面从知乎整理了一些优质回答,以飨读者. 巨人的肩膀 AOSP项目官方: https://source.android.com/source/index.html这个一定要先读. 项目介绍, 代码下载, 环境搭建, 刷机方法, Eclipse配置都在这里. 这是一切的基础. Androi

android五子棋游戏、资讯阅读、大学课程表、地图拖拽检测、小说搜索阅读app等源码

Android精选源码 Android 自动生成添加控件 android旋转动画.圆形进度条组合效果源码 一款很强的手机五子棋app源码 android地图拖拽区域检测效果源码 实现Android大学课表效果APP源码 android完全免费的小说搜索阅读app 一个互联网资讯阅读平台和良好的阅读体验的App Android优质博客 Android中高效的显示图片Bitmap的内存模型 相对于文字来说,图片的表达更直接.更有冲击力.更容易吸引用户的眼球.设计师们也理所当然的喜欢用图片来传达信息.

python slots源码分析

上次总结Python3的字典实现后的某一天,突然开窍Python的__slots__的实现应该也是类似,于是翻了翻CPython的源码,果然如此! 关于在自定义类里面添加__slots__的效果,网上已经有很多资料了,其中优点大致有: (1)更省内存. (2)访问属性更高效. 而本文讲的是,为什么更省内存?为什么更高效?当然为了弄明白这些,深入到CPython的源码是必不可少的.不过,心里有个猜想之后再去看源码效果或许更好,这样目的性更强,清楚自己需要关注的是什么以免在其中迷失! 我先稍微解释一