Python中的并发编程

简介

  我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程。在任何给定的时刻,一个程序只做一件事情。

  一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen())。然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及主线程。因为进程之间是相互独立的,因此它们同原有的进程并发执行。这是指原进程可以在创建子进程后去执行其它工作。

  虽然进程之间是相互独立的,但是它们能够通过名为进程间通信(IPC)的机制进行相互通信。一个典型的模式是基于消息传递,可以将其简单地理解为一个纯字节的缓冲区,而send()或recv()操作原语可以通过诸如管道(pipe)或是网络套接字(network socket)等I/O通道传输或接收消息。还有一些IPC模式可以通过内存映射(memory-mapped)机制完成(例如mmap模块),通过内存映射,进程可以在内存中创建共享区域,而对这些区域的修改对所有的进程可见。

  多进程能够被用于需要同时执行多个任务的场景,由不同的进程负责任务的不同部分。然而,另一种将工作细分到任务的方法是使用线程。同进程类似,线程也有其自己的控制流以及执行栈,但线程在创建它的进程之内运行,分享其父进程的所有数据和系统资源。当应用需要完成并发任务的时候线程是很有用的,但是潜在的问题是任务间必须分享大量的系统状态。

  当使用多进程或多线程时,操作系统负责调度。这是通过给每个进程(或线程)一个很小的时间片并且在所有活动任务之间快速循环切换来实现的,这个过程将CPU时间分割为小片段分给各个任务。例如,如果你的系统中有10个活跃的进程正在执行,操作系统将会适当的将十分之一的CPU时间分配给每个进程并且循环地在十个进程之间切换。当系统不止有一个CPU核时,操作系统能够将进程调度到不同的CPU核上,保持系统负载平均以实现并行执行。

  利用并发执行机制写的程序需要考虑一些复杂的问题。复杂性的主要来源是关于同步和共享数据的问题。通常情况下,多个任务同时试图更新同一个数据结构会造成脏数据和程序状态不一致的问题(正式的说法是资源竞争的问题)。为了解决这个问题,需要使用互斥锁或是其他相似的同步原语来标识并保护程序中的关键部分。举个例子,如果多个不同的线程正在试图同时向同一个文件写入数据,那么你需要一个互斥锁使这些写操作依次执行,当一个线程在写入时,其他线程必须等待直到当前线程释放这个资源。

 Python中的并发编程

  Python长久以来一直支持不同方式的并发编程,包括线程、子进程以及其他利用生成器(generator function)的并发实现。

  Python在大部分系统上同时支持消息传递和基于线程的并发编程机制。虽然大部分程序员对线程接口更为熟悉,但是Python的线程机制却有着诸多的限制。Python使用了内部全局解释器锁(GIL)来保证线程安全,GIL同时只允许一个线程执行。这使得Python程序就算在多核系统上也只能在单个处理器上运行。Python界关于GIL的争论尽管很多,但在可预见的未来却没有将其移除的可能。

  Python提供了一些很精巧的工具用于管理基于线程和进程的并发操作。即使是简单地程序也能够使用这些工具使得任务并发进行从而加快运行速度。subprocess模块为子进程的创建和通信提供了API。这特别适合运行与文本相关的程序,因为这些API支持通过新进程的标准输入输出通道传送数据。signal模块将UNIX系统的信号量机制暴露给用户,用以在进程之间传递事件信息。信号是异步处理的,通常有信号到来时会中断程序当前的工作。信号机制能够实现粗粒度的消息传递系统,但是有其他更可靠的进程内通讯技术能够传递更复杂的消息。threading模块为并发操作提供了一系列高级的,面向对象的API。Thread对象们在一个进程内并发地运行,分享内存资源。使用线程能够更好地扩展I/O密集型的任务。multiprocessing模块同threading模块类似,不过它提供了对于进程的操作。每个进程类是真实的操作系统进程,并且没有共享内存资源,但multiprocessing模块提供了进程间共享数据以及传递消息的机制。通常情况下,将基于线程的程序改为基于进程的很简单,只需要修改一些import声明即可。

 Threading模块示例

  以threading模块为例,思考这样一个简单的问题:如何使用分段并行的方式完成一个大数的累加。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import threading

class SummingThread(threading.Thread):

    def __init__(self, low, high):

        super(SummingThread, self).__init__()

        self.low = low

        self.high = high

        self.total = 0

    def run(self):

        for i in range(self.low, self.high):

            self.total += i

thread1 = SummingThread(0, 500000)

thread2 = SummingThread(500000, 1000000)

thread1.start() # This actually causes the thread to run

thread2.start()

thread1.join()  # This waits until the thread has completed

