python中多线程

多线程

  • 什么是多线程
  • 开启线程的两种方式
  • 进程和线程的区别
  • Thread对象的其他属性和方法
  • 守护线程
  • 死锁现象与递归锁
  • 信号量、Event定时器
  • 线程Queue
  • 进程池和线程池

什么是多线程

在传统意义上,每个进程有一个地址空间,而且默认就会有一个控制线程。

线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于CPU),而一条流水线必须属于一个车间,一个车间的工作过程是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间至少要有一条流水线。

所以,进程只是用来把资源整合到一起,而线程才是CPU上的执行单位。

多线程(多个控制线程)的概念是:在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都公用一个车间的资源。

so:进程之间数据不共享,线程之间数据是共享的。

开启线程的两种方式

# 第一种
from threading import Thread
import time

def task(name):
    print(‘%s is running.‘%name)
    time.sleep(2)
    print(‘%s is done.‘%name)

if __name__ == ‘__main__‘:
    t = Thread(target=task,args=(‘子线程‘,))
    t.start()
    print(‘主线程‘)
# 运行结果
子线程 is running.
主线程
子线程 is done.

---------------------------------------------------
# 第二种
from threading import Thread
import time

class MyThread(Thread):
    def __init__(self,name):
        super(MyThread, self).__init__()
        self.name = name
    def run(self):
        print(‘%s is running.‘%self.name)
        time.sleep(2)
        print(‘%s is done.‘%self.name)

if __name__ == ‘__main__‘:
    t = MyThread(‘子线程‘)
    t.start()
    print(‘主线程‘)
# 运行结果
子进程 is running.
主进程
子进程 is done.

进程和线程的区别

  • 开进程的开销远大于开线程
  • 同一进程内的多个线程共享该进程的地址空间
  • PID

验证开进程的开销远大于开线程

# 开启多进程
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess, self).__init__()
        self.name = name
    def run(self):
        print(‘%s is running.‘%self.name)
        time.sleep(2)
        print(‘%s is done.‘%self.name)

if __name__ == ‘__main__‘:
    p = MyProcess(‘子进程‘)
    p.start()
    print(‘主进程‘)
# 开启多线程
from threading import Thread
import time

class MyThread(Thread):
    def __init__(self,name):
        super(MyThread, self).__init__()
        self.name = name
    def run(self):
        print(‘%s is running.‘%self.name)
        time.sleep(2)
        print(‘%s is done.‘%self.name)

if __name__ == ‘__main__‘:
    p = MyThread(‘子线程‘)
    p.start()
    print(‘主线程‘)

如果这两段代码你分别运行一遍,你就可以明显的看出来,在开启多进程的时候,首先打印的是主进程,因为子进程此时在申请内存地址,然后才会把子进程打印出来;

而在开启多线程的时候,会立马把这个子进程首先打印出来。

同一个进程内的多个进程共享该进程的地址空间

说白了就是线程之间数据共享,因为之前做过了进程之间数据不共享的实验,所以这次就写一个线程之间共享的代码:

from threading import Thread
from multiprocessing import Process
import time

n = 100

def task(name):
    global n
    n = 99
    print(‘[%s]内n的值为<%s>‘%(name,n))

if __name__ == ‘__main__‘:
    t = Thread(target=task,args=(‘子线程‘,))
    t.start()
    print(‘主线程n的值为%s‘%n)

# 运行结果为
[子线程]内n的值为<99>
主线程n的值为99

足以证明线程之间数据是共享的。

瞅一眼pid

# 查看主进程和子进程的pid
from multiprocessing import Process,current_process
import time

def task():
    print(‘子进程:‘,current_process().pid)
    time.sleep(2)

if __name__ == ‘__main__‘:
    p = Process(target=task,name=‘子进程‘)
    p.start()
    print(‘主进程:‘,current_process().pid)

# 运行结果为:
主进程: 7912
子进程: 12092
# 查看主线程和子线程的pid
from threading import Thread,current_thread
import time,os

def task():
    print(‘子线程:‘,os.getpid())
    time.sleep(2)

if __name__ == ‘__main__‘:
    t = Thread(target=task,name=‘Thread子线程‘)
    t.start()
    print(‘主线程:‘,os.getpid())

