线程理论
多线程也是用于提高程序的效率
1,多进程
核心是多道技术
本质上就是切换加保存状态
当程序IO操作较多 可以提高程序效率
2,多线程
什么是线程
程序的执行线路
相当于与一条流水线,其包含了程序的具体执行步骤
如果我们把操作系统比喻为一个工厂,进程就是车间,先出就是流水线
3,线程和进程的关系
进程中包含了运行改程序需要所有资源,
*进程是一个资源单位,线程是CPU的最小执行单位*
每一个进程一旦被创建.就默认开启了一条线程,称之为主线程
一个进程可以包含多个线程
进程包含线程 而 线程依赖进程
4,为什么使用线程
是为了提高程序效率,
为何不用多进程提高效率? 是因为进程对操作系统的资源耗费非常高
5,线程是如何提高效率的?
多线程可以使CPU在一个进程内进行切换,从而提高CPU占用率
6,如何使用
两种开启线程的方式
1.实例化Thread类
2.继承Thread类 覆盖run方法
什么情况下应该开启多线程
当程序中遇到IO的时候
当程序中时纯计算任务时 也无法提高效率
如何使用多线程: 开启线程的两种方式: 1、实例化Thread类 2、继承Thread类,覆盖run方法
# 实例化Thread
from threading import Thread
?
def task():
print("threading run")
t1 = Thread(target=task) # 形式与进程类似
t1.start() # 手动启动线程,不用像进程那样放到main下
print("over") # 子线程可能更快,也可能慢
?
# 继承Thread类,覆盖run方法
class MyThread(Thread):
def run(self):
print("子线程 run")
?
MyThread().start()
print("over")
守护线程
守护线程:在所有非守护线程结束后结束的线程。 一个程序中可以有多个守护线程,非守护线程都结束后,多个守护进程一起死。
# 调节下面的两处时间,可以得到不同的结果
# 两处时间都取消时,守护进程可能也会完全打印,主要是因为主线程在打印结束后需要时间去结束(并不是说只要主线程打印结束立马主线程就完全结束,结束也是需要时间的),而线程速度极快,所以可能完全执行。当然,也可能部分执行,这取决于CPU的运行情况。
from threading import Thread
import time
?
def task():
print("子线程运行。。。")
# time.sleep(1)
print("子线程运行。。。")
t = Thread(target=task)
t.setDaemon(True) # 要放到子线程开始前,注意进程的守护进程是deamon,线程的守护线程是setDeamon
t.start()
# time.sleep(0.1)
print("over")
守护线程 守护线程:在所有非守护线程结束后结束的线程。 一个程序中可以有多个守护线程,非守护线程都结束后,多个守护进程一起死。
调节下面的两处时间,可以得到不同的结果
两处时间都取消时,守护进程可能也会完全打印,主要是因为主线程在打印结束后需要时间去结束(并不是说只要主线程打印结束立马主线程就完全结束,结束也是需要时间的),而线程速度极快,所以可能完全执行。当然,也可能部分执行,这取决于CPU的运行情况。
from threading import Thread import time
def task(): print("子线程运行。。。") # time.sleep(1) print("子线程运行。。。") t = Thread(target=task) t.setDaemon(True) # 要放到子线程开始前,注意进程的守护进程是deamon,线程的守护线程是setDeamon t.start()
time.sleep(0.1)
print("over")
线程中常用属性
from threading import Thread,current_thread,enumerate,active_count?import os?def task(): print("running...") print("子线程",os.getpid()) # 主线程和子线程PID相同 print(current_thread()) # 获取当前线程对象 print(active_count()) # 获取正在运行的线程个数?t1 = Thread(target=task)t1.start()print("主线程",os.getpid())?print(t1.is_alive()) # t1线程是否还在运行print(t1.isAlive()) # 同上,只是两种不同写法,官方文档中显示一模一样:isAlive = is_aliveprint(t1.getName()) # 获取子线程t1的线程名称(Thread-1)(即第一个子线程)?print(enumerate()) # 获取所有线程对象列表输出结果:[<_MainThread(MainThread, started 19516)>, <Thread(Thread-1, started 9072)>]?
线程互斥锁
当多个线程需要同时修改同一份数据时,可能会造成数据错乱,此时需要互斥锁。(类似于进程互斥锁)
不用线程互斥锁的情况:
a = 100from threading import Threadimport timedef task(): global a temp = a - 1 time.sleep(0.01) # 强行增加子线程运行时间,由于没有互斥锁,主线程的a会错乱 a = temp # 故意增加步骤,延长线程运行时间for i in range(100): t = Thread(target=task) t.start()?print(a) # 输出结果可能为98或99等等(都有可能)
解决办法,加入线程互斥锁,并且添加join确保子线程全部运行结束:
a = 100from threading import Thread,Lockimport timelock = Lock()def task(): lock.acquire() # 与进程互斥锁一致 global a temp = a - 1 time.sleep(0.01) a = temp lock.release() # 拿到一次锁就必须释放一次ts = []for i in range(100): t = Thread(target=task) t.start() ts.append(t)for t in ts: t.join() # 确保所有子进程都结束?# t.join() # 使用此方法不行,因为这样只能等待最后一个子线程,但是无法保证最后一个子线程是最后执行的,如果最后一个子线程却先于一些线程执行完毕,那么还是会出问题。print(a) # 结果为0
??信号量 其实也是一种锁,特点是可以设置一个数据可以被几个线程(进程)共享? 与普通锁的区别 普通锁一旦加锁 则意味着这个数据在同一时间只能被一个线程使用 信号量 可以让这个数据在同一时间只能被多个线程使用? 使用场景,可以限制一个数据被同时访问的次数,保证程序正常运行? """?from threading import Semaphore,Thread,current_threadimport time,random?sem = Semaphore(3)?def task(): sem.acquire() print("%s run..." % current_thread()) time.sleep(3) sem.release()??for i in range(10): t = Thread(target=task) t.start()
生产者消费者+守护进程
生产者消费者+守护进程
应用:生产者和消费者通过队列交换数据,都结束后程序再结束。
这里先补充一个其他知识:
进程队列的另一个类JoinableQueue
JoinableQueue同样通过multiprocessing使用。
创建队列的另外一个类:
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
简言之:(如果还不理解看最下面的官方文档)
task_done()表示队列中的某个任务已经完成了,会返回给join一个信号。 join()会一直阻塞直到队列中的所有任务已经被获取并处理。一旦任务被加入队列,任务计数会加1,一旦收到task_done()的返回信号,任务计数会减1,如果任务计数减为0,则join()解除阻塞,程序也就能继续运行了。
import time,randomfrom multiprocessing import JoinableQueue,Process??def eat_hotdog(name,q): while True: # 因为是守护进程,所以只要主进程结束,该循环也就结束了。 res = q.get() print("%s 在吃 %s" %(name,res)) time.sleep(random.randint(1,2)) q.task_done() # 记录当前已经被处理的数据的数量,该方法是JoinableQueue下的方法。??def make_hotdog(name,q): for i in range(1,5): res = "第%s热狗" %i print("%s 生产了 %s" % (name, res)) q.put(res)??if __name__ == ‘__main__‘: q = JoinableQueue() p1 = Process(target=make_hotdog,args=("徐福记热狗店",q)) p1.start()? p3 = Process(target=make_hotdog,args=("万达热狗店",q)) p3.start()? p2 = Process(target=eat_hotdog,args=("思聪",q)) p2.daemon = True # 此处加入守护进程,确保主程序结束后,该进程的while循环也会结束 p2.start()?# 生产者全部生产完成 p1.join() p3.join()?# 保证队列为空,全部被处理 q.join() # 该join方法是JoinableQueue下的方法,并不是上面的那种join方法 输出结果:徐福记热狗店 生产了 第1热狗徐福记热狗店 生产了 第2热狗徐福记热狗店 生产了 第3热狗徐福记热狗店 生产了 第4热狗万达热狗店 生产了 第1热狗万达热狗店 生产了 第2热狗万达热狗店 生产了 第3热狗万达热狗店 生产了 第4热狗思聪 在吃 第1热狗思聪 在吃 第2热狗思聪 在吃 第3热狗思聪 在吃 第4热狗思聪 在吃 第1热狗思聪 在吃 第2热狗思聪 在吃 第3热狗思聪 在吃 第4热狗
官方文档:
JoinableQueue是由Queue派生出来的一个类,与Queue不同,JoinableQueue有task_done()和Join()方法。
task_done()Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done()tells the queue that the processing on the task is complete.If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).Raises a ValueError if called more times than there were items placed in the queue.?join()Block until all items in the queue have been gotten and processed.The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks
原文地址:https://www.cnblogs.com/cs2612109/p/10209786.html