Python并发编程-线程

  Python作为一种解释型语言,由于使用了全局解释锁(GIL)的原因,其代码不能同时在多核CPU上并发的运行。这也导致在Python中使用多线程编程并不能实现并发,我们得使用其他的方法在Python中实现并发编程。

一、全局解释锁(GIL)

  Python中不能通过使用多线程实现并发编程主要是因为全局解释锁的机制,所以首先解释一下全局解释锁的概念。

 首先,我们知道C++和Java是编译型语言,而Python则是一种解释型语言。对于Python程序来说,它是直接被输入到解释器中直接运行的。解释器在程序执行之前对其并不了解;它所知道的只是Python的规则,以及在执行过程中怎样去动态的应用这些规则。它也有一些优化,但是这基本上只是另一个级别的优化。由于解释器没法很好的对程序进行推导,Python的大部分优化其实是解释器自身的优化。更快的解释器自然意味着程序的运行也能“免费”的更快。也就是说,解释器优化后,Python程序不用做修改就可以享受优化后的好处。

为了利用多核系统,Python必须支持多线程运行。但作为解释型语言,Python的解释器需要做到既安全又高效。解释器要注意避免在不同的线程操作内部共享的数据,同时还要保证在管理用户线程时保证总是有最大化的计算资源。为了保证不同线程同时访问数据时的安全性,Python使用了全局解释器锁(GIL)的机制。从名字上我们很容易明白,它是一个加在解释器上的全局(从解释器的角度看)锁(从互斥或者类似角度看)。这种方式当然很安全,但它也意味着:对于任何Python程序,不管有多少的处理器,任何时候都总是只有一个线程在执行。即:只有获得了全局解释器锁的线程才能操作Python对象或者调用Python/C API函数。

所以,在Python中”不要使用多线程,请使用多进程”。具体来说,如果你的代码是IO密集型的,使用多线程或者多进程都是可以的,多进程比线程更易用,但是会消耗更多的内存;如果你的代码是CPU密集型的,多进程(multiprocessing模块)就明显是更好的选择——特别是所使用的机器是多核或多CPU的时候。

另外,Python的官方实现CPython带有GIL,但并不是所有的Python实现版本都是这样的。IronPython,Jython,还有使用.NET框架实现的Python就没有GIL。所以如果你不能忍受GIL,也可以尝试用一下其他实现版本的Python。

  

  如果是一个计算型的任务,GIL就会让多线程变慢。我们举个计算斐波那契数列的例子:

import time
import threading

def text(name):
    def profile(func):
        def wrapper(*args,**kwargs):
            start = time.time()
            res = func(*args,**kwargs)
            end = time.time()
            print(‘{} cost:{}‘.format(name,end-start))
            return res
        return wrapper
    return profile

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

@text(‘nothread‘)
def nothread():
    fib(35)
    fib(35)

@text(‘hasthread‘)
def hasthread():
    for i in range(2):
        t = threading.Thread(target=fib,args=(35,))
        t.start()
    main_thread = threading.current_thread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()

nothread()
hasthread()

##输出结果###
nothread cost:6.141353607177734
hasthread cost:6.15336275100708

  这种情况还不如不用多线程!

  GIL是必须的,这是Python设计的问题:Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/O操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高。

  那是不是由于GIL的存在,多线程库就是个「鸡肋」呢?当然不是。事实上我们平时会接触非常多的和网络通信或者数据输入/输出相关的程序,比如网络爬虫、文本处理等等。这时候由于网络情况和I/O的性能的限制,Python解释器会等待读写数据的函数调用返回,这个时候就可以利用多线程库提高并发效率了。

2.同步机制

  A. Semaphore(信号量)

  在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是1)的限制。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。

import time
from random import random
from threading import Thread,Semaphore,current_thread,enumerate

sema = Semaphore(3)

def foo(tid):
    with sema:
        print(‘{} acquire sema‘.format(tid))
        wt = random() * 2

        time.sleep(wt)
    print(‘{} release sema‘.format(tid))

for i in range(5):
    t = Thread(target=foo,args=(i,))
    t.start()

main_thread = current_thread()
for t in enumerate():
    if t is main_thread:
        continue
    t.join()

####输出结果#####
0 acquire sema
1 acquire sema
2 acquire sema
0 release sema
3 acquire sema
1 release sema
4 acquire sema
2 release sema
3 release sema
4 release sema

  B. Lock(互斥锁)

  Lock也可以叫做互斥锁,其实相当于信号量为1。我们先看一个不加锁的例子:

import time
import threading

value = 0

