一、线程介绍 处理线程的模块是threading,multiprocessing模块处理方式跟threading相似 开启线程的两种方式: 例子: from threading import Thread from multiprocessing import Process def work(name): print(‘%s say hello‘ %name) if __name__ == ‘__main__‘: t = Thread(target=work, args=(‘hyh‘,)) t.start() print(‘主线程‘) class Work(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(‘%s say hello‘ %self.name) if __name__ == ‘__main__‘: t = Work(‘hyh‘) t.start() print(‘主线程‘) 二、线程方法 queue方法 例子: import queue q = queue.Queue(3) #先进先出 q.put(1) q.put(‘hyh‘) q.put([1,2,3,4]) print(q.get()) print(q.get()) print(q.get()) q = queue.LifoQueue() #后进先出 q.put(1) q.put(‘hyh‘) q.put([1,2,3,4]) print(q.get()) print(q.get()) print(q.get()) q = queue.PriorityQueue() #优先级,数字越小优先级越高 q.put((10, ‘a‘)) q.put((9, ‘b‘)) q.put((11, ‘c‘)) print(q.get()) print(q.get()) print(q.get()) 线程其他方法 例子: import time from threading import Thread import threading def work(): time.sleep(2) print(‘%s say hello‘ %(threading.current_thread().getName())) if __name__ == ‘__main__‘: t = Thread(target=work) t.setDaemon(True) #设置成守护线程 t.start() t.join() print(threading.enumerate()) #当前活跃的线程对象,是一个列表形式 print(threading.active_count()) #当前活跃的线程数目 print(‘主线程‘, threading.current_thread().getName()) #线程名字 三、python全局解释器锁GIL python同一进程的线程利用不了多核优势,因为一个线程运行时获取GIL锁,等到运行结束释放GIL, 其它线程才能申请GIL 现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上 的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的 例子: 计算密集型 from threading import Thread from multiprocessing import Process import os import time def work(): res = 0 for i in range(1000000): res += i if __name__ == ‘__main__‘: t_l = [] start_time = time.time() for i in range(300): t = Thread(target=work) t_l.append(t) t.start() for i in t_l: i.join() stop_time = time.time() print(‘run time is %s‘ %(stop_time - start_time)) print(‘主线程‘) IO密集型 from threading import Thread from multiprocessing import Process import time import os def work(): time.sleep(2) print(os.getpid()) if __name__ == ‘__main__‘: t_l = [] start_time = time.time() for i in range(1000): t = Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() stop_time = time.time() print(‘run time is %s‘ %(stop_time - start_time)) 线程锁Lock import threading R=threading.Lock() R.acquire() ‘‘‘ 对公共数据的操作 ‘‘‘ R.release() 死锁 例子: from threading import Thread,Lock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print(‘\033[41m%s 拿到A锁\033[0m‘ %self.name) mutexB.acquire() print(‘\033[42m%s 拿到B锁\033[0m‘ %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print(‘\033[43m%s 拿到B锁\033[0m‘ %self.name) time.sleep(2) mutexA.acquire() print(‘\033[44m%s拿到A锁\033[0m‘ %self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t = MyThread() t.start() 输出结果: Thread-1 拿到A锁 Thread-1 拿到B锁 Thread-1 拿到B锁 Thread-2 拿到A锁 卡住。。。 递归锁RLock 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可 以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子 如果使用RLock代替Lock,则不会发生死锁 from threading import Thread,RLock import time mutex = RLock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutex.acquire() print(‘\033[41m%s 拿到A锁\033[0m‘ %self.name) mutex.acquire() print(‘\033[42m%s 拿到B锁\033[0m‘ %self.name) mutex.release() mutex.release() def func2(self): mutex.acquire() print(‘\033[43m%s 拿到B锁\033[0m‘ %self.name) time.sleep(2) mutex.acquire() print(‘\033[44m%s拿到A锁\033[0m‘ %self.name) mutex.release() mutex.release() if __name__ == ‘__main__‘: for i in range(10): t = MyThread() t.start() 信号量Semahpore Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release() 例子: import threading import time semaphore = threading.Semaphore(5) def func(): if semaphore.acquire(): print(threading.current_thread().getName() + ‘ get spmaphore‘) time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start() event对象 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断 某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题, 我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某 些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经 被设置为真的Event对象,那么它将忽略这个事件, 继续执行 event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False 例子: from threading import Thread,Event import threading import time,random def conn_mysql(): print(‘\033[42m%s 等待链接Mysql...\033[0m‘ %threading.current_thread().getName()) event.wait() print(‘\033[42mMysql初始化成功,%s开始连接...\033[0m‘ %threading.current_thread().getName()) def check_mysql(): print(‘\033[41m正在检查mysql...\033[0m‘) time.sleep(random.randint(1,3)) event.set() time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: event = Event() t1 = Thread(target=conn_mysql) t2 = Thread(target=conn_mysql) t3 = Thread(target=check_mysql) t1.start() t2.start() t3.start() wait(time)设置超时时间 from threading import Thread,Event import threading import time,random def conn_mysql(): while not event.is_set(): print(‘\033[42m%s 等待连接mysql...\033[0m‘ %threading.current_thread().getName()) event.wait(0.1) print(‘\033[42mMysql初始化成功,%s开始连接...\033[0m‘ %threading.current_thread().getName()) def check_mysql(): print(‘\033[41m正在检查mysql...\033[0m‘) time.sleep(random.randint(1,3)) event.set() time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: event=Event() t1 = Thread(target=conn_mysql) t2 = Thread(target=conn_mysql) t3 = Thread(target=check_mysql) t1.start() t2.start() t3.start() Timer定时器,指定n秒后执行操作 例子: from threading import Timer def hello(): print("hello, world") t = Timer(3, hello) t.start() 四、协程 协程: 单线程下的并发,又称微线程,协程是一种用户态的轻量级线程,即协程是由用户程序自己控制 调度的 要实现协程,关键在于用户程序自己控制程序切换,切换之前必须由用户程序自己保存协程上一次调用 时的状态,如此,每次重新调用时,能够从上次的位置继续执行 我们之前已经学习过一种在单线程下可以保存程序运行状态的方法,即yield 不使用yield import time def consumer(item): x = 1111111111111 y = 222222222222222 z = 3333333333333333 x1 = 122324234534534 x2 = 21324354654654 x3 = 3243565432435 def producer(target,seq): for item in seq: target(item)每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大 start_time = time.time() producer(consumer,range(100000000)) stop_time = time.time() print(‘run time is:%s‘ %(stop_time - start_time)) 打印结果:run time is:14.8908851146698 使用yield import time def init(func): def wrapper(*args, **kwargs): g = func(*args, **kwargs) next(g) return g return wrapper @init def consumer(): x = 1111111111111 y = 222222222222222 z = 3333333333333333 x1 = 122324234534534 x2 = 21324354654654 x3 = 3243565432435 while True: item = yield def producer(target, seq): for item in seq: target.send(item) start_time = time.time() producer(consumer(), range(100000000)) stop_time=time.time() print(‘run time is:%s‘ %(stop_time-start_time)) greenlet实现线程的切换 例子: from greenlet import greenlet def test1(): print(‘test1,first‘) gr2.switch() print(‘test1,second‘) gr2.switch() def test2(): print(‘test2,first‘) gr1.switch() print(‘test2,second‘) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() switch传参数 import time from greenlet import greenlet def eat(name): print(‘%s eat food 1‘ %name) gr2.switch(‘alex fly fly fly‘) print(‘%s eat food 2‘ %name) gr2.switch() def play_phone(name): print(‘%s play 1‘ %name) gr1.switch() print(‘%s play 2‘ %name) gr1 = greenlet(eat) gr2=greenlet(play_phone) gr1.switch(name=‘egon啦啦啦‘) gevent第三方库 Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。 g1=gevent.spawn()创建一个协程对象g1 io阻塞切换 例子: import gevent import time def eat(): print(‘eat food 1‘) gevent.sleep(2) print(‘eat food 2‘) def play_phone(): print(‘play phone 1‘) gevent.sleep(1) print(‘play phone 2‘) g1 = gevent.spawn(eat) g2 = gevent.spawn(play_phone) gevent.joinall([g1, g2]) print(‘主‘) gevent.sleep(2)模拟的是gevent可以识别的io阻塞 time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码 例子: from gevent import monkey;monkey.patch_all() import gevent import time def eat(): print(‘eat food 1‘) time.sleep(2) print(‘eat food 2‘) def play_phone(): print(‘play phone 1‘) time.sleep(1) print(‘play phone 2‘) g1 = gevent.spawn(eat) g2 = gevent.spawn(play_phone) gevent.joinall([g1, g2]) print(‘主‘) gevent实现单线程下的socket并发 例子: 服务端 from gevent import monkey;monkey.patch_all() from socket import * import gevent def server(server_ip, port): s = socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR, 1) s.bind((server_ip,port)) s.listen(5) while True: conn, addr = s.accept() gevent.spawn(talk, conn, addr) def talk(conn,addr): try: while True: res = conn.recv(1024) print(‘client %s:%s msg: %s‘ %(addr[0], addr[1], res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == ‘__main__‘: server(‘127.0.0.1‘, 8080) 客户端 #!/usr/bin/python # --*-- coding: utf-8 --*-- from socket import * client=socket(AF_INET, SOCK_STREAM) client.connect((‘127.0.0.1‘, 8080)) while True: msg = input(‘>>: ‘).strip() if not msg:continue client.send(msg.encode(‘utf-8‘)) msg = client.recv(1024) print(msg.decode(‘utf-8‘))
时间: 2024-10-20 03:34:31