thread2.join()

# At this point, both threads have completed

result = thread1.total + thread2.total

print(result)

 自定义Threading类库

  我写了一个易于使用threads的小型Python类库,包含了一些有用的类和函数。

  关键参数:

  * do_threaded_work – 该函数将一系列给定的任务分配给对应的处理函数(分配顺序不确定)

  * ThreadedWorker – 该类创建一个线程,它将从一个同步的工作队列中拉取工作任务并将处理结果写入同步结果队列

  * start_logging_with_thread_info – 将线程id写入所有日志消息。(依赖日志环境)

  * stop_logging_with_thread_info – 用于将线程id从所有的日志消息中移除。(依赖日志环境)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

import threading

import logging

def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):

    """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).

        Parameters:

        - num_threads               Default: len(work_items)  --- Number of threads to use process items in work_items.

        - per_sync_timeout          Default: 1                --- Each synchronized operation can optionally timeout.

        - preserve_result_ordering  Default: True             --- Reorders result_item to match original work_items ordering.

        Return:

        --- list of results from applying work_func to each work_item. Order is optionally preserved.

        Example:

        def process_url(url):

            # TODO: Do some work with the url

            return url

        urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

        # process urls in parallel

        result_items = do_threaded_work(urls_to_process, process_url)

        # print(results)

        print(repr(result_items))

    """

    global wrapped_work_func

    if not num_threads:

        num_threads = len(work_items)

    work_queue = Queue.Queue()

    result_queue = Queue.Queue()

    index = 0

    for work_item in work_items:

        if preserve_result_ordering:

            work_queue.put((index, work_item))

        else:

            work_queue.put(work_item)

        index += 1

    if preserve_result_ordering:

        wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))

    start_logging_with_thread_info()

    #spawn a pool of threads, and pass them queue instance

    for _ in range(num_threads):

        if preserve_result_ordering:

            t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)

        else:

            t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)

        t.setDaemon(True)

        t.start()

    work_queue.join()

    stop_logging_with_thread_info()

    logging.info(‘work_queue joined‘)

    result_items = []

    while not result_queue.empty():

        result = result_queue.get(timeout=per_sync_timeout)

        logging.info(‘found result[:500]: ‘ + repr(result)[:500])

        if result:

            result_items.append(result)

    if preserve_result_ordering:

        result_items = [work_item for index, work_item in result_items]

    return result_items

class ThreadedWorker(threading.Thread):

    """ Generic Threaded Worker

        Input to work_func: item from work_queue

    Example usage:

    import Queue

    urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

    work_queue = Queue.Queue()

    result_queue = Queue.Queue()

    def process_url(url):

        # TODO: Do some work with the url

        return url

    def main():

        # spawn a pool of threads, and pass them queue instance

        for i in range(3):

            t = ThreadedWorker(work_queue, result_queue, work_func=process_url)

            t.setDaemon(True)

            t.start()

        # populate queue with data  

        for url in urls_to_process:

            work_queue.put(url)

        # wait on the queue until everything has been processed    

        work_queue.join()

        # print results

        print repr(result_queue)

    main()

    """

    def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):

        threading.Thread.__init__(self)

        self.work_queue = work_queue

        self.result_queue = result_queue

        self.work_func = work_func

        self.stop_when_work_queue_empty = stop_when_work_queue_empty

        self.queue_timeout = queue_timeout

    def should_continue_running(self):

        if self.stop_when_work_queue_empty:

            return not self.work_queue.empty()

        else:

            return True

    def run(self):

        while self.should_continue_running():

            try:

                # grabs item from work_queue

                work_item = self.work_queue.get(timeout=self.queue_timeout)

                # works on item

                work_result = self.work_func(work_item)

                #place work_result into result_queue

                self.result_queue.put(work_result, timeout=self.queue_timeout)

            except Queue.Empty:

                logging.warning(‘ThreadedWorker Queue was empty or Queue.get() timed out‘)

            except Queue.Full:

                logging.warning(‘ThreadedWorker Queue was full or Queue.put() timed out‘)

            except:

                logging.exception(‘Error in ThreadedWorker‘)

            finally:

                #signals to work_queue that item is done

                self.work_queue.task_done()

def start_logging_with_thread_info():

    try:

        formatter = logging.Formatter(‘[thread %(thread)-3s] %(message)s‘)

        logging.getLogger().handlers[0].setFormatter(formatter)

    except:

        logging.exception(‘Failed to start logging with thread info‘)

def stop_logging_with_thread_info():

    try:

        formatter = logging.Formatter(‘%(message)s‘)

        logging.getLogger().handlers[0].setFormatter(formatter)

    except:

        logging.exception(‘Failed to stop logging with thread info‘)

 使用示例


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