# 运行结果为:
子线程: 9060
主线程: 9060

因为在多线程中,每个线程都是平级的,没有子线程的概念,为了方便理解,所以叫为‘子线程‘

Thread对象的属性和方法

name和getName()方法,其中name设置线程的名字,getName获取线程的名字

from threading import Thread,current_thread
import time,os

def task():
    print(‘子线程名为:‘,current_thread().getName())  # 获取当前线程的名字
    time.sleep(2)

if __name__ == ‘__main__‘:
    t = Thread(target=task,name=‘Thread子线程‘)  # 设置子线程的名字
    t.start()
    print(‘主线程‘)

# 运行结果
主线程
子线程名为: Thread子线程

join()方法和is_alive()方法

join()方法让主线程等待子线程完成之后再进行,is_alive()方法检测当前线程是否存活

from threading import Thread,current_thread
import time,os

def task(name):
    print(‘%s is running‘%name)
    time.sleep(2)
    print(‘%s is done‘%name)

if __name__ == ‘__main__‘:
    t = Thread(target=task,args=(‘子进程1‘,))
    t.start()
    print(t.is_alive())  # 判断是否存活
    t.join()  #让主进程等待
    print(‘主线程‘)
    print(t.is_alive())  # 判断是否存活

# 运行结果为
子进程1 is running
True
子进程1 is done
主线程
False

active_count()检查存活的线程数量

from threading import Thread,current_thread,active_count  # 导入这个模块
import time,os

def task(name):
    print(‘%s is running‘%name)
    time.sleep(2)
    print(‘%s is done‘%name)

if __name__ == ‘__main__‘:
    t1 = Thread(target=task,args=(‘子进程1‘,))
    t2 = Thread(target=task,args=(‘子进程2‘,))
    t1.start()
    t2.start()
    print(active_count())  # 判断当前线程数
    t1.join()
    t2.join()

    print(‘主线程‘)

# 运行结果
子进程1 is running
子进程2 is running
3
子进程2 is done
子进程1 is done
主线程

eumecate()把当前活跃线程对象拿出来

from threading import Thread,current_thread,active_count,enumerate
import time,os

def task(name):
    print(‘%s is running‘%name)
    time.sleep(2)
    print(‘%s is done‘%name)

if __name__ == ‘__main__‘:
    t1 = Thread(target=task,args=(‘子进程1‘,))
    t2 = Thread(target=task,args=(‘子进程2‘,))
    t1.start()
    t2.start()
    print(‘当前线程存活数目:‘,active_count())
    print(‘当前活跃线程对象:‘,enumerate())
    print(‘主线程‘)

# 运行结果:
子进程1 is running
子进程2 is running
当前线程存活数目: 3
当前活跃线程对象: [<_MainThread(MainThread, started 14472)>, <Thread(Thread-1, started 8036)>, <Thread(Thread-2, started 12328)>]
主线程
子进程1 is done
子进程2 is done

守护线程

无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁

需要强调的是,运行完毕并非是终止运行

1.对主进程来说,运行完毕指的是主进程代码运行完毕
2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕后,主线程才算是运行完毕

详细解释:

1.主进程在其代码结束后就已经算是运行完毕了(守护进程此时就会被回收掉),然后主进程会一直等非守护进程的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。

