1 基于UDP套接字
1.1 介绍
udp是无连接的,是数据报协议,先启动哪段都不会报错
udp服务端
import socket sk = socket() #创建一个服务器的套接字 sk.bind() #绑定服务器套接字 while True: #服务器无限循环 cs = sk.recvfrom()/sk.sendto() # 对话(接收与发送) sk.close() # 关闭服务器套接字
udp客户端
import socket client = socket() # 创建客户套接字 while True: # 通讯循环 client.sendto()/client.recvfrom() # 对话(发送/接收) client.close() # 关闭客户套接字
1.2 基本实例
1.2.1 服务端
import socket udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_server.bind((‘127.0.0.1‘, 9999)) while True: data,client_addr = udp_server.recvfrom(512) print(data, client_addr) udp_server.sendto(data.upper(), client_addr)
1.2.2 客户端
import socket udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) while True: msg = input(‘>>‘).strip() udp_client.sendto(msg.encode(‘utf-8‘), (‘127.0.0.1‘,9999)) data,server_addr = udp_client.recvfrom(512) print(data.decode(‘utf-8‘))
1.3 udp不会粘包
udp是基于数据报协议,发送一份信息,有完整的报头的主题,不会像tcp那样基于数据流的,没有开头、没有结尾;而udp有开头(报头),也有结尾,所以不会出现像tcp那样粘包现象。
1.3.1 服务端
import socket udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_server.bind((‘127.0.0.1‘,9999)) info1,client_addr = udp_server.recvfrom(1) print(‘info1‘, info1) info2,client_addr = udp_server.recvfrom(512) print(‘info2‘, info2)
1.3.2 客户端
import socket udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_client.sendto(‘welcome‘.encode(‘utf-8‘), (‘127.0.0.1‘,9999)) udp_client.sendto(‘beijing‘.encode(‘utf-8‘), (‘127.0.0.1‘,9999))
1.4 udp并发
1.4.1 服务端
import socketserver class MyUDPhandler(socketserver.BaseRequestHandler): def handle(self): print(self.request) self.request[1].sendto(self.request[0].upper(), self.client_address) if __name__ == ‘__main__‘: udp_server = socketserver.ThreadingUDPServer((‘127.0.0.1‘,8080), MyUDPhandler) udp_server.serve_forever()
1.4.2 客户端
import socket udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) while True: info = input(‘>>‘).strip() udp_client.sendto(info.encode(‘utf-8‘), (‘127.0.0.1‘,9999)) body,server_addr = udp_client.recvfrom(512) print(body.decode(‘utf-8‘))
2 进程
2.1 介绍
进程:正在运行的一个过程或者任务,是对正在运行程序的一个抽象。
2.2 开启进程
示例1
from multiprocessing import Process import time def my_run(info): print(‘task <%s> is running‘ %info) time.sleep(0.5) print(‘task <%s> is done‘ % info) if __name__ == ‘__main__‘: process1 = Process(target = my_run, args=(‘mary‘,)) process2 = Process(target = my_run, args=(‘jack‘,)) process1.start() process2.start()
示例2
from multiprocessing import Process import time class MyMulProcess(Process): def __init__(self,name): super().__init__() self.name = name def my_run(self): print(‘task <%s> is runing‘ % self.name) time.sleep(0.5) print(‘task <%s> is done‘ % self.name) if __name__ == ‘__main__‘: process = MyMulProcess(‘jack‘) process.my_run() process.start()
2.3 并发通信
2.3.1 服务端
from multiprocessing import Pool import os import socket tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 解决Address already in use tcp_server.bind((‘127.0.0.1‘,9999)) tcp_server.listen(5) def work(conn, addr): print(os.getpid()) print(addr) while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() if __name__ == ‘__main__‘: pool = Pool(4) while True: conn,addr = tcp_server.accept() pool.apply_async(work, args = (conn, addr)) tcp_server.close()
2.3.2 客户端
import socket tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_client.connect((‘127.0.0.1‘,9999)) while True: info = input(‘>>‘).strip() if not info:continue tcp_client.send(info.encode(‘utf-8‘)) data = tcp_client.recv(1024) print(data.decode(‘utf-8‘)) tcp_client.close()
2.4 join方法
主进程,等待子进程运行完,才执行下面内容;
p.join只能join住start开启的进程,而不能join住run开启的进程
主进程,等待p1执行结束,才执行主进程下面的内容
from multiprocessing import Process import time def work(name): print(‘task <%s> is runing‘ %name) time.sleep(0.5) print(‘task <%s> is done‘ % name) if __name__ == ‘__main__‘: process1 = Process(target = work, args=(‘jack‘,)) process2 = Process(target = work, args=(‘mary‘,)) process_list = [process1, process2] for process in process_list: process.start() for process in process_list: process.join()
2.6 守护进程
主进程代码运行完毕,守护进程就会结束
主进程创建守护进程
1.守护进程会在主进程代码执行结束后就终止
2.守护进程内无法再开启子进程,否则抛出异常。
守护进程,守护者主进程,主进程结束,守护进程随即结束;主进程代码结束后,守护进程随之结束
from multiprocessing import Process import time def work(name): print(‘task <%s> is runing‘ %name) time.sleep(0.5) print(‘task <%s> is done‘ % name) if __name__ == ‘__main__‘: p1=Process(target=work,args=(‘jack‘,)) p1.daemon = True # 必须在进程开启之前,设置为守护进程 p1.start()
重复守护进程概念,守护进程什么时间结束;在主进程代码结束,就会结束
from multiprocessing import Process import time def foo(): print("from foo start") time.sleep(0.5) print("from foo end") def bar(): print("from bar start") time.sleep(0.8) print("from bar end") if __name__ == ‘__main__‘: process1 = Process(target = foo) process2 = Process(target = bar) process1.daemon = True process1.start() process2.start() print("主进程") #打印该行则主进程代码结束,则守护进程process1应该被终止, # 可能会有process1任务执行的打印信息from foo start, # 因为主进程打印主进程时,process1也执行了,但是随即被终止
2.7 进程同步锁
核心点:保证一个进程用完一个终端,再交个另一个终端使用,独享终端,保证有序;
2.7.1 基本用法
加锁,变为串行,保证数据不会错乱;效率与错乱之间做出取舍
from multiprocessing import Process,Lock import time def work(name, mutex): mutex.acquire() print(‘task <%s> is runing‘ %name) time.sleep(0.5) print(‘task <%s> is done‘ % name) mutex.release() if __name__ == ‘__main__‘: mutex = Lock() process1 = Process(target = work, args = (‘jack‘, mutex)) process2 = Process(target = work, args = (‘mary‘, mutex)) process1.start() process2.start()
2.7.2 模拟购票
模拟购票,查询票的余额,不要考虑先后顺序;而到真正购票环境,需要保证一张票不被多次购买,需要加锁。
import json,time,os from multiprocessing import Process,Lock def search(): dic = json.load(open(‘ticket.txt‘)) print(‘\033[32m[%s] 看到剩余票数<%s>\033[0m‘ %(os.getpid(), dic[‘count‘])) def get_ticket(): dic = json.load(open(‘ticket.txt‘)) time.sleep(0.5) # 模拟读数据库的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.5) # 模拟写数据库的网络延迟 json.dump(dic,open(‘ticket.txt‘,‘w‘)) print(‘\033[31m%s 购票成功\033[0m‘ %os.getpid()) def work(mutex): search() mutex.acquire() get_ticket() mutex.release() if __name__ == ‘__main__‘: mutex = Lock() for index in range(10): process = Process(target = work, args = (mutex,)) process.start()
2.7.3 共享数据通信
基于共享内存方式,进行数据通信,需要考虑锁的形式。
from multiprocessing import Process,Manager,Lock def work(dic, mutex): with mutex: dic[‘count‘] -= 1 if __name__ == ‘__main__‘: mutex = Lock() manager = Manager() dic = manager.dict({‘count‘:100}) p_list = [] for i in range(100): process = Process(target = work, args = (dic, mutex)) p_list.append(process) process.start() for process in p_list: process.join() print(dic)
2.7.3 进程间通信
进程间的通信,有很多方式,例如:管道、共享数据、消息队列等;推荐的方式是:通过消息队列的方式进行通信。
Queue,常用方法:put、get;队列就是管道加锁,进行实现的
from multiprocessing import Queue queue = Queue(3) # 队列的最大长度为3 queue.put(‘first‘) queue.put(‘second‘) queue.put(‘third‘) queue.put(‘fourth‘) # 超过队列长度,满了会卡着 print(queue.get()) print(queue.get()) print(queue.get()) print(queue.get()) # 队列空了,一直卡着,等待队里有值,进行获取 # 了解知识点 queue = Queue(3) queue.put(‘first‘,block = False) # 队列满了,不进行锁住,会抛异常 queue.put(‘second‘, block = False) queue.put(‘third‘, block = False) queue.put_nowait(‘fourth‘) # 等价queue.put(‘fourth‘, block = False) queue.put(‘fourth‘,timeout = 3) # 默认等待3秒钟,指定超时时间
3 生产者、消费者
应该具有两类模型,生产者和消费者
3.1 基本版本的生产者消费者
from multiprocessing import Process,Queue import time,os def producer(q,name): for i in range(5): time.sleep(0.5) res=‘%s%s‘ %(name,i) q.put(res) print(‘\033[42m<%s> 制造 [%s]\033[0m‘ %(os.getpid(),res)) def consumer(q): while True: res=q.get() if res is None:break time.sleep(0.8) print(‘\033[31m<%s> 购买 [%s]\033[0m‘ % (os.getpid(), res)) if __name__ == ‘__main__‘: queue = Queue() # 生产者 producer1 = Process(target = producer, args = (queue, ‘自行车‘)) producer2 = Process(target = producer, args = (queue, ‘汽车‘)) producer3 = Process(target = producer, args = (queue, ‘飞机‘)) # 消费者 consumer1 = Process(target = consumer, args = (queue,)) consumer2 = Process(target = consumer, args = (queue,)) producer1.start() producer2.start() producer3.start() consumer1.start() consumer2.start() producer1.join() producer2.join() producer3.join() queue.put(None) # 利用None通知消费者,东西已经生产完了 queue.put(None) # 有几个消费者,就要通知几次
3.2 JoinableQueue改进生产者消费者模型
生产者等待消费者把队列的内容全部去空,就结束生产过程;消费等待主进程结束,也就随之结束
主进程等待生产者结束,才执行剩余代码;需要消费者利用守护进程,随者主进程结束也就结束。
逻辑性比较强,利用JoinableQueue和守护进程和join。
from multiprocessing import Process,JoinableQueue import time,os def producer(queue, name): for i in range(5): time.sleep(0.5) res = ‘%s%s‘ %(name,i) queue.put(res) print(‘\033[42m<%s> 制造 [%s]\033[0m‘ %(os.getpid(),res)) queue.join() # 生产者等待queue里面的所有内容都被卖掉了,就结束了 def consumer(queue): while True: res = queue.get() if res is None:break time.sleep(0.8) print(‘\033[31m<%s> 购买 [%s]\033[0m‘ % (os.getpid(), res)) queue.task_done() # 确定购买了生产者的一个内容,通知生产者 # 通知queue,已经取走一个东西 if __name__ == ‘__main__‘: queue = JoinableQueue() # 生产者 producer1 = Process(target = producer, args = (queue, ‘自行车‘)) producer2 = Process(target = producer, args = (queue, ‘汽车‘)) producer3 = Process(target = producer, args = (queue, ‘飞机‘)) # 消费者 consumer1 = Process(target = consumer, args = (queue,)) consumer2 = Process(target = consumer, args = (queue,)) consumer1.daemon = True # 消费者利用守护进程,随着主进程结束也就结束 consumer2.daemon = True producer1.start() producer2.start() producer3.start() consumer1.start() consumer2.start() producer1.join()
4 进程池
默认开启进程池的个数,是cpu核数的个数
4.1 同步提交任务
同步提交任务,损失效率,保证数据有序和安全
from multiprocessing import Pool import os,time def task(n): print(‘task <%s> is runing‘ %os.getpid()) time.sleep(0.5) return n**3 if __name__ == ‘__main__‘: print(os.cpu_count()) p = Pool(4) for index in range(10): res = p.apply(task, args = (index,)) print(res)
4.2 异步提交任务
等待进程池中的所有任务结束,就可以拿到运行结果
p.close(),禁止向进程池中提交新任务
from multiprocessing import Pool import os,time def task(n): print(‘task <%s> is runing‘ %os.getpid()) time.sleep(0.5) return n**3 if __name__ == ‘__main__‘: print(os.cpu_count()) pool = Pool(4) res_list = [] for index in range(10): res = pool.apply_async(task, args = (index,)) # 只负责向队列仍任务,不等任务结束 res_list.append(res) for res in res_list: print(res.get()) pool.close() pool.join()
4.3 进程池控制并发
利用进程池,控制并发的进程数量
4.3.1 服务端
from multiprocessing import Pool import os import socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((‘127.0.0.1‘,9999)) server.listen(5) def work(conn, addr): print(os.getpid()) print(addr) while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() if __name__ == ‘__main__‘: pool = Pool(4) while True: conn,addr = server.accept() pool.apply_async(work, args = (conn, addr)) server.close()
4.3.2 客户端
import socket client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((‘127.0.0.1‘,9999)) while True: info = input(‘>>‘).strip() if not info:continue client.send(info.encode(‘utf-8‘)) data = client.recv(1024) print(data.decode(‘utf-8‘)) client.close()
4.4 回调函数
让下载的函数进行并发,解析的函数进行串行执行(利用回调函数进行执行)
耗时时间长的利用进程池进行并发处理,利用异步进行处理
import requests # pip3 install requests import os,time from multiprocessing import Pool def get_info(url): print(‘<%s> get :%s‘ %(os.getpid(),url)) respone = requests.get(url) if respone.status_code == 200: return {‘url‘:url, ‘text‘:respone.text} def parse_page(dic): print(‘<%s> parse :%s‘ %(os.getpid(),dic[‘url‘])) time.sleep(0.5) parse_res=‘url:<%s> size:[%s]\n‘ %(dic[‘url‘],len(dic[‘text‘])) # 模拟解析网页内容 with open(‘reptile.txt‘,‘a‘) as f: f.write(parse_res) if __name__ == ‘__main__‘: pool = Pool(4) urls = [ ‘https://www.baidu.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://help.github.com‘, ‘https://www.sina.com.cn‘, ] for url in urls: pool.apply_async(get_info, args = (url,), callback = parse_page) # 利用回调函数,通知主进程,调用parse_page,需要执行parse_page函数了 # 把get_page执行结果,传递给parse_page作为参数进行传递 pool.close() pool.join()
5 paramike模块
5.1 介绍
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。
5.2 基于密码连接
import paramiko # 创建SSH对象 ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 ssh.connect(hostname=‘127.0.0.1‘, port=22, username=‘root‘, password=‘xxx‘) # 执行命令 stdin, stdout, stderr = ssh.exec_command(‘df‘) # 获取命令结果 result = stdout.read() print(result.decode(‘utf-8‘)) # 关闭连接 ssh.close()
5.3 基于秘钥链接
客户端文件名:id_rsa
服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)
import paramiko private_key = paramiko.RSAKey.from_private_key_file(‘id_rsa‘) # 创建SSH对象 ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 ssh.connect(hostname=‘127.0.0.1‘, port=22, username=‘root‘, pkey=private_key) # 执行命令 stdin, stdout, stderr = ssh.exec_command(‘df -h‘) # 获取命令结果h result = stdout.read() print(result.decode(‘utf-8‘)) # 关闭连接 ssh.close()
import paramiko from io import StringIO key_str = """-----BEGIN RSA PRIVATE KEY----- MIIEpQIBAAKCAQEAl6OGU27q3YV1LMdAZH02PWFtPJaxQ68zbWIBmgP0NfnVQz30 Bvhe+rZCFEQCwLoeld4jj7vA+Vk8nKPoBhX1WVvq9MfSF+YRXeu1+XAXvL1pH+Q9 6gJ1l4NmwHq8DVgUCx3TsAdK0wOqEIUDSOu+2/nECK1lmKN1Cf5Va20r+B0CDjkT RRA6C9MIb2WWwf4EiEWjBl+lnZ0vDXoB38oZQR71tdLsvrVumUDzyGvF79jWW3O6 9gc4F2ivoKVbXswyJfq1bVpH1N7aM6DzUiELv1mFsyIVveydMVzFIrDbUI8gCErx NhI6esUbhNwfd3NfBOTzEQxBd5V3/a6rkC5MpwIDAQABAoIBAQCD9cJHiRbKgAFg XmUjDfPNpqMxPtI0XJscbVWHejljX26/fYKHLk05ULJggG8E2PMU6KN5yaI9W/Lr PZgE88b3ZI4rRlkGgyhJ234Y+/ssPIjnP/DBXDKJD8izaBuOYT/QDLzTSwVKbL3q clZRdxY4yDpYcs0e7+BCOhqLyg2hdAYA8Z4VOOs4SQqRW6k9K+oXeNMhc4htozOf tVsSM3XkFZ4hpug34S89+FKEwZ00RTJEEkK8IjBfLJ5w+RfLFXth8hTVMbeLhcv+ RqDYdUwq/JAXCrri0687hCwi5J06xTrY7BwgoKJznxlpz6tiyEPNrnqZ1vAayWqS G/x+R/cBAoGBAMj49kKmEpKZesbbSD9KJvD1YnkIGwEheLvNZzRtx+2cwejHHWDZ F0i+KzDTt+7QZ2zb+ABOgK1sq8Xhfn40M90xqixbqp0UzaFyMFnmiB2iyZ56I3Fi Kqeerr+f3i/djewNhMJZZZhO/n+YxhCpQUBotepQ3tGA/G6vvkSSMe73AoGBAMEo i9LaSDyJxk51mW8OmgiIyJ1376LKu3sHlUkn+Ca5dm2/iYNIuN5dC0YTPjo1A5It jZTid5VBEb4OMOpKYygR4S9euAxv22Uxib1xGZLdJHKblwizdJnBazsqDQW6mPfN o/BADQl/+h3pPpOWoIxSxi07xYq+gAToW2tc6TPRAoGBAKhGHRwtJbvuGqlKjjHA Ct8S94LT0JifyBGnqNRzX0WLTal0nxqqax6TbGKTw5yIjzDM9dh74q5TIXismFdf qlV48j31+uNPueWGUQnVRv9ZgGvbZLXZNlHnQfZdC5MUdXLC1vhMFg7zhZCdAKqO rX4arsclM4xD7hlXuX580qZ9AoGALs5te43LnWfhdxfGM4Q9TT4gJxBuMGuiHMEM quqVloSwrw2P/BE+QxwW5Ec7eA1qrRx+x4pNYgyfiQeVUODvwED86WaxgMoGRzJG 53IluVH/SApuAfzCj5OwMWkSOMYr1TiutkQ/JIMvj9n6gPcqNnbEcSefyew5x3aq 2IxuMlECgYEAtVuORz9C7WnJIVW6HwNiS4OBs7becOgXDHAOw0hnu+3mxVm/NIkX yeGK7rP1KKbS4I3pbG+H0nWAQkfQtW6nQjU5nvoCTX8Yyk6ZNC0mhGNTqKRqpjI/ eXe8nUib71izC6g6kJY66+BTg2SCBsHUAqAB7L4gvFHGt8sq46TQ3jw= -----END RSA PRIVATE KEY-----""" private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) transport = paramiko.Transport((‘127.0.0.1‘, 22)) transport.connect(username=‘root‘, pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command(‘df‘) result = stdout.read() print(result.decode(‘utf-8‘)) transport.close() print(result)