Python 多线程教程:并发与并行

Python 多线程教程:并发与并行

在批评Python的讨论中,常常说起Python多线程是多么的难用。还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行。因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行。必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情。如果你还没看过的话,我建议你看看Eqbal Quran的文章《Ruby中的并发和并行》。

在本文中,我们将会写一个小的Python脚本,用于下载Imgur上最热门的图片。我们将会从一个按顺序下载图片的版本开始做起,即一个一个地下载。在那之前,你得注册一个Imgur上的应用。如果你还没有Imgur账户,请先注册一个。

本文中的脚本在Python3.4.2中测试通过。稍微改一下,应该也能在Python2中运行——urllib是两个版本中区别最大的部分。

快速使用Romanysoft LAB的技术实现 HTML 开发Mac OS App,并销售到苹果应用商店中。

《HTML开发Mac OS App 视频教程》

官方QQ群:(申请加入,说是我推荐的

  • App实践出真知 434558944       
  • App学习交流 452180823          

1、开始动手

让我们从创建一个叫“download.py”的Python模块开始。这个文件包含了获取图片列表以及下载这些图片所需的所有函数。我们将这些功能分成三个单独的函数:

  • get_links
  • download_link
  • setup_download_dir

第三个函数,“setup_download_dir”,用于创建下载的目标目录(如果不存在的话)。

Imgur的API要求HTTP请求能支持带有client ID的“Authorization”头部。你可以从你注册的Imgur应用的面板上找到这个client ID,而响应会以JSON进行编码。我们可以使用Python的标准JSON库去解码。下载图片更简单,你只需要根据它们的URL获取图片,然后写入到一个文件即可。

代码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

import json

import logging

import os

from pathlib import Path

from urllib.request import urlopen, Request

 

logger = logging.getLogger(__name__)

 

def get_links(client_id):

   headers = {‘Authorization‘‘Client-ID {}‘.format(client_id)}

   req = Request(‘https://api.imgur.com/3/gallery/‘, headers=headers, method=‘GET‘)

   with urlopen(req) as resp:

       data = json.loads(resp.readall().decode(‘utf-8‘))

   return map(lambda item: item[‘link‘], data[‘data‘])

 

def download_link(directory, link):

   logger.info(‘Downloading %s‘, link)

   download_path = directory / os.path.basename(link)

   with urlopen(link) as image, download_path.open(‘wb‘) as f:

       f.write(image.readall())

 

def setup_download_dir():

   download_dir = Path(‘images‘)

   if not download_dir.exists():

       download_dir.mkdir()

   return download_dir

接下来,你需要写一个模块,利用这些函数去逐个下载图片。我们给它命名为“single.py”。它包含了我们最原始版本的Imgur图片下载器的主要函数。这个模块将会通过环境变量“IMGUR_CLIENT_ID”去获取Imgur的client ID。它将会调用“setup_download_dir”去创建下载目录。最后,使用get_links函数去获取图片的列表,过滤掉所有的GIF和专辑URL,然后用“download_link”去将图片下载并保存在磁盘中。下面是“single.py”的代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

import logging

import os

from time import time

 

from download import setup_download_dir, get_links, download_link

 

logging.basicConfig(level=logging.DEBUG, format=‘%(asctime)s - %(name)s - %(levelname)s - %(message)s‘)

logging.getLogger(‘requests‘).setLevel(logging.CRITICAL)

logger = logging.getLogger(__name__)

 

def main():

   ts = time()

   client_id = os.getenv(‘IMGUR_CLIENT_ID‘)

   if not client_id:

       raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")

   download_dir = setup_download_dir()

   links = [l for in get_links(client_id) if l.endswith(‘.jpg‘)]

   for link in links:

       download_link(download_dir, link)

   print(‘Took {}s‘.format(time() - ts))

 

if __name__ == ‘__main__‘:

   main()

注:为了测试方便,上面两段代码可以用如下代码替代演示:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

# coding=utf-8

#测试utf-8编码

from time import sleep, time

import sys, threading

reload(sys)

sys.setdefaultencoding(‘utf-8‘)

def getNums(N):

    return xrange(N)

def processNum(num):

    num_add = num + 1

    sleep(1)

    print str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)

if __name__ == "__main__":

    t1 = time()

    for in getNums(3):

        processNum(i)

    print "cost time is: {:.2f}s".format(time() - t1)

结果:


1

2

3

4

<_MainThread(MainThread, started 4436)>: 0 → 1

<_MainThread(MainThread, started 4436)>: 1 → 2

<_MainThread(MainThread, started 4436)>: 2 → 3

cost time is: 3.00s

在我的笔记本上,这个脚本花了19.4秒去下载91张图片。请注意这些数字在不同的网络上也会有所不同。19.4秒并不是非常的长,但是如果我们要下载更多的图片怎么办呢?或许是900张而不是90张。平均下载一张图片要0.2秒,900张的话大概需要3分钟。那么9000张图片将会花掉30分钟。好消息是使用了并发或者并行后,我们可以将这个速度显著地提高。

接下来的代码示例将只会显示导入特有模块和新模块的import语句。所有相关的Python脚本都可以在这方便地找到this GitHub repository

2、使用线程

线程是最出名的实现并发和并行的方式之一。操作系统一般提供了线程的特性。线程比进程要小,而且共享同一块内存空间。

在这里,我们将写一个替代“single.py”的新模块。它将创建一个有八个线程的池,加上主线程的话总共就是九个线程。之所以是八个线程,是因为我的电脑有8个CPU内核,而一个工作线程对应一个内核看起来还不错。在实践中,线程的数量是仔细考究的,需要考虑到其他的因素,比如在同一台机器上跑的的其他应用和服务。

下面的脚本几乎跟之前的一样,除了我们现在有个新的类,DownloadWorker,一个Thread类的子类。运行无限循环的run方法已经被重写。在每次迭代时,它调用“self.queue.get()”试图从一个线程安全的队列里获取一个URL。它将会一直堵塞,直到队列中出现一个要处理元素。一旦工作线程从队列中得到一个元素,它将会调用之前脚本中用来下载图片到目录中所用到的“download_link”方法。下载完成之后,工作线程向队列发送任务完成的信号。这非常重要,因为队列一直在跟踪队列中的任务数。如果工作线程没有发出任务完成的信号,“queue.join()”的调用将会令整个主线程都在阻塞状态。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

from queue import Queue

from threading import Thread

 

class DownloadWorker(Thread):

   def __init__(self, queue):

       Thread.__init__(self)

       self.queue = queue

 

   def run(self):

       while True:

           # Get the work from the queue and expand the tuple

           # 从队列中获取任务并扩展tuple

           directory, link = self.queue.get()

           download_link(directory, link)

           self.queue.task_done()

 

def main():

   ts = time()

   client_id = os.getenv(‘IMGUR_CLIENT_ID‘)

   if not client_id:

       raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")

   download_dir = setup_download_dir()

   links = [l for in get_links(client_id) if l.endswith(‘.jpg‘)]

   # Create a queue to communicate with the worker threads

   queue = Queue()

   # Create 8 worker threads

   # 创建八个工作线程

   for in range(8):

       worker = DownloadWorker(queue)

       # Setting daemon to True will let the main thread exit even though the workers are blocking

       # 将daemon设置为True将会使主线程退出,即使worker都阻塞了

       worker.daemon = True

       worker.start()

   # Put the tasks into the queue as a tuple

   # 将任务以tuple的形式放入队列中

   for link in links:

       logger.info(‘Queueing {}‘.format(link))

       queue.put((download_dir, link))

   # Causes the main thread to wait for the queue to finish processing all the tasks

   # 让主线程等待队列完成所有的任务

   queue.join()

   print(‘Took {}‘.format(time() - ts))

注:为了测试方便,上面的代码可以用如下代码替代演示:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

# coding=utf-8

#测试utf-8编码

from Queue import Queue

from threading import Thread

from single import *

import sys

reload(sys)

sys.setdefaultencoding(‘utf-8‘)

class ProcessWorker(Thread):

    def __init__(self, queue):

        Thread.__init__(self)

        self.queue = queue

    def run(self):

        while True:

            # Get the work from the queue

            num = self.queue.get()

            processNum(num)

            self.queue.task_done()

def main():

    ts = time()

    nums = getNums(4)

    # Create a queue to communicate with the worker threads

    queue = Queue()

    # Create 4 worker threads

    # 创建四个工作线程

    for in range(4):

        worker = ProcessWorker(queue)

        # Setting daemon to True will let the main thread exit even though the workers are blocking

        # 将daemon设置为True将会使主线程退出,即使worker都阻塞了

        worker.daemon = True

        worker.start()

    # Put the tasks into the queue

    for num in nums:

        queue.put(num)

    # Causes the main thread to wait for the queue to finish processing all the tasks

    # 让主线程等待队列完成所有的任务

    queue.join()

    print("cost time is: {:.2f}s".format(time() - ts))

if __name__ == "__main__":

    main()

结果:


1

2

3

4

5

<ProcessWorker(Thread-4, started daemon 3900)>: 3 → 4<ProcessWorker(Thread-1, started daemon 3436)>: 2 → 3<ProcessWorker(Thread-3, started daemon 4576)>: 1 → 2

 

<ProcessWorker(Thread-2, started daemon 396)>: 0 → 1

cost time is: 1.01s

在同一个机器上运行这个脚本,下载时间变成了4.1秒!即比之前的例子快4.7倍。虽然这快了很多,但还是要提一下,由于GIL的缘故,在这个进程中同一时间只有一个线程在运行。因此,这段代码是并发的但不是并行的。而它仍然变快的原因是这是一个IO密集型的任务。进程下载图片时根本毫不费力,而主要的时间都花在了等待网络上。这就是为什么线程可以提供很大的速度提升。每当线程中的一个准备工作时,进程可以不断转换线程。使用Python或其他有GIL的解释型语言中的线程模块实际上会降低性能。如果你的代码执行的是CPU密集型的任务,例如解压gzip文件,使用线程模块将会导致执行时间变长。对于CPU密集型任务和真正的并行执行,我们可以使用多进程(multiprocessing)模块。

官方的Python实现——CPython——带有GIL,但不是所有的Python实现都是这样的。比如,IronPython,使用.NET框架实现的Python就没有GIL,基于Java实现的Jython也同样没有。你可以点这查看现有的Python实现。

3、生成多进程

多进程模块比线程模块更易使用,因为我们不需要像线程示例那样新增一个类。我们唯一需要做的改变在主函数中。

为了使用多进程,我们得建立一个多进程池。通过它提供的map方法,我们把URL列表传给池,然后8个新进程就会生成,它们将并行地去下载图片。这就是真正的并行,不过这是有代价的。整个脚本的内存将会被拷贝到各个子进程中。在我们的例子中这不算什么,但是在大型程序中它很容易导致严重的问题。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

from functools import partial

from multiprocessing.pool import Pool

 

def main():

   ts = time()

   client_id = os.getenv(‘IMGUR_CLIENT_ID‘)

   if not client_id:

       raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")

   download_dir = setup_download_dir()

   links = [l for in get_links(client_id) if l.endswith(‘.jpg‘)]

   download = partial(download_link, download_dir)

   with Pool(8) as p:

       p.map(download, links)

   print(‘Took {}s‘.format(time() - ts))

注:为了测试方便,上面的代码可以用如下代码替代演示:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

# coding=utf-8

#测试utf-8编码

from functools import partial

from multiprocessing.pool import Pool

from single import *

from time import time

import sys

reload(sys)

sys.setdefaultencoding(‘utf-8‘)

def main():

    ts = time()

    nums = getNums(4)

    = Pool(4)

    p.map(processNum, nums)

    print("cost time is: {:.2f}s".format(time() - ts))

if __name__ == "__main__":

    main()

结果:


1

2

3

4

5

<_MainThread(MainThread, started 6188)>: 0 → 1

<_MainThread(MainThread, started 3584)>: 1 → 2

<_MainThread(MainThread, started 2572)>: 3 → 4<_MainThread(MainThread, started 4692)>: 2 → 3

cost time is: 1.21s

4、分布式任务

你已经知道了线程和多进程模块可以给你自己的电脑跑脚本时提供很大的帮助,那么在你想要在不同的机器上执行任务,或者在你需要扩大规模而超过一台机器的的能力范围时,你该怎么办呢?一个很好的使用案例是网络应用的长时间后台任务。如果你有一些很耗时的任务,你不会希望在同一台机器上占用一些其他的应用代码所需要的子进程或线程。这将会使你的应用的性能下降,影响到你的用户们。如果能在另外一台甚至很多台其他的机器上跑这些任务就好了。

Python库RQ非常适用于这类任务。它是一个简单却很强大的库。首先将一个函数和它的参数放入队列中。它将函数调用的表示序列化(pickle),然后将这些表示添加到一个Redis列表中。任务进入队列只是第一步,什么都还没有做。我们至少还需要一个能去监听任务队列的worker(工作线程)。

第一步是在你的电脑上安装和使用Redis服务器,或是拥有一台能正常的使用的Redis服务器的使用权。接着,对于现有的代码只需要一些小小的改动。先创建一个RQ队列的实例并通过redis-py 库传给一台Redis服务器。然后,我们执行“q.enqueue(download_link, download_dir, link)”,而不只是调用“download_link” 。enqueue方法的第一个参数是一个函数,当任务真正执行时,其他的参数或关键字参数将会传给该函数。

最后一步是启动一些worker。RQ提供了方便的脚本,可以在默认队列上运行起worker。只要在终端窗口中执行“rqworker”,就可以开始监听默认队列了。请确认你当前的工作目录与脚本所在的是同一个。如果你想监听别的队列,你可以执行“rqworker queue_name”,然后将会开始执行名为queue_name的队列。RQ的一个很好的点就是,只要你可以连接到Redis,你就可以在任意数量上的机器上跑起任意数量的worker;因此,它可以让你的应用扩展性得到提升。下面是RQ版本的代码:


1

2

3

4

5

6

7

8

9

10

11

12

from redis import Redis

from rq import Queue

 

def main():

   client_id = os.getenv(‘IMGUR_CLIENT_ID‘)

   if not client_id:

       raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")

   download_dir = setup_download_dir()

   links = [l for in get_links(client_id) if l.endswith(‘.jpg‘)]

   = Queue(connection=Redis(host=‘localhost‘, port=6379))

   for link in links:

       q.enqueue(download_link, download_dir, link)

然而RQ并不是Python任务队列的唯一解决方案。RQ确实易用并且能在简单的案例中起到很大的作用,但是如果有更高级的需求,我们可以使用其他的解决方案(例如 Celery)。

5、总结

如果你的代码是IO密集型的,线程和多进程可以帮到你。多进程比线程更易用,但是消耗更多的内存。如果你的代码是CPU密集型的,多进程就明显是更好的选择——特别是所使用的机器是多核或多CPU的。对于网络应用,在你需要扩展到多台机器上执行任务,RQ是更好的选择。

6、注:关于并发、并行区别与联系

  • 并发是指,程序在运行的过程中存在多于一个的执行上下文。这些执行上下文一般对应着不同的调用栈。

在单处理器上,并发程序虽然有多个上下文运行环境,但某一个时刻只有一个任务在运行。

但在多处理器上,因为有了多个执行单元,就可以同时有数个任务在跑。

  • 这种物理上同一时刻有多个任务同时运行的方式就是并行。

和并发相比,并行更加强调多个任务同时在运行。

而且并行还有一个层次问题,比如是指令间的并行还是任务间的并行。

7、Refer:

[1] Python Multithreading Tutorial: Concurrency and Parallelism

http://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python

[2] 串行(Sequential)、并发(Concurrent)、并行(parallel)与分布式(distributed)

时间: 2024-10-16 22:42:27

Python 多线程教程:并发与并行的相关文章

Python多线程,多进程,并行,并发,异步编程

Python并发与并行的新手指南:http://python.jobbole.com/81260/ Python 中的多线程,多进程,并发,并行,同步,通信:https://blog.csdn.net/timemachine119/article/details/54091323 python进阶笔记 thread 和 threading模块学习:https://www.cnblogs.com/forward-wang/p/5970640.html Python 中的多线程,多进程,并发,并行,

python+Appium自动化:python多线程多并发启动appium服务

Python启动Appium 服务 使用Dos命令或者bat批处理来手动启动appium服务,启动效率低下.如何将启动Appium服务也实现自动化呢? 这里需要使用subprocess模块,该模块可以创建新的进程,并且连接到进程的输入.输出.错误等管道信息,并且可以获取进程的返回值.subprocess模块官方文档 场景 使用Python启动2台appium服务,端口配置如下: Appium服务器端口:4723,bp端口为4724 Appium服务器端口:4725,bp端口为4726 说明:bp

Python并发与并行的新手指南

点这里 在批评Python的讨论中,常常说起Python多线程是多么的难用.还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行.因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行.必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情.如果你还没看过的话,我建议你看看Eqbal Quran的文章<

百万年薪python之路 -- 并发编程之 多线程 二

1. 死锁现象与递归锁 进程也有死锁与递归锁,进程的死锁和递归锁与线程的死锁递归锁同理. 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因为争夺资源而造成的一种互相等待的现象,在无外力的作用下,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在相互等待的进程称为死锁进程 # 多个线程多个锁可能会产生死锁 from threading import Thread from threading import Lock import time lock_A = Lock

并发和并行,异步与多线程区别

1.并发和并行的区别 可由上图形象指出两者的区别: 1)定义: 并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行. 并行:在操作系统中,一组程序按独立异步的速度执行,无论从微观还是宏观,程序都是一起执行的. 来个比喻:并发和并行的区别就是一个人同时吃三个馒头和三个人同时吃三个馒头: 在单CPU系统中,系统调度在某一时刻只能让一个线程运行,虽然这种调试机制有多种形式(大多数是时间片轮巡为主

Java多线程-并发和并行

 1.并发和并行的区别 可由上图形象指出两者的区别: 1)定义: 并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行. 并行:在操作系统中,一组程序按独立异步的速度执行,无论从微观还是宏观,程序都是一起执行的. 来个比喻:并发和并行的区别就是一个人同时吃三个馒头和三个人同时吃三个馒头: 在单CPU系统中,系统调度在某一时刻只能让一个线程运行,虽然这种调试机制有多种形式(大多数是时间片轮

并发 互斥 并行 同步 异步 多线程的区别

并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行.其中两种并发关系分别是同步和互斥 互斥:进程间相互排斥的使用临界资源的现象,就叫互斥. 临界资源(critical resource):一次只能供一个进程使用的资源. 如:硬件有打印机等,软件有变量,磁盘文件(写入的时候).临界区(critical section):把进程中访问临界资源的那段代码成为临界区.为了实现临界资源的互斥访问,只要做到进程互斥地进去自己的临界区,便可以实现

多线程、进程、并发、并行、同步、异步、伪并发、真并发

进程.线程 1.进程 一个程序,可以独立运行的一段程序.系统对它进行资源分配和调度. 2.线程 进程的基本单位,对它进行cpu分配和调度.只拥有一点在运行中必不可少的资源(寄存器,栈,程序计数器) 3.线程与进程的联系与区别 联系: (1)线程是指进程内的一个执行单元,一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程(通常说的主线程). 但是存在 DOS 这样的单进程(而且无线程概念)系统. (2)资源分配给进程,同一进程的所有线程共享该进程的所有资源,线程自己基本上不拥有系

并发、并行与C++多线程——基础一

1.什么是并发? 并发指的是两个或多个独立的活动在同一时段内发生.生活中并发的例子并不少,例如在跑步的时候你可能同时在听音乐:在看电脑显示器的同时你的手指在敲击键盘.这时我们称我们大脑并发地处理这些事件,只不过我们大脑的处理是有次重点的:有时候你会更关注你呼吸的频率,而有时候你更多地被美妙的音乐旋律所吸引.这时我们可以说大脑是一种并发设计的结构.这种次重点在计算机程序设计中,体现为某一个时刻只能处理一个操作. 2.什么是并行? 与并发相近的另一个概念是并行.它们两者存在很大的差别.并行就是同时执