Python中的多进程与多线程/分布式该如何使用

在批评Python的讨论中,常常说起Python多线程是多么的难用。还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行。因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行。必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情。如果你还没看过的话,我建议你看看Eqbal Quran的文章《Ruby中的并发和并行》。

在本文中,我们将会写一个小的Python脚本,用于下载Imgur上最热门的图片。我们将会从一个按顺序下载图片的版本开始做起,即一个一个地下载。在那之前,你得注册一个Imgur上的应用。如果你还没有Imgur账户,请先注册一个。

本文中的脚本在Python3.4.2中测试通过。稍微改一下,应该也能在Python2中运行——urllib是两个版本中区别最大的部分。
开始动手

让我们从创建一个叫“download.py”的Python模块开始。这个文件包含了获取图片列表以及下载这些图片所需的所有函数。我们将这些功能分成三个单独的函数:

get_links
download_link
setup_download_dir

第三个函数,“setup_download_dir”,用于创建下载的目标目录(如果不存在的话)。

Imgur的API要求HTTP请求能支持带有client ID的“Authorization”头部。你可以从你注册的Imgur应用的面板上找到这个client ID,而响应会以JSON进行编码。我们可以使用Python的标准JSON库去解码。下载图片更简单,你只需要根据它们的URL获取图片,然后写入到一个文件即可。

代码如下:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

def get_links(client_id):
  headers = {‘Authorization‘: ‘Client-ID {}‘.format(client_id)}
  req = Request(‘https://api.imgur.com/3/gallery/‘, headers=headers, method=‘GET‘)
  with urlopen(req) as resp:
    data = json.loads(resp.readall().decode(‘utf-8‘))
  return map(lambda item: item[‘link‘], data[‘data‘])

def download_link(directory, link):
  logger.info(‘Downloading %s‘, link)
  download_path = directory / os.path.basename(link)
  with urlopen(link) as image, download_path.open(‘wb‘) as f:
    f.write(image.readall())

def setup_download_dir():
  download_dir = Path(‘images‘)
  if not download_dir.exists():
    download_dir.mkdir()
  return download_dir

接下来,你需要写一个模块,利用这些函数去逐个下载图片。我们给它命名为“single.py”。它包含了我们最原始版本的Imgur图片下载器的主要函数。这个模块将会通过环境变量“IMGUR_CLIENT_ID”去获取Imgur的client ID。它将会调用“setup_download_dir”去创建下载目录。最后,使用get_links函数去获取图片的列表,过滤掉所有的GIF和专辑URL,然后用“download_link”去将图片下载并保存在磁盘中。下面是“single.py”的代码:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format=‘%(asctime)s - %(name)s - %(levelname)s - %(message)s‘)
logging.getLogger(‘requests‘).setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
  ts = time()
  client_id = os.getenv(‘IMGUR_CLIENT_ID‘)
  if not client_id:
    raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith(‘.jpg‘)]
  for link in links:
    download_link(download_dir, link)
  print(‘Took {}s‘.format(time() - ts))

if __name__ == ‘__main__‘:
  main()

在我的笔记本上,这个脚本花了19.4秒去下载91张图片。请注意这些数字在不同的网络上也会有所不同。19.4秒并不是非常的长,但是如果我们要下载更多的图片怎么办呢?或许是900张而不是90张。平均下载一张图片要0.2秒,900张的话大概需要3分钟。那么9000张图片将会花掉30分钟。好消息是使用了并发或者并行后,我们可以将这个速度显著地提高。

接下来的代码示例将只会显示导入特有模块和新模块的import语句。所有相关的Python脚本都可以在这方便地找到this GitHub repository
使用线程

线程是最出名的实现并发和并行的方式之一。操作系统一般提供了线程的特性。线程比进程要小,而且共享同一块内存空间。

