python 使用multiprocessing需要注意的问题

我们在编写程序的时候经常喜欢这样写代码

import MySQLdb
import time
from multiprocessing import Process

conn = MySQLdb.connect(‘localhost‘, ‘vearne‘, ‘xx‘, ‘test‘)

def f(name):
    for i in xrange(10):
        cursor = conn.cursor()
        sql = "insert into car(name) values(%s)"
        param = [(name)]
        print param
        #time.sleep(1)
        n = cursor.execute(sql,param)
        cursor.close()
        conn.commit()

if __name__ == ‘__main__‘:
    for i in xrange(10):
        p = Process(target=f, args=(‘bob‘,))
        p.start()

上面的程序有问题吗?

以上的程序在单进程的情况下,应该是没有问题,但是在多进程的情况下,它是有错误的。

首先看看下面的源码

class Process(object):
    ‘‘‘
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    ‘‘‘
    _Popen = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        assert group is None, ‘group argument must be None for now‘
        count = _current_process._counter.next()
        self._identity = _current_process._identity + (count,)
        self._authkey = _current_process._authkey
        self._daemonic = _current_process._daemonic
        self._tempdir = _current_process._tempdir
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + ‘-‘ +                      ‘:‘.join(str(i) for i in self._identity)

    def run(self):
        ‘‘‘
        Method to be run in sub-process; can be overridden in sub-class
        ‘‘‘
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        ‘‘‘
        Start child process
        ‘‘‘
        assert self._popen is None, ‘cannot start a process twice‘
        assert self._parent_pid == os.getpid(),                ‘can only start a process object created by current process‘
        assert not _current_process._daemonic,                ‘daemonic processes are not allowed to have children‘
        _cleanup()
        if self._Popen is not None:
            Popen = self._Popen
        else:
            from .forking import Popen
        self._popen = Popen(self)   # -- 创建 Popen 对象 --
        _current_process._children.add(self)
        #  省略部分代码 ... ...
    def _bootstrap(self):    # -- _bootstrap 函数 --
        from . import util
        global _current_process

        try:
            self._children = set()
            self._counter = itertools.count(1)
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (OSError, ValueError):
                pass
            _current_process = self
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info(‘child process calling self.run()‘)
            try:
                self.run()  # -- 调用run函数 --
                exitcode = 0
            finally:
                util._exit_function()
        except SystemExit, e:
            if not e.args:
                exitcode = 1
            elif isinstance(e.args[0], int):
                exitcode = e.args[0]
            else:
                sys.stderr.write(str(e.args[0]) + ‘\n‘)
                sys.stderr.flush()
                exitcode = 0 if isinstance(e.args[0], str) else 1
        except:
            exitcode = 1
            import traceback
            sys.stderr.write(‘Process %s:\n‘ % self.name)
            sys.stderr.flush()
            traceback.print_exc()

        util.info(‘process exiting with exitcode %d‘ % exitcode)
        return exitcode

from .forking import Popen 定义

    class Popen(object):

        def __init__(self, process_obj):
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()     # -- fork子进程 --
            # fork 函数调用一次,会在返回两次,一次在父进程中返回,返回的pid 值大于0
            # 一次在子进程中返回,返回的pid值 等于 0
            if self.pid == 0:        # pid值 等于 0 说明 以下代码都是在子进程中执行的
                if ‘random‘ in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap() # -- 调用_bootstrap函数 --
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

从代码中我们可以看出,python 的multiprocessing 使用fork创建子进程,并在子进程中执行run函数

man fork 

可以得到如下信息

Fork() causes creation of a new process.  The new process (child process) is an exact copy of the calling process (parent process) except for the following:
           o   The child process has a unique process ID.
           o   The child process has a different parent process ID (i.e., the process ID of the parent process).
           o   The child process has its own copy of the parent‘s descriptors.  These descriptors reference the same underlying objects, so that, for instance, file pointers in file objects are shared between the child and the parent, so that an lseek(2) on a descriptor in the child process can affect a subsequent read or write by the parent.  This descriptor copying is also used by the shell to establish standard input and output for newly created processes as well as to set up pipes.
           o   The child processes resource utilizations are set to 0; see setrlimit(2).

fork 函数创建的子进程是父进程的完全拷贝,他们拥有相同的文件描述符。这样如果在父进程中创建了连接,就会出现父进程和多个子进程公用一个连接,会出现无法预料的错误。

(每个连接都有独立的读缓冲区和写缓冲区,多个进程的对读缓冲区和写缓冲区的操作会导致数据混乱)

所以我们应该在子进程中创建连接,这样就能够避免问题的发生。

import MySQLdb
import time
from multiprocessing import Process