2.主线程在其他非守护线程运行完毕后才算运行完毕(守护进程在此时就会回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都完毕后才能结束。

代码演示

# 只有一个线程

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print(‘%s is say hello‘%name)

if __name__ == ‘__main__‘:
    t = Thread(target=sayhi,args=(‘肖亚飞‘))
    # t.setDaemon(True)  # 设置成为守护线程
    t.daemon = True
    t.start()
    print(‘主线程‘)  # 到此时主线程运行完毕
    print(t.is_alive())  # 判断线程是否存活

# 运行结果为:
主线程
True
# 有多个线程

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(2)
    print(‘end123‘)

def bar():
    print(456)
    time.sleep(2)
    print(‘end456‘)

if __name__ == ‘__main__‘:
    t1 = Thread(target=foo,)
    t2 = Thread(target=bar,)

    t1.daemon = True  # 将t1设置成为守护线程
    t1.start()
    t2.start()
    print(‘主线程‘)  # 主线程代码

# 运行结果为:
123
456
主线程
end123
end456

死锁现象与递归锁

死锁现象

所谓死锁:两个或两个以上的进程/线程在执行过程中,因争夺资源而造成的一种互相等待的现象,苦无外力作用,他们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程成为死锁进程,如下就是死锁:

# 死锁
from threading import Thread,Lock
import time

# 创建A锁和B锁
mutexA = Lock()  # 互斥锁只能acquire一次
mutexB = Lock()

class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(‘%s 拿到了A锁‘%self.name)

        mutexB.acquire()
        print(‘%s 拿到了B锁‘%self.name)
        mutexB.release()

        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(‘%s 拿到了B锁‘%self.name)
        time.sleep(0.05)

        mutexA.acquire()
        print(‘%s 拿到了A锁‘%self.name)
        mutexA.release()

        mutexB.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t = MyThread()
        t.start()  # 执行run方法

# 运行结果为:
Thread-1 拿到了A锁
Thread-1 拿到了B锁
Thread-1 拿到了B锁
Thread-2 拿到了A锁

说明:在这段代码中,f2 方法为首先拿到mutexB这把锁,在拿到之后会沉睡0.05s,操作系统会觉得这是一个IO堵塞,然后将COU切换给Thread-2使用,然后Thread-2拿到了mutexA锁后,发现:进入了死锁

而解决死锁的方法就是使用递归锁

递归锁

我们刚刚见过,互斥锁只能acquire一次,但是递归锁就可以acquire多次,就相当于每acquire一次,它的计数器就会增加1,每release一次就是减少1,当计数为0时,才可以被其它线程抢到acquire。

# 使用递归锁解决死锁问题
from threading import Thread,RLock
import time

# 创建锁
mutexA = RLock()
mutexB = mutexA  # 相当于一把锁

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(‘%s 拿到了A锁‘%self.name)

        mutexB.acquire()
        print(‘%s 拿到了B锁‘%self.name)
        mutexB.release()

        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(‘%s 拿到了B锁‘%self.name)
        time.sleep(1)

        mutexA.acquire()
        print(‘%s 拿到了A锁‘%self.name)
        mutexA.release()

        mutexB.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t = MyThread()
        t.start()

信号量

互斥锁就是在同一时间只能有一个任务抢到锁去执行,而信号量也是一把锁,可以指定信号量为5,信号量就是同一时间有5个任务拿到锁去执行,如果说互斥锁是合租房屋里的人去抢一个厕所,那么信号量就相当于一群路人争夺公共厕所,公共厕所的人数有限制,这就意味着同一时间可以有多少人上公共厕所,但公共厕所容纳的人数也是一定的,这就是信号量的大小。

# 信号量
from threading import Thread,Semaphore,current_thread
import time,random

sm = Semaphore(3)  # 设置信号量大小为3

def task():
    sm.acquire()
    print(‘%s in‘%current_thread().getName())
    time.sleep(random.randint(1,3))
    sm.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t = Thread(target=task,)
        t.start()

# 运行结果为
Thread-1 in
Thread-2 in
Thread-3 in

Thread-4 in

Thread-5 in

Thread-6 in
Thread-7 in
Thread-8 in

Thread-9 in

Thread-10 in

# 其中,拿到锁和释放锁还可以有这么种写法
    with sm:
        print(‘%s in ‘%current_thread().getName())
        time.sleep(random.randint(1,3))

解析

Semaphore管理一个内置的计数器
每当调用acquire()时内置计数器会-1
调用release()时内置计数器+1
计数器不能小于0;当计数器为0时,acquire()将阻塞线程知道其他线程调用release()

Event定时器

线程的一个关键特性是每个线程都是独立运行且状态不可预测的。如果程序中的其他线程需要通过判断某个线程的状态来确定自己的下一步操作,这时线程同步问题就会变得很棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等到某些事情的发生在初始情况下,Event对象中的信号标志被设置成为False,如果有线程等待Event对象,而这个Event对象的标志为假,那么这个线程就会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置成为真的Event对象,那么它将忽略这个事情,继续执行。

from Threading import Event
event = Event()

event.isSet()  # 返回event的状态值

event.wait()  # 如果event.isSet == False,则阻塞线程

event.set()  # 设置event的状态值为True,所有阻塞线程激活进入就绪状态,等到操作系统调度

event.clear()  # 恢复event的状态值为True

在高中上课时,老师讲课学生听课,学生想要下课必须等待老师发送下课信号

from threading import Thread,Event
import time

event = Event()

def student(name):
    print(‘%s 正在听课‘%name)
    event.wait()
    print(‘%s 课件活动‘%name)

def teacher(name):
    print(‘老师 %s 正在讲课 ‘%name)
    time.sleep(7)
    event.set()

if __name__ == ‘__main__‘:
    stu1 = Thread(target=student,args=(‘李鹏‘,))
    stu2 = Thread(target=student,args=(‘李坤‘,))
    stu3 = Thread(target=student,args=(‘魏文武‘,))
    t1 = Thread(target=teacher,args=(‘肖亚飞‘,))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()

# 运行结果
李鹏 正在听课
李坤 正在听课
魏文武 正在听课
老师 肖亚飞 正在讲课 

李坤 课件活动
魏文武 课件活动
李鹏 课件活动

大学生活,老师讲他自己的,学生想下课就下课

from threading import Thread,Event
import time
event = Event()

def student(name):
    print(‘学生 <%s> 正在听课‘%name)
    event.wait(3)  # 设置超时时间,等待2秒就走
    print(‘学生 <%s> 课件活动‘%name)
def teacher(name):
    print(‘%s 正在讲课‘%name)
    time.sleep(7)
    event.set()

if __name__ == ‘__main__‘:
    stu1 = Thread(target=student,args=(‘李鹏‘,))
    stu2 = Thread(target=student,args=(‘李坤‘,))
    stu3 = Thread(target=student,args=(‘大山‘,))
    t1 = Thread(target=teacher,args=(‘肖亚飞‘,))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()

# 运行结果
学生 <李鹏> 正在听课
学生 <李坤> 正在听课
学生 <大山> 正在听课
肖亚飞 正在讲课

学生 <李坤> 课件活动    # 3s超时时间过去了
学生 <李鹏> 课件活动
学生 <大山> 课件活动

例如,有多个工作线程尝试连接MySQL,我们想要在连接前确保MySQL服务正常才能让那些工作线程去连接MySQL服务器,如果连接不成功,都回去尝试连接。那么我们需要采用Event机制来协调各个线程之间的连接操作

# 检查mysql连接
from threading import Thread,Event,current_thread
import time,random

# 先生成定时器对象
event = Event()

def conn_mysql():
    count = 1
    while not event.is_set():  # 默认为False
        if count > 3:
            raise TimeoutError(‘连接超时‘)
        print(‘<%s> 第%s次尝试连接‘%(current_thread().getName(),count))
        event.wait(0.5)
        count += 1
        print(‘<%s> 连接成功‘%current_thread().getName())

def check_mysql():
    print(‘%s is checking‘%current_thread().getName())
    time.sleep(5)
    event.set()

if __name__ == ‘__main__‘:
    for i in range(3):
        t = Thread(target=conn_mysql,)
        t.start()
    t = Thread(target=check_mysql)
    t.start()

# 运行结果为
<Thread-1> 第1次尝试连接
<Thread-2> 第1次尝试连接
<Thread-3> 第1次尝试连接
Thread-4 is checking
<Thread-1> 连接成功
<Thread-2> 连接成功
<Thread-3> 连接成功
<Thread-3> 第2次尝试连接
<Thread-1> 第2次尝试连接
<Thread-2> 第2次尝试连接
<Thread-2> 连接成功
<Thread-1> 连接成功
<Thread-1> 第3次尝试连接
<Thread-3> 连接成功
<Thread-2> 第3次尝试连接
<Thread-3> 第3次尝试连接
<Thread-2> 连接成功
<Thread-1> 连接成功
<Thread-3> 连接成功

定时器

定时器,指定n秒后执行某操作

# 定时器
from threading import Timer
def hello():
    print(‘hello world‘)

t = Timer(2,hello)
t.start()

基于定时器实现验证码登录

# 基于定时器实现验证码登录
from threading import Thread,Timer
import random

class Code():
    def __init__(self):
        self.make_cache()

    # 在登录的时候就应该有一个验证码
    def make_cache(self,interval=5):
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval,self.make_cache)
        self.t.start()

    # 生成验证码
    def make_code(self,n=4):  # 验证码的个数
         res = ‘‘
         for i in range(n):
             s1 = str(random.randint(0,9))
             s2 = chr(random.randint(65,90))
             res += random.choice([s1,s2])  # 随机字符串
         return res

    # 验证
    def check(self):
        while True:
            code = input(‘请输入验证码>>>‘).strip()
            if code.upper() == self.cache:
                print(‘验证码输入正确‘)
                self.t.cancel()
                break