def getlock():
    global value
    new = value + 1
    time.sleep(0.001)  # 让线程有机会切换
    value = new

for i in range(100):
    t = threading.Thread(target=getlock)
    t.start()

main_thread = threading.current_thread()

for t in threading.enumerate():
    if t == main_thread:
        continue
    t.join()

print(value)

####输出结果#####
不确定(刷新值会发生改变)

  现在,我们来看看加锁之后的情况:

import time
import threading

value = 0
lock = threading.Lock()

def getlock():
    global value
    with lock:
        new = value + 1
        time.sleep(0.001)  # 让线程有机会切换
        value = new

for i in range(100):
    t = threading.Thread(target=getlock)
    t.start()

main_thread = threading.current_thread()

for t in threading.enumerate():
    if t == main_thread:
        continue
    t.join()

print(value)

####输出结果为#############
100

  我们对value的自增加了锁,就可以保证了结果了。

3. RLock(递归锁)

  先来说说死锁,所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()
        mutexA.release()

    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

产生死锁

  解决方案:

import threading
import time

mutex = threading.RLock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutex.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutex.release()
        mutex.release()

    def fun2(self):
        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutex.release()

        mutex.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

  递归锁内部维护了一个计数器,当有线程拿到了Lock以后,这个计数器会自动加1,只要这计数器的值大于0,那么其他线程就不能抢到改锁,这就保证了,在同一时刻,仅有一个线程使用该锁,从而避免了死锁的方法。关于递归锁内部实现,有兴趣的可以看看源码。

4. Condition(条件)

  一个线程等待特定条件,而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者/消费者」模型:

import time
import threading

def consumer(cond):
    t = threading.current_thread()
    with cond:
        cond.wait()  # 创建了一个锁,等待producer解锁
        print(‘{}: Resource is available to consumer‘.format(t.name))

def producer(cond):
    t = threading.current_thread()
    with cond:
        print(‘{}:Making resource available‘.format(t.name))
        cond.notifyAll()  # 释放锁,唤醒消费者

condition = threading.Condition()

c1 = threading.Thread(name=‘c1‘,target=consumer,args=(condition,))
p = threading.Thread(name=‘p‘,target=producer,args=(condition,))
c2 = threading.Thread(name=‘c2‘,target=consumer,args=(condition,))

c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()

5. Event

  一个线程发送/传递事件,另外的线程等待事件的触发。我们同样的用「生产者/消费者」模型的例子:

import time
import threading
from random import randint

TIMEOUT = 2

def consumer(event, l):
    t = threading.currentThread()
    while 1:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print(‘{} popped from list by {}‘.format(integer,t.name))
                event.clear()  # 重置状态
            except IndexError:
                pass

def producer(event, l):
    t = threading.currentThread()
    while 1:
        integer = randint(10,100)
        l.append(integer)
        print(‘{} append to list by {}‘.format(integer, t.name))
        event.set()
        time.sleep(1)

event = threading.Event()

l = []
threads = []

p = threading.Thread(name=‘producer1‘, target=producer, args=(event, l))
p.start()
threads.append(p)

for name in (‘consumer1‘,‘consumer2‘):
    t = threading.Thread(target=consumer, name=name, args=(event, l))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
print(‘ending‘)

  可以看到事件被2个消费者比较平均的接收并处理了。如果使用了wait方法,线程就会等待我们设置事件,这也有助于保证任务的完成。

6. Queue

  队列在并发开发中最常用的。我们借助「生产者/消费者」模式来理解:生产者把生产的「消息」放入队列,消费者从这个队列中对去对应的消息执行。

大家主要关心如下4个方法就好了:

  1. put: 向队列中添加一个消息。
  2. get: 从队列中删除并返回一个消息。
  3. task_done: 当某一项任务完成时调用。
  4. join: 阻塞直到所有的项目都被处理完。
import time
import threading
import random
import queue

q = queue.Queue()

def double(n):
    return n*2

def producer():
    while 1:
        wt = random.randint(1,10)
        time.sleep(random.random())
        q.put((double, wt))

def consumer():
    while 1:
        task, arg = q.get()
        print(arg, task(arg))

        q.task_done()

for target in (producer, consumer):
    t = threading.Thread(target=target)
    t.start()

  Queue模块还自带了PriorityQueue(带有优先级)和LifoQueue(先进先出)2种特殊队列。我们这里展示下线程安全的优先级队列的用法,
  PriorityQueue要求我们put的数据的格式是(priority_number, data),我们看看下面的例子:

 

import time
import threading
from random import randint
import queue

q = queue.PriorityQueue()
def double(n):
    return n * 2