在这里,我们将写一个替代“single.py”的新模块。它将创建一个有八个线程的池,加上主线程的话总共就是九个线程。之所以是八个线程,是因为我的电脑有8个CPU内核,而一个工作线程对应一个内核看起来还不错。在实践中,线程的数量是仔细考究的,需要考虑到其他的因素,比如在同一台机器上跑的的其他应用和服务。

下面的脚本几乎跟之前的一样,除了我们现在有个新的类,DownloadWorker,一个Thread类的子类。运行无限循环的run方法已经被重写。在每次迭代时,它调用“self.queue.get()”试图从一个线程安全的队列里获取一个URL。它将会一直堵塞,直到队列中出现一个要处理元素。一旦工作线程从队列中得到一个元素,它将会调用之前脚本中用来下载图片到目录中所用到的“download_link”方法。下载完成之后,工作线程向队列发送任务完成的信号。这非常重要,因为队列一直在跟踪队列中的任务数。如果工作线程没有发出任务完成的信号,“queue.join()”的调用将会令整个主线程都在阻塞状态。

from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
  def __init__(self, queue):
    Thread.__init__(self)
    self.queue = queue

  def run(self):
    while True:
      # Get the work from the queue and expand the tuple
      # 从队列中获取任务并扩展tuple
      directory, link = self.queue.get()
      download_link(directory, link)
      self.queue.task_done()

def main():
  ts = time()
  client_id = os.getenv(‘IMGUR_CLIENT_ID‘)
  if not client_id:
    raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith(‘.jpg‘)]
  # Create a queue to communicate with the worker threads
  queue = Queue()
  # Create 8 worker threads
  # 创建八个工作线程
  for x in range(8):
    worker = DownloadWorker(queue)
    # Setting daemon to True will let the main thread exit even though the workers are blocking
    # 将daemon设置为True将会使主线程退出,即使worker都阻塞了
    worker.daemon = True
    worker.start()
  # Put the tasks into the queue as a tuple
  # 将任务以tuple的形式放入队列中
  for link in links:
    logger.info(‘Queueing {}‘.format(link))
    queue.put((download_dir, link))
  # Causes the main thread to wait for the queue to finish processing all the tasks
  # 让主线程等待队列完成所有的任务
  queue.join()
  print(‘Took {}‘.format(time() - ts))

在同一个机器上运行这个脚本,下载时间变成了4.1秒!即比之前的例子快4.7倍。虽然这快了很多,但还是要提一下,由于GIL的缘故,在这个进程中同一时间只有一个线程在运行。因此,这段代码是并发的但不是并行的。而它仍然变快的原因是这是一个IO密集型的任务。进程下载图片时根本毫不费力,而主要的时间都花在了等待网络上。这就是为什么线程可以提供很大的速度提升。每当线程中的一个准备工作时,进程可以不断转换线程。使用Python或其他有GIL的解释型语言中的线程模块实际上会降低性能。如果你的代码执行的是CPU密集型的任务,例如解压gzip文件,使用线程模块将会导致执行时间变长。对于CPU密集型任务和真正的并行执行,我们可以使用多进程(multiprocessing)模块。

官方的Python实现——CPython——带有GIL,但不是所有的Python实现都是这样的。比如,IronPython,使用.NET框架实现的Python就没有GIL,基于Java实现的Jython也同样没有。你可以点这查看现有的Python实现。
生成多进程

多进程模块比线程模块更易使用,因为我们不需要像线程示例那样新增一个类。我们唯一需要做的改变在主函数中。

为了使用多进程,我们得建立一个多进程池。通过它提供的map方法,我们把URL列表传给池,然后8个新进程就会生成,它们将并行地去下载图片。这就是真正的并行,不过这是有代价的。整个脚本的内存将会被拷贝到各个子进程中。在我们的例子中这不算什么,但是在大型程序中它很容易导致严重的问题。

from functools import partial
from multiprocessing.pool import Pool

def main():
  ts = time()
  client_id = os.getenv(‘IMGUR_CLIENT_ID‘)
  if not client_id:
    raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith(‘.jpg‘)]
  download = partial(download_link, download_dir)
  with Pool(8) as p:
    p.map(download, links)
  print(‘Took {}s‘.format(time() - ts))

