一、线程 1、基本使用 创建线程的两种方式:
import threading def f1(arg): print(arg) t = threading.Thread(target=f1, args=(123,)) t.start() #以下为执行结果 123
import threading class MyThread(threading.Thread): def __init__(self, func,args): self.func = func self.args = args super(MyThread, self).__init__() def run(self): self.func(self.args) def f2(arg): print(arg) obj = MyThread(f2,123) obj.start() #以下为执行结果 123
2、队列 #put get 存取数据
import queue q = queue.Queue() #创建队列对象,该对象为先进先出对象 q.put(123) q.put(234) q.put(345) print(q.get()) print(q.get()) print(q.get()) #输出: 123 234 345
#判断 阻塞
import queue q=queue.Queue() print(q.empty()) #队列是否为空 print(q.full()) #队列是否已满 #q=queue.Queue(10) #队列最大长度10 q.put(11) q.put(22) print(q.qsize()) #队列现在有几个元素 # q.put(33,block=False) #不阻塞,直接抛出异常 # q.put(33,timeout=2) #存数据阻塞 ,超时时间 2秒后抛出异常 print(q.get()) print(q.get()) print(q.get(timeout=2)) #取数据 阻塞,两秒后抛出异常 #以下为执行结果 True False 2 11 22
# task_done() join()
import queue q = queue.Queue(5) q.put(123) q.put(123) q.get() q.task_done() #表示某个任务完成. q.get() q.task_done() q.join() #如果队列中有没有处理的元素,等待 阻塞,任务执行完成,取消阻塞。
3、消费者模型
import queue import threading import time q = queue.Queue() def productor(arg): #put 像队列添加购买请求 """ 买票 :param arg: :return: """ q.put(str(arg) + ‘- 包子‘) for i in range(300): t = threading.Thread(target=productor,args=(i,)) t.start() def consumer(arg): """ 服务器后台 :param arg: :return: """ while True: print(arg, q.get()) #get 处理请求 time.sleep(2) for j in range(3): t = threading.Thread(target=consumer,args=(j,)) t.start() #以下为执行结果: 0 0- 包子 1 1- 包子 2 2- 包子 1 3- 包子 2 4- 包子 0 5- 包子 2 6- 包子 ...
4、线程锁 1)线程锁 ,每次放行一个
import threading import time NUM=10 def func(l): global NUM #上锁 l.acquire() NUM -= 1 time.sleep(2) print(NUM) #开锁 l.release() lock = threading.Lock() #锁每次放一个 #lock= threading.RLock() #多层锁 for i in range(10): t=threading.Thread(target=func,args=(lock,)) t.start() #以下为执行结果: 9 8 7 6 5 4...
2)信号量锁,可以设置每次放行的个数(5个)
import threading import time NUM=10 def func(i,l): global NUM #上锁 l.acquire() NUM -= 1 time.sleep(2) print(NUM,i) #开锁 l.release() lock=threading.BoundedSemaphore(5) #每次放多个 for i in range(30): t=threading.Thread(target=func,args=(i,lock,)) t.start() #以下为执行结果: 5 1 5 0 3 2 2 4 1 3 0 6 ...
3)事件event锁, 放行所有
import threading def func(i,e): print(i) e.wait() #检查是什么灯,如果红灯,停,绿灯,行 print(i+100) event= threading.Event() #锁住,要放一起放 for i in range(10): t=threading.Thread(target=func,args=(i,event,)) t.start() event.clear() #设置成红灯 inp=input(‘>>>‘) if inp==‘1‘: event.set() #设置成绿灯 #以下为运行结果: 0 1 2 3 4 5 6 7 8 9 >>>1 100 101 102 104 105 107 108 109 103 106
4)条件Condition锁 (1).wait() 没一次放行,可以自定义个数
import threading def func(i,con): print (i) con.acquire() con.wait() print (i+100) con.release() c=threading.Condition() for i in range(10): t= threading.Thread(target=func,args=(i,c)) t.start() while True: inp=input(‘>>>‘) if inp==‘q‘: break c.acquire() c.notify(int(inp)) #锁传几个处理几个 c.release() #以下为执行结果 0 1 2 3 4 5 6 7 8 9 >>>1 >>>100 2 >>>101 102
(2) con.wait_for(condition) 传递参数,条件成立放行
import threading def condition(): ret=False r=input(‘>>>‘) if r==‘true‘: ret=True return ret def func(i,con): print (i) con.acquire() con.wait_for(condition) print (i+100) con.release() c=threading.Condition() for i in range(10): t= threading.Thread(target=func,args=(i,c,)) t.start() #以下为执行结果 >>>1 2 3 4 5 6 7 8 9 >>>true 101
5、定时器 timer(监控、客户端的时候可能用到) 一秒后执行代码
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
6、自定义线程池 简易的线程池
import queue import threading import time class ThreadPool: def __init__(self,maxsize=5): self.maxsize= maxsize self._q=queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread) pool =ThreadPool(5) def task(arg,p): print(arg) time.sleep(1) p.add_thread() for i in range(100): t=pool.get_thread() obj= t(target=task,args=(i,pool,)) obj.start()
完美的线程池
二、进程 1.基本用法
from multiprocessing import Process def foo(i): print(‘say hi‘,i) if __name__ == "__main__": #windows下才需要,linux不需要main for i in range(10): p = Process(target=foo,args=(i,)) p.start() #以下为执行结果: say hi 0 say hi 3 say hi 1 say hi 2 say hi 5 say hi 4 say hi 6 say hi 7 say hi 8 say hi 9
2.进程共享 (1)默认无法数据共享
from multiprocessing import Process from multiprocessing import queues #import multiprocessing def foo(i,arg): arg.append(i) print(‘say hi‘,i,arg) if __name__ == ‘__main__‘: li=[] for i in range(10): p=Process(target=foo,args=(i,li,)) # p.daemon= True p.start() #以下为执行结果: say hi 0 [0] say hi 1 [1] say hi 2 [2] say hi 3 [3] say hi 6 [6] say hi 4 [4] say hi 5 [5] say hi 9 [9] say hi 8 [8] say hi 7 [7]
(2)queues实现共享 数字递增
from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i,arg): arg.put(i) print(‘say hi‘,i,arg.qsize()) if __name__ == ‘__main__‘: # li=[] li= queues.Queue(20,ctx=multiprocessing) for i in range(10): p=Process(target=foo,args=(i,li,)) # p.daemon= True p.start() #以下为执行结果 say hi 0 say hi 2 say hi 3 say hi 4 say hi 1 say hi 5 say hi 6 say hi 7 say hi 8 say hi 9
(3)Array 共享空间设置限制,超过限制会报错
from multiprocessing import Process from multiprocessing import queues import multiprocessing from multiprocessing import Array def foo(i, arg): arg[i]=i+100 for item in arg: print(item) print("========") if __name__ == ‘__main__‘: li=Array(‘i‘,10) for i in range(10): p = Process(target=foo, args=(i, li,)) # p.daemon= True p.start() #以下为执行结果: 0 0 102 0 0 0 0 0 0 0 ======== 0 0 102 103 0 0 0 0 0 0 ======== 100 0 102 103 0 0 0 0 0 0 ======== 100 0 102 103 104 0 0 0 0 0 ======== 100 0 102 103 104 0 0 0 108 0 ======== 100 0 102 103 104 105 0 0 108 0 ======== 100 0 102 103 104 105 106 0 108 0 ======== 100 101 102 103 104 105 106 0 108 0 ======== 100 101 102 103 104 105 106 107 108 0 ======== 100 101 102 103 104 105 106 107 108 109 ========
(3) Manger 实现数据共享
from multiprocessing import Process from multiprocessing import queues import multiprocessing from multiprocessing import Manager def foo(i, arg): arg[i]=i+100 print(arg.values()) if __name__ == ‘__main__‘: obj=Manager() li=obj.dict() for i in range(10): p = Process(target=foo, args=(i, li,)) # p.daemon= True p.start() # p.join() import time time.sleep(2) # p.join() #以下为执行结果 [100] [100, 101] [100, 101, 104] [100, 101, 102, 104] [100, 101, 102, 104, 105] [100, 101, 102, 103, 104, 105] [100, 101, 102, 103, 104, 105, 107] [100, 101, 102, 103, 104, 105, 106, 107] [100, 101, 102, 103, 104, 105, 106, 107, 108] [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
3、进程池
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == ‘__main__‘: pool= Pool(5) for i in range(30): # pool.apply(func=f1,args=(i,)) #串行 pool.apply_async(func=f1,args=(i,)) # pool.close() #所有任务执行完毕 time.sleep(2) pool.terminate() #立即终止 pool.join() print(‘end‘)
#以下为执行结果
0
1
2
3
4
end
3、锁,与线程锁使用方法一样
from multiprocessing import Process from multiprocessing import queues from multiprocessing import Array from multiprocessing import RLock, Lock, Event, Condition, Semaphore import multiprocessing import time def foo(i,lis,lc): lc.acquire() lis[0] = lis[0] - 1 time.sleep(1) print(‘say hi‘,lis[0]) lc.release() if __name__ == "__main__": # li = [] li = Array(‘i‘, 1) li[0] = 10 lock = RLock() for i in range(10): p = Process(target=foo,args=(i,li,lock)) p.start() #以下为执行结果 say hi 9 say hi 8 say hi 7 say hi 6 say hi 5 say hi 4 say hi 3 say hi 2 say hi 1 say hi 0
三、携程
from greenlet import greenlet import time def test1(): print(12) gr2.switch() time.sleep(1) print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() #以下为执行结果: 12 56 34 78
from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): print(‘GET: %s‘ % url) resp = requests.get(url) data = resp.text print(‘%d bytes received from %s.‘ % (len(data), url)) gevent.joinall([ gevent.spawn(f, ‘https://www.python.org/‘), gevent.spawn(f, ‘https://www.yahoo.com/‘), gevent.spawn(f, ‘https://github.com/‘), ]) #以下为执行结果: GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 47394 bytes received from https://www.python.org/. 25533 bytes received from https://github.com/. 425622 bytes received from https://www.yahoo.com/.
时间: 2024-10-12 23:27:45