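Threads:
A thread is the running of one assembly line. If the operating system is a factory, it contains several workshops (processes), and threads are the assembly lines inside each workshop. Every process has one thread by default.
A process is a unit of resource allocation; a thread is the unit that the CPU schedules.
Threads are cheap to create, and the threads of one process share its address space.
Processes compete with each other for resources; threads inside a process cooperate with each other.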
1. The threading module
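Method 1: pass a target function to Thread
from threading import Thread
from multiprocessing import Process

def task():
    print('is running')

if __name__ == '__main__':
    t = Thread(target=task)  # thread
    # t = Process(target=task)  # process
    t.start()
    print('main')
Output with a thread:
is running
main
Output with a process:
main
is running
Starting a process is expensive, so the main process usually prints before the child gets to run.
Starting a thread is cheap, so the new thread usually prints before the main thread does (this is the typical order, not a guarantee).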
Method 2: subclass Thread and override run()
from threading import Thread

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name  # Thread.name is a settable property

    def run(self):
        # run() is the code that start() executes in the new thread
        print('%s is running' % self.name)

if __name__ == '__main__':
    t = MyThread('egon')
    t.start()
    print('main')
2. PIDs:
All threads of one process have the same PID.
Different processes have different PIDs.
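A minimal sketch of the first claim, using only the standard library: each thread records `os.getpid()` into a shared list (the names `pids` and `record_pid` are made up for illustration), and every entry matches the PID of the process that started them.

```python
import os
from threading import Thread

pids = []  # filled in by the worker threads

def record_pid():
    # Every thread runs inside the same process, so os.getpid() is the same
    pids.append(os.getpid())

threads = [Thread(target=record_pid) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(pids, os.getpid())
```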
3. Threads share the resources of their process:
from threading import Thread
from multiprocessing import Process

n = 100

def work():
    global n
    n = 0

if __name__ == '__main__':
    # p = Process(target=work)
    # p.start()
    # p.join()
    # print('main', n)  # prints: main 100
    t = Thread(target=work)
    t.start()
    t.join()
    print('main', n)  # prints: main 0
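With a thread, n changes to 0; with a process, n stays 100.
The reason is that threads share the memory of their process, while a child process is a separate process (with its own PID) that works on its own copy of the data.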
4. Threads share the address space of their process (a three-stage pipeline):
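from threading import Thread

msg_l = []     # raw messages typed by the user
format_l = []  # upper-cased messages waiting to be saved

def talk():
    # read user input and put it in msg_l
    while True:
        msg = input('>>: ').strip()
        msg_l.append(msg)

def format_msg():  # renamed from format so the built-in format() is not shadowed
    # busy-waits on msg_l, upper-cases each message and moves it to format_l
    while True:
        if msg_l:
            data = msg_l.pop()
            format_l.append(data.upper())

def save():
    # busy-waits on format_l and appends each message to db.txt
    while True:
        if format_l:
            data = format_l.pop()
            with open('db.txt', 'a') as f:
                f.write('%s\n' % data)

if __name__ == '__main__':
    t1 = Thread(target=talk)
    t2 = Thread(target=format_msg)
    t3 = Thread(target=save)
    t1.start()
    t2.start()
    t3.start()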
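The polling loops above keep the CPU busy even when the lists are empty. A sketch of the same pipeline built on `queue.Queue`, whose `get()` blocks until an item arrives, is shown below. It assumes hard-coded input instead of `input()`, a `None` sentinel to stop each stage, and an in-memory list `saved` standing in for db.txt; all of these are illustrative choices, not part of the original example.

```python
import queue
from threading import Thread

messages = ['hello', 'thread', 'queue']  # hard-coded input so the demo finishes by itself
STOP = None                              # sentinel: tells the next stage to exit

raw_q = queue.Queue()
fmt_q = queue.Queue()
saved = []                               # stands in for db.txt

def talk():
    for msg in messages:
        raw_q.put(msg)
    raw_q.put(STOP)

def format_msg():
    while True:
        data = raw_q.get()               # blocks until an item is available, no busy-waiting
        if data is STOP:
            fmt_q.put(STOP)              # pass the sentinel on to the next stage
            break
        fmt_q.put(data.upper())

def save():
    while True:
        data = fmt_q.get()
        if data is STOP:
            break
        saved.append(data)

threads = [Thread(target=f) for f in (talk, format_msg, save)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(saved)  # ['HELLO', 'THREAD', 'QUEUE']
```

Because each queue has exactly one producer and one consumer and is FIFO, the messages come out in the order they were typed, unlike `list.pop()`, which takes the newest item.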
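Other threading functions and methods:
import threading
import time
from threading import Thread, current_thread

def task():
    # current_thread() returns the Thread object of the thread running this code
    print('%s is running' % current_thread().name)
    time.sleep(2)

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()                         # wait until task() has finished
    print(t.is_alive())              # whether the thread is still alive: False after join()
    print(t.name)                    # the thread's name
    print(threading.enumerate())     # list of Thread objects that are currently alive
    print('main')
    print(threading.active_count())  # number of live threads (here only the main thread, so 1)
    # active_count() and .name replace the deprecated activeCount() and getName() spellings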
From the point of view of execution, the main thread represents the execution of the process it belongs to.
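A small sketch of this idea: `threading.main_thread()` returns the Thread object of the thread the interpreter started with, so comparing it with `current_thread()` tells whether code is running in the main thread. The `seen` dict and `child` function are hypothetical names for the demo, and the first printed value assumes the script itself runs in the main thread.

```python
import threading

seen = {}

def child():
    # A thread started with Thread() is never the main thread
    seen['child_is_main'] = threading.current_thread() is threading.main_thread()

t = threading.Thread(target=child)
t.start()
t.join()
print(threading.current_thread() is threading.main_thread(), seen['child_is_main'])
```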
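5. Daemon threads
For both processes and threads the rule is the same: a daemon is destroyed once the main process or main thread has finished running.
Note that "finished running" is not the same as "terminated": the main process has finished running as soon as its own code reaches the end (its daemon children are then killed, while it still waits for non-daemon children to exit); the main thread has finished running only once every non-daemon thread in the process has finished as well.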
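from threading import Thread
import time

def task1():
    print('123')
    time.sleep(10)
    print('123done')  # never printed: t1 is a daemon and is destroyed when the process exits

def task2():
    print('456')
    time.sleep(1)
    print('456done')

if __name__ == '__main__':
    t1 = Thread(target=task1)
    t2 = Thread(target=task2)
    t1.daemon = True  # must be set before start()
    t1.start()
    t2.start()
    print('main')
    # The main thread counts as finished only when the non-daemon thread t2 ends
    # (about 1 s later); the process then exits and the daemon t1 is destroyed.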
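The rule can be checked end to end with a sketch that runs a tiny script in a fresh interpreter via `subprocess`, so that the interpreter's exit is observable from outside. The child script, its function names and its timings are made up for illustration; the expectation is that the non-daemon worker's output appears and the daemon's does not.

```python
import subprocess
import sys
import textwrap

# Child script: a daemon thread that sleeps longer than the only non-daemon thread
CHILD = textwrap.dedent('''
    import time
    from threading import Thread

    def slow():
        time.sleep(2)
        print('daemon done')   # not reached: the process exits first

    def fast():
        time.sleep(0.2)
        print('worker done')

    Thread(target=slow, daemon=True).start()
    Thread(target=fast).start()
    print('main done')
''')

result = subprocess.run([sys.executable, '-c', CHILD],
                        capture_output=True, text=True, timeout=10)
print(result.stdout)
```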
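6. The GIL
Points to note (GIL here means CPython's Global Interpreter Lock):
1. Threads compete for the GIL, which is effectively the permission to execute. Only a thread that holds the GIL can go on to acquire the mutex Lock. Another thread may grab the GIL, but if it finds that the Lock has not been released yet, it blocks and must give the GIL up immediately, even though it has just obtained it.
2. join waits for a whole thread to finish, which makes the work serial as a whole, whereas a lock only covers the part that modifies shared data, which makes it serial only in part. The basic principle behind data safety is to turn concurrent execution into serial execution; both join and a mutex achieve this, and the partial serialization of a mutex is clearly more efficient.
An illustration of point 1: thread 1 grabs the GIL, gets permission to run, starts executing and acquires a Lock. Before it finishes, i.e. while it still holds the Lock, thread 2 may grab the GIL and start running. Thread 2 then finds that the Lock has not been released by thread 1, so it blocks and loses the permission to run. Thread 1 may then get the GIL back and run normally until it releases the Lock, and so on. The result is that the code protected by the Lock runs serially.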
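Point 2 can be measured with a rough sketch: the same task (a 0.05 s sleep that touches no shared data, then a locked counter update) is run once with start/join per thread, which serializes everything, and once with all threads started first, where only the update is serialized. `N`, `task` and the timings are illustrative assumptions, and absolute numbers will vary by machine.

```python
import time
from threading import Lock, Thread

N = 10
counter = 0
lock = Lock()

def task():
    global counter
    time.sleep(0.05)   # work that does not touch shared data
    with lock:         # only the shared update is serialized
        counter += 1

def run_with_join():
    # start and join one thread at a time: the whole task runs serially
    start = time.perf_counter()
    for _ in range(N):
        t = Thread(target=task)
        t.start()
        t.join()
    return time.perf_counter() - start

def run_with_lock():
    # start every thread first: the sleeps overlap, only the update is serial
    start = time.perf_counter()
    threads = [Thread(target=task) for _ in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

t_join = run_with_join()
t_lock = run_with_lock()
print('join: %.2fs  lock: %.2fs  counter: %d' % (t_join, t_lock, counter))
```

Both runs leave the counter correct; only the amount of work that is forced to run one thread at a time differs.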
7. Mutex (Lock)
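from threading import Thread, Lock
import time

n = 100

def work():
    global n
    time.sleep(0.05)
    mutex.acquire()   # only one thread at a time runs the read-modify-write below
    temp = n
    time.sleep(0.1)
    n = temp - 1
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    l = []
    start = time.time()
    for i in range(100):
        t = Thread(target=work)
        l.append(t)
        t.start()
    for t in l:
        t.join()
    # value is 0; run time is a little over 10 s because the 100 critical sections run one after another
    print('run time:%s value:%s' % (time.time() - start, n))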
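For contrast, a sketch of the same read-modify-write without the mutex (10 threads instead of 100 to keep it short; `unsafe_work` is a made-up name). The GIL does not protect this sequence: while one thread sleeps between the read and the write, the others read the same stale value, so updates overwrite each other.

```python
import time
from threading import Thread

n = 100

def unsafe_work():
    global n
    temp = n          # read
    time.sleep(0.1)   # the GIL is released while sleeping, so other threads read the same value
    n = temp - 1      # write back a result computed from a stale read

threads = [Thread(target=unsafe_work) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(n)  # typically 99 rather than 90: most decrements are lost
```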
# Multiprocessing:
#   pro: can take advantage of multiple cores
#   con: high overhead
# Multithreading:
#   pro: low overhead
#   con: cannot take advantage of multiple cores

# CPU-bound test (commented out):
# from threading import Thread
# from multiprocessing import Process
# import time
#
# def work():
#     res = 1
#     for i in range(100000000):
#         res += i
#
# if __name__ == '__main__':
#     p_l = []
#     start = time.time()
#     for i in range(4):
#         # p = Process(target=work)  # 6.7473859786987305 s
#         p = Thread(target=work)     # 24.466399431228638 s
#         p_l.append(p)
#         p.start()
#     for p in p_l:
#         p.join()
#     print(time.time() - start)

from threading import Thread
from multiprocessing import Process
import time

# I/O-bound test
def work():
    time.sleep(2)

if __name__ == '__main__':
    p_l = []
    start = time.time()
    for i in range(400):
        # p = Process(target=work)  # 12.104692220687866 s
        p = Thread(target=work)     # 2.038116455078125 s
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(time.time() - start)
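8. Deadlock and recursive locks
Deadlock: a situation in which two or more processes or threads, while executing, end up waiting for each other because they compete for resources; without outside intervention none of them can make progress. The system is then said to be deadlocked, and the processes stuck waiting for each other are called deadlocked processes. The following code deadlocks: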
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('\033[45m%s got lock A\033[0m' % self.name)
        mutexB.acquire()
        print('\033[44m%s got lock B\033[0m' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('\033[44m%s got lock B\033[0m' % self.name)
        time.sleep(1)  # while this thread holds B, another thread can enter f1() and take A
        mutexA.acquire()  # waits for A, while the other thread waits for B: deadlock
        print('\033[45m%s got lock A\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(20):
        t = Mythread()
        t.start()
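The circular wait above can be reproduced deterministically in a sketch that terminates instead of hanging: each thread takes its first lock, a `threading.Barrier` makes sure both hold one before going on, and the attempt on the second lock uses `acquire(timeout=0.5)`. The lock, barrier and result names are illustrative; a second barrier wait keeps both first locks held until both attempts are over, so both attempts fail.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
sync = threading.Barrier(2)  # reusable barrier for the two threads, used twice below
results = {}

def worker(name, first, second):
    with first:
        sync.wait()                        # both threads now hold their first lock
        got = second.acquire(timeout=0.5)  # circular wait: without a timeout this blocks forever
        sync.wait()                        # keep holding `first` until both attempts are over
        if got:
            second.release()
        results[name] = got

threads = [
    threading.Thread(target=worker, args=('t1', lock_a, lock_b)),
    threading.Thread(target=worker, args=('t2', lock_b, lock_a)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # both values are False: each thread waited for a lock the other held
```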
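Recursive lock: to let the same thread request the same resource several times, Python provides the reentrant lock RLock.
An RLock internally keeps a Lock, its owner and a counter that records how many times it has been acquired, so the owning thread can acquire it repeatedly. Other threads can acquire it only after the owner has released it as many times as it acquired it. If the example above used one RLock for both A and B (mutexA = mutexB = RLock()) instead of two separate Locks, no deadlock would occur: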
# Recursive lock
from threading import Thread, RLock
import time

mutex = RLock()  # one reentrant lock plays the role of both A and B

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutex.acquire()
        print('\033[45m%s got lock A\033[0m' % self.name)
        mutex.acquire()  # the owner may acquire again; the counter goes to 2
        print('\033[44m%s got lock B\033[0m' % self.name)
        mutex.release()
        mutex.release()  # counter back to 0: other threads may now acquire it

    def f2(self):
        mutex.acquire()
        print('\033[44m%s got lock B\033[0m' % self.name)
        time.sleep(1)
        mutex.acquire()
        print('\033[45m%s got lock A\033[0m' % self.name)
        mutex.release()
        mutex.release()

if __name__ == '__main__':
    for i in range(20):
        t = Mythread()
        t.start()
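The difference between the two lock types can be seen in a minimal sketch: one thread acquires the same lock twice, using `acquire(blocking=False)` for the second attempt so that a plain Lock reports failure instead of hanging. The helper name `nested_acquire` is hypothetical.

```python
from threading import Lock, RLock

def nested_acquire(lock):
    # Acquire the same lock twice from the same thread.
    # blocking=False makes the second attempt return at once instead of waiting forever.
    first = lock.acquire()
    second = lock.acquire(blocking=False)
    if second:
        lock.release()
    if first:
        lock.release()
    return second

print(nested_acquire(RLock()))  # True: the owning thread may re-enter an RLock
print(nested_acquire(Lock()))   # False: a plain Lock would deadlock on itself
```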
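9. Semaphores
Semaphore(5) lets at most five threads into the guarded section at the same time; the others wait in acquire() until a holder calls release().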
from threading import Thread, current_thread, Semaphore
import time, random

sm = Semaphore(5)  # at most 5 threads may hold the semaphore at once

def work():
    sm.acquire()
    print('%s is using the toilet' % current_thread().name)
    time.sleep(random.randint(1, 3))
    sm.release()

if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=work)
        t.start()
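To check the limit rather than just print, here is a sketch that counts how many threads are inside the semaphore at the same moment. `LIMIT`, `active` and `max_active` are illustrative names, and a separate Lock protects the counters because `+=` on a shared global is not atomic.

```python
import time
from threading import Lock, Semaphore, Thread

LIMIT = 3
sm = Semaphore(LIMIT)
counter_lock = Lock()   # protects active / max_active
active = 0
max_active = 0

def worker():
    global active, max_active
    with sm:                      # at most LIMIT threads get past this line at once
        with counter_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)          # simulate work while holding a slot
        with counter_lock:
            active -= 1

threads = [Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max_active)
```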
10. Process pools and thread pools
Process pool:
import os
import time

import requests  # pip3 install requests
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def get_page(url):
    print('<%s> get :%s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}
    return None  # a non-200 response produces no result

def parse_page(obj):
    # obj is the finished Future; this callback runs back in the main process
    dic = obj.result()
    if dic is None:
        return
    print('<%s> parse :%s' % (os.getpid(), dic['url']))
    time.sleep(0.5)
    res = 'url:%s size:%s\n' % (dic['url'], len(dic['text']))  # simulate parsing the page
    with open('db.txt', 'a') as f:
        f.write(res)

if __name__ == '__main__':
    # p = Pool(4)
    p = ProcessPoolExecutor()  # defaults to one worker process per CPU
    urls = ['http://www.baidu.com'] * 7
    for url in urls:
        # p.apply_async(get_page, args=(url,), callback=parse_page)
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown()  # wait for all submitted tasks to finish
    print('main process pid:', os.getpid())

Thread pool:
import os
import threading
import time

import requests  # pip3 install requests
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def get_page(url):
    print('<%s> get :%s' % (threading.current_thread().name, url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}
    return None  # a non-200 response produces no result

def parse_page(obj):
    # obj is the finished Future; this callback runs in the worker thread that completed it
    dic = obj.result()
    if dic is None:
        return
    print('<%s> parse :%s' % (threading.current_thread().name, dic['url']))
    time.sleep(0.5)
    res = 'url:%s size:%s\n' % (dic['url'], len(dic['text']))  # simulate parsing the page
    with open('db.txt', 'a') as f:
        f.write(res)

if __name__ == '__main__':
    # p = Pool(4)
    p = ThreadPoolExecutor(3)  # 3 worker threads
    urls = ['http://www.baidu.com'] * 7
    for url in urls:
        # p.apply_async(get_page, args=(url,), callback=parse_page)
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown()  # wait for all submitted tasks to finish
    print('main process pid:', os.getpid())
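The same submit-and-collect pattern can be tried without network access in a sketch where a hypothetical `fake_fetch` sleeps instead of calling `requests.get()` and the example.com URLs are placeholders. It uses the executor as a context manager, which calls `shutdown(wait=True)` on exit, and `as_completed()` to take results in the order the tasks finish rather than through callbacks.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(url):
    # Hypothetical stand-in for requests.get(): sleeps instead of doing network I/O
    time.sleep(0.1)
    return {'url': url, 'size': len(url)}

urls = ['http://example.com/%d' % i for i in range(6)]

# Leaving the with-block calls shutdown(wait=True)
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fake_fetch, u) for u in urls]
    results = [f.result() for f in as_completed(futures)]  # in completion order

print(sorted(r['url'] for r in results))
```

With 3 workers and 6 tasks of about 0.1 s each, the whole batch takes roughly two rounds of sleeps instead of six, since the sleeping threads overlap.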