分布式任务

你已经知道了线程和多进程模块可以给你自己的电脑跑脚本时提供很大的帮助,那么在你想要在不同的机器上执行任务,或者在你需要扩大规模而超过一台机器的的能力范围时,你该怎么办呢?一个很好的使用案例是网络应用的长时间后台任务。如果你有一些很耗时的任务,你不会希望在同一台机器上占用一些其他的应用代码所需要的子进程或线程。这将会使你的应用的性能下降,影响到你的用户们。如果能在另外一台甚至很多台其他的机器上跑这些任务就好了。

Python库RQ非常适用于这类任务。它是一个简单却很强大的库。首先将一个函数和它的参数放入队列中。它将函数调用的表示序列化(pickle),然后将这些表示添加到一个Redis列表中。任务进入队列只是第一步,什么都还没有做。我们至少还需要一个能去监听任务队列的worker(工作线程)。

第一步是在你的电脑上安装和使用Redis服务器,或是拥有一台能正常的使用的Redis服务器的使用权。接着,对于现有的代码只需要一些小小的改动。先创建一个RQ队列的实例并通过redis-py 库传给一台Redis服务器。然后,我们执行“q.enqueue(download_link, download_dir, link)”,而不只是调用“download_link” 。enqueue方法的第一个参数是一个函数,当任务真正执行时,其他的参数或关键字参数将会传给该函数。

最后一步是启动一些worker。RQ提供了方便的脚本,可以在默认队列上运行起worker。只要在终端窗口中执行“rqworker”,就可以开始监听默认队列了。请确认你当前的工作目录与脚本所在的是同一个。如果你想监听别的队列,你可以执行“rqworker queue_name”,然后将会开始执行名为queue_name的队列。RQ的一个很好的点就是,只要你可以连接到Redis,你就可以在任意数量上的机器上跑起任意数量的worker;因此,它可以让你的应用扩展性得到提升。下面是RQ版本的代码:

from redis import Redis
from rq import Queue