from test import ThreadedWorker

from queue import Queue

urls_to_process = ["http://facebook.com", "http://pypix.com"]

work_queue = Queue()

result_queue = Queue()

def process_url(url):

    # TODO: Do some work with the url

    return url

def main():

    # spawn a pool of threads, and pass them queue instance

    for i in range(5):

        t = ThreadedWorker(work_queue, result_queue, work_func=process_url)

        t.setDaemon(True)

        t.start()

    # populate queue with data  

    for url in urls_to_process:

        work_queue.put(url)

    # wait on the queue until everything has been processed    

    work_queue.join()

    # print results

    print(repr(result_queue))

main()

  原文链接: pypix.com   翻译: 伯乐在线- 熊崽Kevin

时间: 2024-08-04 13:27:03

Python中的并发编程的相关文章

[翻译]在 .NET Core 中的并发编程

原文地址:http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core 今天我们购买的每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.操作系统通过将进程调度到不同的内核来发挥这个结构的优点.然而,还可以通过异步 I/O 操作和并行处理来帮助我们提高单个应用程序的性能.在.NET Core中,任务 (tasks) 是并发编程的主要抽象表述,但还有其他支撑类可以使我们的工作更容易. 并发编程 - 异步 v

python中的函数式编程与装饰器

2.1 python中的函数式编程 函数式编码的特点 把计算视为函数而非指令 纯函数式编程,不需要变量,没有副作用,测试简单 支持高阶函数,代码简洁 python支持的函数式编程 不是纯函数式编码:允许有变量 支持高阶函数:函数也可以作为变量传入 支持闭包:有了闭包就能返回函数 有限度地支持匿名函数 2.2 python中高阶函数 函数名可以作为变量,如 高阶函数:只能接收函数作为参数的函数 变量可以是指向函数 函数的参数可以接收变量 一个函数可以接收另一个函数作为参数 能接收函数作为参数的函数

Python 中的 TK编程

可爱的 Python:Python 中的 TK编程 http://www.ibm.com/developerworks/cn/linux/sdk/python/charm-12/ python checkbox 用法详解 http://www.android100.org/html/201407/13/39698.html Tkinter GUI编程——pack http://blog.sina.com.cn/s/blog_4b5039210100epkl.html 环境安装: sudo apt

python进阶---Python中的socket编程(一)

初识socket编程 一.前言 socket基于C\S架构(客户端\服务端)的编程模型,在Python中是以socket模块存在的. Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口.在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议. 所以,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规

python学习_day32_并发编程之多进程

一.背景知识 顾名思义,进程即正在执行的一个过程.进程是对正在运行程序的一个抽象.进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一.操作系统的其他所有内容都是围绕进程的概念展开的. PS:即使可以利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力.将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在. #一 操作系统的作用: 1:隐藏丑陋复杂的

【python进阶】并发编程-线程与进程

并发编程-进程与线程 什么是进程(process)? 进程(process),是计算机中已运行程序的实体,是线程的容器:一个进程至少有一个线程 假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源.是不是在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让 程序B暂停,然后让程序A继续执行?当然没问题,但这里有一个关键词:切换既然是切换,那么这就涉及到了状态的保存

python语法基础-并发编程-进程-长期维护

###############    进程的启动方式1    ############## """ 并发编程: 进程 1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了, 操作系统可以管理进程,进程是操作系统基本的执行单元, 2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间, 操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因, ####################################### 进

python中的面向对象编程

在python中几乎可以完成C++里所有面向对象编程的元素. 继承:python支持多继承: class Derived(base1, base2, base3): pass 多态:python中的所有实例方法都是virtual类型的 封装: 这个比较特殊,C++编程中一直强调得比较多的是:为了隐藏实现,所有的成员变量都要是private类型的,还有那些仅与实现相关的,不作为外部接口的方法都要定义成private类型的(或者protected).但是在python里面,并不存在真正的私有类型,根

​Python中面向对象的编程

Python面向对象的编程 1概述 (1)面向对象编程 面向对象的编程是利用"类"和"对象"来创建各种模型来实现对真实世界的描述,使用面向对象编程的原因一方面是因为它可以使程序的维护和扩展变得更简单,并且可以大大提高程序开发效率,另外,基于面向对象的程序可以使它人更加容易理解你的代码逻辑,从而使团队开发变得更从容. (2)面向对象的特征 1)类(Class):一个类即是对一类拥有相同属性的对象的抽象.蓝图.原型.在类中定义了这些对象的都具备的属性(variables