obj = Code()
obj.check()

# 运行结果为
IFI4
请输入验证码>>>PRC0
PRCO878M

请输入验证码>>>878M
验证码输入正确

## 线程Queue

在线程编程中,当信息必须在多个线程之间安全地交换时,队列特别有用。

线程有三种用法

  • 队列:先进先出
  • 堆栈:先进后出
  • 优先级队列:存储数据时可设置优先级的队列

下面依次来演示一下

队列:先进先出

# 线程queue-->队列
import queue
q = queue.Queue(3)  # 设置队列为3
q.put(‘first‘)
q.put(2)
q.put(‘third‘)

# 打印
print(q.get())
print(q.get())
print(q.get())

# 打印结果为
first
2
third

堆栈:先进后出

# 线程queue-->堆栈
import queue
q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

# 打印结果为
3
2
1

优先级队列:存储数据时可设置优先级的队列

# 线程queue-->优先级
import queue
q = queue.PriorityQueue(3)
q.put((10,‘first‘))  # 10 代表优先级
q.put((40,2))
q.put((20,‘third‘))

print(q.get())  # 数字越小优先级越高
print(q.get())
print(q.get())

# 打印结果为
(10, ‘first‘)
(20, ‘third‘)
(40, 2)

进程池和线程池

在刚开始学多线程或多进程时,我们迫不及待的基于多线程、多进程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数会随着并发的客户端的数目增多而增多,这会对服务器主机带来巨大的压力,甚至瘫痪,于是我们必须对服务器开启的进程数或线程数加以控制,让机器在一个自己能够承受的范围内运行,这就是进程池和线程池的用途,例如进程池,就是用来存放进程的池子,本质上还是基于多进程,只不过是对开启进程的数目加上了限制