def producer():
    count = 0
    while 1:
        if count > 5:
            break
        prit = randint(0,100)
        print("put :{}".format(prit))
        q.put((prit, double, prit))  # (优先级,函数,参数)
        count += 1

def consumer():
    while 1:
        if q.empty():
            break
        pri,task,arg = q.get()
        print(‘[PRI:{}] {} * 2 = {}‘.format(pri,arg,task(arg)))
        q.task_done()
        time.sleep(0.1)

t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

7.线程池

  面向对象开发中,大家知道创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。无节制的创建和销毁线程是一种极大的浪费。那我们可不可以把执行完任务的线程不销毁而重复利用呢?仿佛就是把这些线程放进一个池子,一方面我们可以控制同时工作的线程数量,一方面也避免了创建和销毁产生的开销。

import time
import threading
from random import random
import queue

def double(n):
    return n * 2

class Worker(threading.Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self._q = queue
        self.daemon = True
        self.start()

    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            try:
                print(‘USE:{}‘.format(self.name))
                print(f(*args, **kwargs))
            except Exception as e:
                print(e)
            self._q.task_done()

class ThreadPool(object):
    def __init__(self, max_num=5):
        self._q = queue.Queue(max_num)
        for _ in range(max_num):
            Worker(self._q)  # create worker thread

    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))

    def wait_compelete(self):
        self._q.join()

pool = ThreadPool()
for _ in range(8):
    wt = random()
    pool.add_task(double, wt)
    time.sleep(wt)

pool.wait_compelete()
时间: 2024-08-28 15:34:21

Python并发编程-线程的相关文章

Python并发编程(线程队列,协程,Greenlet,Gevent)

线程队列 线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢? queue队列 :使用import queue,用法与进程Queue一样 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads. class queue.Queue(maxsize=0) #先进先出 import queue #不需要通过

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

Python并发编程—线程

线程编程(Thread) 线程基本概念 1.什么是线程[1] 线程被称为轻量级的进程[2] 线程也可以使用计算机多核资源,是多任务编程方式[3] 线程是系统分配内核的最小单元[4] 线程可以理解为进程的分支任务 2.线程特征[1] 一个进程中可以包含多个线程[2] 线程也是一个运行行为,消耗计算机资源[3] 一个进程中的所有线程共享这个进程的资源[4] 多个线程之间的运行互不影响各自运行[5] 线程的创建和销毁消耗资源远小于进程[6] 各个线程也有自己的ID等特征 threading模块创建线程

Python并发编程—线程对象属性

线程对象属性 t.name 线程名称 t.setName() 设置线程名称 t.getName() 获取线程名称 t.is_alive() 查看线程是否在生命周期 t.daemon 设置主线程和分支线程的退出关系 t.setDaemon() 设置daemon属性值 t.isDaemon() 查看daemon属性值 daemon为True时主线程退出分支线程也退出.要在start前设置,通常不和join一起使用. 1 from threading import Thread 2 from time

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

目录 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 2.死锁现象与递归锁 2.1死锁现象 2.2递归锁 3.信号量 4.GIL全局解释器锁 4.1背景 4.2为什么加锁 5.GIL与Lock锁的区别 6.验证计算密集型IO密集型的效率 6.1 IO密集型 6.2 计算密集型 7.多线程实现socket通信 7.1服务端 7.2客户端 8.进程池,线程池 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 #生产者消

python并发编程之多进程

python并发编程之多进程 一.什么是进程 进程:正在进行的一个过程或者一个任务,执行任务的是CPU. 原理:单核加多道技术 二.进程与程序的区别 进程是指程序的运行过程 需要强调的是:同一个程序执行两次是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,另一个可以播放武藤兰. 三.并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务. (1)并发

python-学习-python并发编程之多进程与多线程

一 multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程.Python提供了multiprocessing.    multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似.  multiprocessing模块的功能众多:支持子进程.通信和共享数据.执行不同形式的同步,

Python并发编程实例教程

有关Python中的并发编程实例,主要是对Threading模块的应用,文中自定义了一个Threading类库. 一.简介 我们将一个正在运行的程序称为进程.每个进程都有它自己的系统状态,包含内存状态.打开文件列表.追踪指令执行情况的程序指针以及一个保存局部变量的调用栈.通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程.在任何给定的时刻,一个程序只做一件事情. 一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或

python并发编程&amp;多线程(一)

本篇理论居多,实际操作见:  python并发编程&多线程(二) 一 什么是线程 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程 线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程 车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线 流水线的工作需要电源,电源就相当于cpu 所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位. 多线程(即多个控制线程)的概念