class SLWorker(Process):
    def __init__(self):
        super(SLWorker, self).__init__()
        self.conn = None

    def run(self):
        # *** 注意这里 *** 连接延迟加载, 也就是说连接在子进程中被创建
        if self.conn ==  None:
            self.conn = MySQLdb.connect(‘localhost‘, ‘vearne‘, ‘xxx‘, ‘test‘)
        for i in xrange(10):
            cursor = self.conn.cursor()
            sql = "insert into car(name) values(%s)"
            name = "bob"
            param = [(name)]
            print param
            #time.sleep(30)
            n = cursor.execute(sql,param)
            cursor.close()
            self.conn.commit()
    def __del__(self):
        if self.conn != None:
            self.conn.close()

if __name__ == ‘__main__‘:
    ll = []
    for i in xrange(10):
        p = SLWorker()
        p.start()
        ll.append(p)
    for p in ll:
        p.join()

答案归结为只需要将在子进程中创建连接,或者连接延迟创建就能够解决这个问题

其实现在有很多连接池都是延迟创建连接,没有仔细看,有研究的分享下。

PS: celery 和rq 也都会有这样的问题,请大家引起足够重视

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-08 04:24:24

python 使用multiprocessing需要注意的问题的相关文章

python的multiprocessing模块进程创建、资源回收-Process,Pool

python的multiprocessing有两种创建进程的方式,每种创建方式和进程资源的回收都不太相同,下面分别针对Process,Pool及系统自带的fork三种进程分析. 1.方式一:fork() 举例: 1 import os 2 pid = os.fork() # 创建一个子进程 3 os.wait() # 等待子进程结束释放资源 4 pid为0的代表子进程. 缺点:1.兼容性差,只能在类linux系统下使用,windows系统不可使用:2.扩展性差,当需要多条进程的时候,进程管理变得

Python使用multiprocessing实现一个最简单的分布式作业调度系统

Python使用multiprocessing实现一个最简单的分布式作业调度系统 介绍 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信. 想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统. 实现 Job 首先创建一个Job类,为了测试简单,只包含一个job id属性 job.py #!/usr/bin/env python # -

python多进程-----multiprocessing包

multiprocessing并非是python的一个模块,而是python中多进程管理的一个包,在学习的时候可以与threading这个模块作类比,正如我们在上一篇转载的文章中所提,python的多线程并不能做到真正的并行处理,只能完成相对的并发处理,那么我们需要的就是python的多进程来完成并行处理,把所有的cpu资源都利用起来.multiprocessing的很大一部分与threading使用同一套API,只不过换到了多进程的环境.这里面要注意,对于多进程来说,win32平台和unix平

python使用multiprocessing进行多进程编程(1)

multiprocessing模块实现了对多进程编程的封装,让我们可以非常方便的使用多进程进行编程.它的使用方法非常类似threading模块. 1.创建一个进程 import multiprocessing def worker(): """worker function""" print 'Worker' return if __name__ == '__main__': jobs = [] for i in range(5): p = mu

python通过multiprocessing 实现带回调函数的异步调用的代码

下边代码段是关于python通过multiprocessing 实现带回调函数的异步调用的代码. from multiprocessing import Pool def f(x): if __name__ == '__main__': pool = Pool(processes=1) # Start a worker processes. result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchron

Shared variable in python's multiprocessing

Shared variable in python's multiprocessing https://www.programcreek.com/python/example/58176/multiprocessing.Value https://docs.python.org/zh-cn/3.7/library/multiprocessing.html#multiprocessing-programming 在 Unix 上,如果一个进程执行完成但是没有被 join,就会变成僵尸进程. htt

Python多进程multiprocessing(二)

紧接上文 在上文Python多进程multiprocessing(一)中我们介绍了多进程multiprocessing的部分基础操作,在本文中,我们将继续介绍关于多进程的一些知识,比如进程池Pool这个有用的东东.马上开始吧! 使用实例 实例1 import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool(processes=2) res = pool.map(job,range(10))

python的multiprocessing到底怎么用的问题

众所周知,由于python(Cpython)的全局锁(GIL)问题存在,导致Thread也就是线程的并行并不可实现. multiprocessing 模块采用多进程而不是多线程的方式实现并行,解决了GIL的问题,一定程度上使状况得到了缓解. 然而,Multiprocess本身依然有一些功能上的瓶颈.其中一个重要的是:进程之间不能共享内存(线程间则可以共享内存).这意味着在进程间交换数据的时候,需要把数据打包.传递,解包.在python的语境下就是: "pickle from main proce

Python多进程multiprocessing使用示例

mutilprocess简介 像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. import multiprocessing def worker(num): """thread worker function""" print 'Worker:', num return if __name__ == '__main__': jobs = [] for i