介绍

官网:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures  模块提供了高度封装的异步调用接口
ThreadPoolExecutor   线程池,提供异步调用
ProcessPoolExecutor  进程池,提供异步调用

Both implement the same interface, which is defined by the abstract Executor class.两者实现相同的接口,抽象Executor类定义该接口。

基本方法

1.submit(fn,*args,**kwargs)
异步提交任务

2.map(func,*iterables,timeout=None,chunksize=1)
取代for循环submit操作

3.shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕后收回资源才继续
wait=False,立即返回,并不会原地等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4.result(timeout=None)
取到结果

5.add_done_callbakc(n)
回调函数

进程池

用法

from concurrent.futures import ProcessPoolExecutor
import os,time, random

def task(name):
    print(‘name:%s  pid:%s‘%(name,os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == ‘__main__‘:
    pool = ProcessPoolExecutor(4)  # 如果不指定的话,则为本机cpu的核数
    for i in range(10):
        pool.submit(task,‘肖亚飞%s‘%i)  # 异步提交:提交完任务后不用在原地等待

    # 维持计数器,一共有10个任务,运行一个就会少1个
    pool.shutdown(wait=True)
    print(‘主进程‘)

那么把这段代码运行了之后,就可以很明显的看见有效的PID就只有4个,从而很好的控制了PID

线程池

用法

from concurrent.futures import ThreadPoolExecutor
import os,time,random

def task(name):
    print(‘name:%s pid:%s‘%(name,os.getpid()))
    time.sleep(random.randint(0.01-0.02))

if __name__ == ‘__main__‘:
    start_time = time.time()
    pool = ThreadPoolExecutor(4)
    for i in range(10000):
        pool.submit(task,‘肖亚飞%s‘%i)

    pool.shutdown(wait=True)
    stop_time = time.time()
    print(‘主线程‘,stop_time-start_time)

查看线程的名字

# 查看线程的名字
from concurrent.futures import  ThreadPoolExecutor
import time,random,os
from threading import current_thread

def task():
    print(‘name:%s pid:%s‘%(current_thread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(max_workers=4)
    for i in range(100):
        pool.submit(task,)
    pool.shutdown(wait=True)
    print(‘主进程‘)

map方法

map(func,*iterables,timeout=None,chunksize=1) 取代for循环submit操作

# map方法
from concurrent.futures import ThreadPoolExecutor
import time,os
from threading import current_thread

def task():
    print(‘name:%s pid:%s‘%(current_thread().getName(),os.getpid()))
    time.sleep(1)

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(max_workers=4)
    pool.map(task,range(1,12))

回调函数

可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程任务执行完毕后自动触发,并接受任务的返回值当做参数,该函数成为回调函数

from concurrent.futures import ThreadPoolExecutor
import requests
import os

def get_page(url):
    print(‘<进程%s> get %s‘ %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {‘url‘:url,‘text‘:respone.text}

def parse_page(res):
    res=res.result()
    print(‘<进程%s> parse %s‘ %(os.getpid(),res[‘url‘]))
    parse_res=‘url:<%s> size:[%s]\n‘ %(res[‘url‘],len(res[‘text‘]))
    with open(‘db.txt‘,‘a‘) as f:
        f.write(parse_res)

if __name__ == ‘__main__‘:
    urls=[
        ‘https://www.baidu.com‘,
        ‘https://www.python.org‘,
        ‘https://www.openstack.org‘,
        ‘https://help.github.com/‘,
        ‘http://www.sina.com.cn/‘
    ]

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page)  # parse_page拿到的是一个futuer对象,需要obj,result()拿到结果

 # 运行结果为
<进程2392> get https://www.baidu.com
<进程13288> get https://www.python.org
<进程1268> get https://www.openstack.org
<进程2392> get https://help.github.com/
<进程13984> parse https://www.baidu.com
<进程1268> get http://www.sina.com.cn/
<进程13984> parse https://www.openstack.org
<进程13984> parse http://www.sina.com.cn/
<进程13984> parse https://www.python.org
<进程13984> parse https://help.github.com/

提交任务的两种方式

  • 同步提交:也就是提交完任务后,就在原地等待任务执行完毕,拿到结果再去执行下一段代码
  • 异步提交:提交完任务后不在原地等待执行完毕

我们来模拟一下同步提交和异步提交

# 同步提交
from concurrent.futures import ThreadPoolExecutor
import time,random

def la(name):
    print(‘%s is laing‘%name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*‘#‘
    return {‘name‘:name,‘res‘:res}

def weight(shit):
    name = shit[‘name‘]
    size = len(shit[‘res‘])
    print(‘%s 拉了 %s kg‘%(name,size))

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(13)
    shit1 = pool.submit(la,‘李鹏‘).result()
    weight(shit1)
    shit2 = pool.submit(la,‘李坤‘).result()
    weight(shit2)
    shit3 = pool.submit(la,‘魏文武‘).result()
    weight(shit3)

# 运行结果
李鹏 is laing
李鹏 拉了 9 kg
李坤 is laing
李坤 拉了 7 kg
魏文武 is laing
魏文武 拉了 9 kg
# 异步调用
from concurrent.futures import ThreadPoolExecutor
import time,random

def la(name):
    print(‘%s is laing‘%name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*‘#‘
    return {‘name‘:name,‘res‘:res}

def weight(shit):
    shit = shit.result()  # 拿到这个结果
    name = shit[‘name‘]
    res = len(shit[‘res‘])
    print(‘%s 拉了 %s kg‘%(name,res))

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(13)
    pool.submit(la,‘李鹏‘).add_done_callback(weight)  # 绑定回调函数,前面一个任务执行完成后,return返回的值(futuer对象)就会当做参数传递个weight函数
    pool.submit(la,‘李坤‘).add_done_callback(weight)
    pool.submit(la,‘魏文武‘).add_done_callback(weight)

# 运行结果
李鹏 is laing
李坤 is laing
魏文武 is laing
魏文武 拉了 12 kg
李坤 拉了 12 kg
李鹏 拉了 11 kg

原文地址:https://www.cnblogs.com/xiaoyafei/p/9277914.html

时间: 2024-11-09 06:10:48

python中多线程的相关文章

python中多线程调度机制以及GIL

总结下python中线程调度机制. 对于线程调度机制而言,同操作系统的进程调度一样,最关键是要解决两个问题: 1.在何时选择挂起当前线程,并选择处于等待的先一个线程呢? 2.在众多等待的线程中,选择哪一个作为激活线程呢? 在python多线程机制中,这个两个问题是有两个层次解决的. 如,进程间的切换,当发生了时钟中断,操作系统响应时钟中断,并在这个时候开始进程的调度. 同样的,python也是通过软件模拟了这样的时钟中断,来激激活线程的调度. 我们知道,python字节码解释器是按照指令,顺序一

python中多线程,多进程,队列笔记(一)

threading简介:If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks si

python中多线程(1)

一多线程的概念介绍 threading模块介绍 threading模块和multiprocessing模块在使用层面,有很大的相似性. 二.开启多线程的两种方式 1.创建线程的开销比创建进程的开销小,因而创建线程的速度快 2 from multiprocessing import Process 3 from threading import Thread 4 import os 5 import time 6 def work(): 7 print('<%s> is running'%os.

自动化测试框架解析2-----关于python的多线程问题

在阅读代码的时候,碰到了python中多线程的问题.在这里做一个记录 mport time  import threading def movie(func): for i in range(2): print "I am watching movie <%s>, time:%s"%(func,time.ctime()) time.sleep(5) def music(func): for i in range(2): print "I am listennig 

Python中的线程与进程

进程与线程 在多任务处理中,每一个任务都有自己的进程,一个任务会有很多子任务,这些在进程中开启线程来执行这些子任务.一般来说,可以将独立调度.分配的基本单元作为线程运行,而进程是资源拥有的基本单位. python支持多进程multiprocessing,以及多线程threading. 多进程 os.fork()函数可以开启一个进程.该函数会返回两次值,分别在父进程中返回子进程的ID,而在子进程中永远返回0. os.getpid()函数可以返回进程的ID.os.getppid()则可以返回父进程的

Python有了asyncio和aiohttp在爬虫这类型IO任务中多线程/多进程还有存在的必要吗?

最近正在学习Python中的异步编程,看了一些博客后做了一些小测验:对比asyncio+aiohttp的爬虫和asyncio+aiohttp+concurrent.futures(线程池/进程池)在效率中的差异,注释:在爬虫中我几乎没有使用任何计算性任务,为了探测异步的性能,全部都只是做了网络IO请求,就是说aiohttp把网页get完就程序就done了. 结果发现前者的效率比后者还要高.我询问了另外一位博主,(提供代码的博主没回我信息),他说使用concurrent.futures的话因为我全

Python中的多进程与多线程/分布式该如何使用

在批评Python的讨论中,常常说起Python多线程是多么的难用.还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行.因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行.必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情.如果你还没看过的话,我建议你看看Eqbal Quran的文章<Ruby

python中的多线程

一个程序可以理解为一个进程,这个进程有其代号,可以依据这个代号将其杀死. 一个进程肯定有且只有一个主线程,他可以有很多子线程. 运行一个任务如果可以有许多子线程同时去做,当然会提高效率. 但是,在python中,多线程其实不是严格意义上的多线程. 因为,python有一个全局锁的概念,它保证在某一个时间节点上,只能存在一个线程在运行,只是这个时间节点非常短,人类意识不到. 所以说,Python的多线程其实就是不断地切换进程,而没有把所有的进程在同一时间同时运行. 注意,切换进程并不是依据什么顺序

python中的多线程【转】

转载自: http://c4fun.cn/blog/2014/05/06/python-threading/ python中关于多线程的操作可以使用thread和threading模块来实现,其中thread模块在Py3中已经改名为_thread,不再推荐使用.而threading模块是在thread之上进行了封装,也是推荐使用的多线程模块,本文主要基于threading模块进行介绍.在某些版本中thread模块可能不存在,要使用dump_threading来代替threading模块. 线程创