一、TCP-socket 服务端: import socket tcp_sk = socket.socket() tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) tcp_sk.bind((‘127.0.0.1‘,8000)) tcp_sk.listen() conn,addr = tcp_sk.accept() conn.send(‘你好‘.encode(‘utf-8‘)) print(conn.recv(1024).decode(‘utf-8‘)) conn.close() tcp_sk.close() 客户端: import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8000)) print(sk.recv(1024).decode(‘utf-8‘)) sk.send(‘嘿嘿嘿‘.encode(‘utf-8‘)) sk.close() 二、UDP-socket 服务端: import socket udp_sk = socket.socket(type=socket.SOCK_DGRAM) udp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) udp_sk.bind((‘127.0.0.1‘,8001)) msg,addr = udp_sk.recvfrom(1024) print(msg.decode(‘utf-8‘)) udp_sk.sendto(‘你好‘.encode(‘utf-8‘),addr) udp_sk.close() 客户端: import socket sk = socket.socket(type=socket.SOCK_DGRAM) sk.sendto(‘哈哈‘.encode(‘utf-8‘),(‘127.0.0.1‘,8001)) msg,addr = sk.recvfrom(1024) print(msg.decode(‘utf-8‘)) sk.close() 三、socketserver 服务端: import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): conn = self.request while True: conn.send(b‘hello‘) print(conn.recv(1024).decode(‘utf-8‘)) socketserver.TCPServer.allow_reuse_address = True server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8080),Myserver) server.serve_forever() 客户端: import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8080)) while True: ret = sk.recv(1024) print(ret.decode(‘utf-8‘)) sk.send(b‘hiworld‘) sk.close() 四、进程 方式一、 from multiprocessing import Process def func(arg): print(arg) if __name__ == ‘__main__‘: p = Process(target=func,args=(‘子进程‘,)) p.start() p.join() print(‘主进程‘) 方式二、 from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(self.name) if __name__ == ‘__main__‘: p = MyProcess(‘小明‘) p.start() 五、线程 方式一、 from threading import Thread import time def sleep_boy(name): time.sleep(1) print(‘%s is sleeping‘ %name) t = Thread(target=sleep_boy,args=(‘xiaoming‘,)) # 这里可以不需要main,因为现在只是在一个进程内操作,不需要导入进程就不会import主进程了 t.start() print(‘主线程‘) 方式二、 from threading import Thread import time class Sleep_boy(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): time.sleep(1) print(‘%s is sleeping‘ % self.name) t = Sleep_boy(‘xiaoming‘) t.start() print(‘主线程‘) 六、协程 1、greenlet例子: import time from greenlet import greenlet def cooking(): print(‘cooking 1‘) g2.switch() # 切换到g2,让g2的函数工作 time.sleep(1) print(‘cooking 2‘) def watch(): print(‘watch TV 1‘) time.sleep(1) print(‘watch TV 2‘) g1.switch() # 切换到g1,让g1的函数工作 g1 = greenlet(cooking) g2 = greenlet(watch) g1.switch() # 切换到g1,让g1的函数工作 greenlet的缺陷:很显然greenlet实现了协程的切换功能,可以自己设置什么时候切,在哪切,但是它遇到阻塞并没有自动切换, 因此并不能提高效率。所以一般我们都使用gevent模块实现协程 2、gevent例子: from gevent import monkey monkey.patch_all() import time import gevent def cooking(): print(‘cooking 1‘) time.sleep(1) print(‘cooking 2‘) def watch(): print(‘watch TV 1‘) time.sleep(1) print(‘watch TV 2‘) g1 = gevent.spawn(cooking) # 自动检测阻塞事件,遇见阻塞了就会进行切换 g2 = gevent.spawn(watch) g1.join() # 阻塞直到g1结束 g2.join() # 阻塞直到g2结束 七、进程池 1、同步提交apply: import os import time from multiprocessing import Pool def test(num): time.sleep(1) print(‘%s:%s‘ %(num,os.getpid())) return num*2 if __name__ == ‘__main__‘: p = Pool() for i in range(20): res = p.apply(test,args=(i,)) # 提交任务的方法 同步提交 print(‘-->‘,res) # res就是test的return的值,同步提交的返回值可以直接使用 2、异步提交apply_async: 2-1无返回值: import time from multiprocessing import Pool def func(num): time.sleep(1) print(‘做了%s件衣服‘%num) if __name__ == ‘__main__‘: p = Pool(4) # 进程池中创建4个进程,不写的话,默认值为你电脑的CUP数量 for i in range(50): p.apply_async(func,args=(i,)) # 异步提交func到一个子进程中执行,没有返回值的情况 p.close() # 关闭进程池,用户不能再向这个池中提交任务了 p.join() # 阻塞,直到进程池中所有的任务都被执行完 2-2有返回值: import time import os from multiprocessing import Pool def test(num): time.sleep(1) print(‘%s:%s‘ %(num,os.getpid())) return num*2 if __name__ == ‘__main__‘: p = Pool() res_lst = [] for i in range(20): res = p.apply_async(test,args=(i,)) # 提交任务的方法 异步提交 res_lst.append(res) for res in res_lst: print(res.get()) # 异步提交的返回值需要get,get有阻塞效果,此时就不需要close和join 2-3map: map接收一个函数和一个可迭代对象,是异步提交的简化版本,自带close和join方法 可迭代对象的每一个值就是函数接收的实参,可迭代对象的长度就是创建的任务数量 map可以直接拿到返回值的可迭代对象(列表),循环就可以获取返回值 import time from multiprocessing import Pool def func(num): print(‘子进程:‘,num) # time.sleep(1) return num if __name__ == ‘__main__‘: p = Pool() ret = p.map(func,range(10)) # ret是列表 for i in ret: print(‘返回值:‘,i) 2-4回调函数: import os from multiprocessing import Pool def func(i): print(‘子进程:‘,os.getpid()) return i def call_back(res): print(‘回调函数:‘,os.getpid()) print(‘res--->‘,res) if __name__ == ‘__main__‘: p = Pool() print(‘主进程:‘,os.getpid()) p.apply_async(func,args=(1,),callback=call_back) # callback关键字传参,参数是回调函数 p.close() p.join() 八、进程池、线程池 线程池: 1、 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(‘thread‘,i) time.sleep(1) print(‘thread %s end‘%i) tp = ThreadPoolExecutor(5) # 相当于tp = Pool(5) tp.submit(func,1) # 相当于tp.apply_async(func,args=(1,)) tp.shutdown() # 相当于tp.close() + tp.join() print(‘主线程‘) 2、 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print(‘thread‘,i,currentThread().ident) time.sleep(1) print(‘thread %s end‘%i) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() # shutdown一次就够了,会自动把所有的线程都join() print(‘主线程‘) 3、返回值 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print(‘thread‘,i,currentThread().ident) time.sleep(1) print(‘thread %s end‘ %i) return i * ‘*‘ tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): ret = tp.submit(func,i) ret_lst.append(ret) for ret in ret_lst: print(ret.result()) # 相当于ret.get() print(‘主线程‘) 4、map map接收一个函数和一个可迭代对象 可迭代对象的每一个值就是函数接收的实参,可迭代对象的长度就是创建的线程数量 map可以直接拿到返回值的可迭代对象(列表),循环就可以获取返回值 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(‘thread‘,i) time.sleep(1) print(‘thread %s end‘%i) return i * ‘*‘ tp = ThreadPoolExecutor(5) ret = tp.map(func,range(20)) for i in ret: print(i) 5、回调函数 回调函数在进程池是由主进程实现的 回调函数在线程池是由子线程实现的 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print(‘thread‘,i,currentThread().ident) time.sleep(1) print(‘thread %s end‘%i) return i * ‘*‘ def call_back(arg): print(‘call back : ‘,currentThread().ident) print(‘ret : ‘,arg.result()) # multiprocessing的Pool回调函数中的参数不需要get(),这里需要result() tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): tp.submit(func,i).add_done_callback(call_back) # 使用add_done_callback()方法实现回调函数 print(‘主线程‘,currentThread().ident)
原文地址:https://www.cnblogs.com/Zzbj/p/9713675.html
时间: 2024-11-10 03:59:21