def main():
  client_id = os.getenv(‘IMGUR_CLIENT_ID‘)
  if not client_id:
    raise Exception("Couldn‘t find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith(‘.jpg‘)]
  q = Queue(connection=Redis(host=‘localhost‘, port=6379))
  for link in links:
    q.enqueue(download_link, download_dir, link)

然而RQ并不是Python任务队列的唯一解决方案。RQ确实易用并且能在简单的案例中起到很大的作用,但是如果有更高级的需求,我们可以使用其他的解决方案(例如 Celery)。
总结

如果你的代码是IO密集型的,线程和多进程可以帮到你。多进程比线程更易用,但是消耗更多的内存。如果你的代码是CPU密集型的,多进程就明显是更好的选择——特别是所使用的机器是多核或多CPU的。对于网络应用,在你需要扩展到多台机器上执行任务,RQ是更好的选择。

时间: 2024-10-10 23:12:43

Python中的多进程与多线程/分布式该如何使用的相关文章

Python中的多进程与多线程(二)

在上一章中,学习了Python多进程编程的一些基本方法:使用跨平台多进程模块multiprocessing提供的Process.Pool.Queue.Lock.Pipe等类,实现子进程创建.进程池(批量创建子进程并管理子进程数量上限)以及进程间通信.这一章学习下Python下的多线程编程方法. 一.threading 线程是操作系统执行任务的最小单元.Python标准库中提供了threading模块,对多线程编程提供了很便捷的支持. 下面是使用threading实现多线程的代码: 1 #!/us

python并发之多进程、多线程、协程和异步

一.多线程 二.协程(又称微线程,纤程) 协程,与线程的抢占式调度不同,它是协作式调度.协程在python中可以由generator来实现. 首先要对生成器和yield有一个扎实的理解. 调用一个普通的python函数,一般是从函数的第一行代码开始执行,结束于return语句.异常或者函数执行(也可以认为是隐式地返回了None). 一旦函数将控制权交还给调用者,就意味着全部结束.而有时可以创建能产生一个序列的函数,来“保存自己的工作”,这就是生成器(使用了yield关键字的函数). 能够“产生一

Python中的多进程

由于cPython的gill,多线程未必是CPU密集型程序的好的选择. 多线程可以完全独立的进程环境中运行程序,可以充分利用多处理器. 但是进程本身的隔离带来的数据不共享也是一种问题,线程比进程轻量级. 1.Multiprocessing import multiprocessing import datetime def calc(i):     sum = 0     for _ in range(10000000):         sum += 1     print(i,sum) if

进程和线程,线程安全,python如何实现多进程,多线程

进程和线程的区别 进程是对运行时程序的封装,是系统资源调度和分配的基本单位 线程是进程的子任务,cpu调度和分配的基本单位,实现进程内并发. 一个进程可以包含多个线程,线程依赖进程存在,并共享进程内存 什么是线程安全 一个线程的修改被另一个线程的修改覆盖掉. python中哪些操作是线程安全的 一个操作可以在多线程环境中使用,并且获得正确的结果. 线程安全的操作线程是顺序执行的而不是并发执行的. 一般涉及到写操作需要考虑如何让多个线程安全访问数据. 线程同步的方式 互斥量(锁): 通过互斥机制防

python中的多进程处理

转载于:http://blog.csdn.net/jj_liuxin/article/details/3564365 帮助文档见https://docs.python.org/2.7/library/multiprocessing.html 众所周知,python本身是单线程的,python中的线程处理是由python解释器分配时间片的:但在python 3.0中吸收了开源模块,开始支持系统原生的进程处理——multiprocessing. 注意:这个模块的某些函数需要操作系统的支持,例如,mu

122 Python程序中的多进程和多线程

目录 一.什么是进程池或线程池 二.理解同步.异步. 三.multiprocess.Pool模块实现进程池 3.1.1 Pool使用方法 3.1.1 代码实例--multiprocess.Pool 四.Python标准模块--concurrent.futures实现进程池和线程池 4.1 介绍 4.2 基本方法 4.3 代码实例--ProcessPoolExecutor 方式1: 方式2: 方式3 4.4 代码实例--ThreadPoolExecutor 方式1: 方式2: 方式3: 一.什么是

服务器开发中的多进程,多线程及多协程

服务器开发中,为了充分利用多核甚至多个cpu,或者是简化逻辑编写的难度,会应用多进程(比如一个进程负责一种逻辑)多线程(将不同的用户分配到不同的进程)或者协程(不同的用户分配不同的协程,在需要时切换到其他协程),并且往往同时利用这些技术比如多进程多线程. 一个经典的服务器框架可以说如下的框架: 而这些服务器进程之间协同配合,为用户提供服务,其中中心服务器提供集群的调度工作,而逻辑服可以是逻辑服务器1提供登录服务.逻辑服务器2提供购买服务:也可以是2个服务器提供相同服务,然后视负载用户数将不同用户

python中,用Redis构建分布式锁

分布式锁 在实际应用场景中,我们可能有多个worker,可能在一台机器,也可能分布在不同的机器,但只有一个worker可以同时持有一把锁,这个时候我们就需要用到分布式锁了. 这里推荐python的实现库,Redlock-py (Python 实现). 正常情况下,worker获得锁后,处理自己的任务,完成后自动释放持有的锁,是不是感觉有点熟悉,很容易想到我们的上下文管理器,这里我们简单的用装饰器实现 with...as... 语法. 安装Redlock-py $ pip install redl

在Python中使用多进程快速处理数据

转自:https://blog.csdn.net/bryan__/article/details/78786648 数据分片:可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并.应用场景:多进程爬虫,类mapreduce任务.缺点是子进程会拷贝父进程所有状态,内存浪费严重. import math from multiprocessing import Pool def run(data, index, size): # d