简单并发实例
服务端:
#!/usr/bin/env python3 #-*- coding:utf-8 -*- ‘‘‘ Administrator 2018/8/3 ‘‘‘ import socketserver class MyServer(socketserver.BaseRequestHandler): #定义自己的类,继承固定的类 socketserver.BaseRequestHandler def handle(self): #方法名 必须用handle 为了重新父类的方法 print ("服务端启动...") while True: conn = self.request #相当于:conn, addr = s.accept() print (self.client_address) while True: client_data=conn.recv(1024) print (str(client_data,"utf8")) print ("waiting...") conn.sendall(client_data) conn.close() if __name__ == ‘__main__‘: #调用socketserver 下的多线程TCPSERVer服务方法。需要两个参数,(ip port), 自己定义的类 server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8091),MyServer) #调用serve_forever()方法。里面封装了 server.serve_forever()
客户端:
#!/usr/bin/env python3 #-*- coding:utf-8 -*- ‘‘‘ Administrator 2018/8/3 ‘‘‘ #-----------------------------------------------------client.py #----------------------------------------------------- import socket ip_port = (‘127.0.0.1‘,8091) sk = socket.socket() sk.connect(ip_port) print ("客户端启动:") while True: inp = input(‘>>>‘) sk.sendall(bytes(inp,"utf8")) if inp == ‘exit‘: break server_response=sk.recv(1024) print (str(server_response,"utf8")) sk.close()
聊天并发实例
1 #!/usr/bin/env python3 2 #-*- coding:utf-8 -*- 3 ‘‘‘ 4 Administrator 5 2018/8/3 6 ‘‘‘ 7 import socketserver 8 9 class MyServer(socketserver.BaseRequestHandler): 10 11 def handle(self): 12 print ("服务端启动...") 13 while True: 14 conn = self.request 15 print (self.client_address) 16 while True: 17 18 client_data=conn.recv(1024) 19 20 print (str(client_data,"utf8")) 21 print ("waiting...") 22 server_response=input(">>>") 23 conn.sendall(bytes(server_response,"utf8")) 24 # conn.sendall(client_data) 25 26 conn.close() 27 # print self.request,self.client_address,self.server 28 29 30 if __name__ == ‘__main__‘: 31 server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8098),MyServer) 32 server.serve_forever()
kehuduan:
1 ########################################## 2 import socket 3 4 5 ip_port = (‘127.0.0.1‘,8098) 6 sk = socket.socket() 7 sk.connect(ip_port) 8 print ("客户端启动:") 9 while True: 10 inp = input(‘>>>‘) 11 sk.sendall(bytes(inp,"utf8")) 12 server_response=sk.recv(1024) 13 print (str(server_response,"utf8")) 14 if inp == ‘exit‘: 15 break 16 sk.close()
其它应用
命令传送1:
#------------------------------------------------server #------------------------------------------------ import socket import subprocess ip_port = (‘127.0.0.1‘,8879) sk = socket.socket() sk.bind(ip_port) sk.listen(5) print ("服务端启动...") while True: conn,address = sk.accept() while True: try: client_data=conn.recv(1024) except Exception: break print (str(client_data,"utf8")) print ("waiting...") # server_response=input(">>>") # conn.sendall(bytes(server_response,"utf8")) cmd=str(client_data,"utf8").strip() cmd_call=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) cmd_result=cmd_call.stdout.read() if len(cmd_result)==0: cmd_result=b"no output!" conn.sendall(cmd_result) print(‘send data size‘,len(cmd_result)) print(‘******************‘) print(‘******************‘) print(‘******************‘) conn.close() #------------------------------------------------client #------------------------------------------------ import socket ip_port = (‘127.0.0.1‘,8879) sk = socket.socket() sk.connect(ip_port) print ("客户端启动:") while True: inp = input(‘cdm:>>>‘).strip( ) if len(inp)==0: continue if inp=="q": break sk.sendall(bytes(inp,"utf8")) server_response=sk.recv(1024) print (str(server_response,"gbk")) print(‘receive data size‘,len(server_response)) if inp == ‘exit‘: break sk.close()
1 2 |
|
conclusion:
sendall会把数据直接全部发送到客户端,客户端将所有的数据都放到缓冲区,每次recv多少字节取决于recv内的参数,理论不应该超过8k。
所以,并不能一次recv()无限大数据,所以这里我们应该通过循环去接收。
1 |
|
命令传送2:解决大数据传送和粘包问题
import socketserver import subprocess class Myserver(socketserver.BaseRequestHandler): def handle(self): while True: conn=self.request conn.sendall(bytes("欢迎登录","utf8")) while True: client_bytes=conn.recv(1024) if not client_bytes:break client_str=str(client_bytes,"utf8") print(client_str) command=client_str result_str=subprocess.getoutput(command) result_bytes = bytes(result_str,encoding=‘utf8‘) info_str="info|%d"%len(result_bytes) conn.sendall(bytes(info_str,"utf8")) # conn.recv(1024) conn.sendall(result_bytes) conn.close() if __name__=="__main__": server=socketserver.ThreadingTCPServer(("127.0.0.1",9998),Myserver) server.serve_forever() #####################################client import socket ip_port=("127.0.0.1",9998) sk=socket.socket() sk.connect(ip_port) print("客户端启动...") print(str(sk.recv(1024),"utf8")) while True: inp=input("please input:").strip() sk.sendall(bytes(inp,"utf8")) basic_info_bytes=sk.recv(1024) print(str(basic_info_bytes,"utf8")) # sk.send(bytes(‘ok‘,‘utf8‘)) result_length=int(str(basic_info_bytes,"utf8").split("|")[1]) print(result_length) has_received=0 content_bytes=bytes() while has_received<result_length: fetch_bytes=sk.recv(1024) has_received+=len(fetch_bytes) content_bytes+=fetch_bytes cmd_result=str(content_bytes,"utf8") print(cmd_result) sk.close()
import socket,os ip_port=("127.0.0.1",8898) sk=socket.socket() sk.bind(ip_port) sk.listen(5) BASE_DIR=os.path.dirname(os.path.abspath(__file__)) while True: print("waiting connect") conn,addr=sk.accept() flag = True while flag: client_bytes=conn.recv(1024) client_str=str(client_bytes,"utf8") func,file_byte_size,filename=client_str.split("|",2) path=os.path.join(BASE_DIR,‘yuan‘,filename) has_received=0 file_byte_size=int(file_byte_size) f=open(path,"wb") while has_received<file_byte_size: data=conn.recv(1024) f.write(data) has_received+=len(data) print("ending") f.close() #----------------------------------------------client #---------------------------------------------- import socket import re,os,sys ip_port=("127.0.0.1",8898) sk=socket.socket() sk.connect(ip_port) BASE_DIR=os.path.dirname(os.path.abspath(__file__)) print("客户端启动....") while True: inp=input("please input:") if inp.startswith("post"): method,local_path=inp.split("|",1) local_path=os.path.join(BASE_DIR,local_path) file_byte_size=os.stat(local_path).st_size file_name=os.path.basename(local_path) post_info="post|%s|%s"%(file_byte_size,file_name) sk.sendall(bytes(post_info,"utf8")) has_sent=0 file_obj=open(local_path,"rb") while has_sent<file_byte_size: data=file_obj.read(1024) sk.sendall(data) has_sent+=len(data) file_obj.close() print("上传成功")
文件上传
注意:
1 纸条就是conn
2 一收一发
3 client_data=conn.recv(1024)
if 那边send一个空数据 这边recv为空,则recv继续阻塞,等待其他的数据。所以聊天的时候好好聊,别发空数据。
socketserver
虽说用Python编写简单的网络程序很方便,但复杂一点的网络程序还是用现成的框架比较好。这样就可以专心事务逻辑,而不是套接字的各种细节。SocketServer模块简化了编写网络服务程序的任务。同时SocketServer模块也是Python标准库中很多服务器框架的基础。
socketserver模块可以简化网络服务器的编写,Python把网络服务抽象成两个主要的类,一个是Server类,用于处理连接相关的网络操作,另外一个则是RequestHandler类,用于处理数据相关的操作。并且提供两个MixIn 类,用于扩展 Server,实现多进程或多线程。
Server类
它包含了种五种server类,BaseServer(不直接对外服务)。TCPServer使用TCP协议,UDPServer使用UDP协议,还有两个不常使用的,即UnixStreamServer和UnixDatagramServer,这两个类仅仅在unix环境下有用(AF_unix)。
Base class for server classes.
1 |
|
This uses the Internet TCP protocol, which provides for continuous streams of data between the client and server.
1 |
|
This uses datagrams, which are discrete packets of information that may arrive out of order or be lost while in transit. The parameters are the same as for TCPServer
1 |
|
These more infrequently used classes are similar to the TCP and UDP classes, but use Unix domain sockets; they’re not available on non-Unix platforms. The parameters are the same as for TCPServer.
1 2 |
|
class UnixStreamServer(TCPServer): address_family = socket.AF_UNIX class UnixDatagramServer(UDPServer): address_family = socket.AF_UNIX
BaseServer的源码:
class BaseServer: """Base class for server classes. Methods for the caller: - __init__(server_address, RequestHandlerClass) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you do not use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - server_close() - process_request(request, client_address) - shutdown_request(request) - close_request(request) - service_actions() - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - allow_reuse_address Instance variables: - RequestHandlerClass - socket """ timeout = None def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False def server_activate(self): """Called by constructor to activate the server. May be overridden. """ pass def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: while not self.__shutdown_request: # XXX: Consider using another file descriptor or # connecting to the socket to wake this up instead of # polling. Polling reduces our responsiveness to a # shutdown request and wastes cpu at all other times. r, w, e = _eintr_retry(select.select, [self], [], [], poll_interval) if self in r: self._handle_request_noblock() self.service_actions() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): """Stops the serve_forever loop. Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock. """ self.__shutdown_request = True self.__is_shut_down.wait() def service_actions(self): """Called by the serve_forever() loop. May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop. """ pass # The distinction between handling, getting, processing and # finishing a request is fairly arbitrary. Remember: # # - handle_request() is the top-level call. It calls # select, get_request(), verify_request() and process_request() # - get_request() is different for stream or datagram sockets # - process_request() is the place that may fork a new process # or create a new thread to finish the request # - finish_request() instantiates the request handler class; # this constructor will handle the request all by itself def handle_request(self): """Handle one request, possibly blocking. Respects self.timeout. """ # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) fd_sets = _eintr_retry(select.select, [self], [], [], timeout) if not fd_sets[0]: self.handle_timeout() return self._handle_request_noblock() def _handle_request_noblock(self): """Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except OSError: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request) def handle_timeout(self): """Called if no new request arrives within self.timeout. Overridden by ForkingMixIn. """ pass def verify_request(self, request, client_address): """Verify the request. May be overridden. Return True if we should proceed with this request. """ return True def process_request(self, request, client_address): """Call finish_request. Overridden by ForkingMixIn and ThreadingMixIn. """ self.finish_request(request, client_address) self.shutdown_request(request) def server_close(self): """Called to clean-up the server. May be overridden. """ pass def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): """Called to shutdown and close an individual request.""" self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" pass def handle_error(self, request, client_address): """Handle an error gracefully. May be overridden. The default is to print a traceback and continue. """ print(‘-‘*40) print(‘Exception happened during processing of request from‘, end=‘ ‘) print(client_address) import traceback traceback.print_exc() # XXX But this goes to stderr! print(‘-‘*40)
There are five classes in an inheritance diagram, four of which represent synchronous servers of four types:
There are five classes in an inheritance diagram, four of which represent synchronous servers of four types: +------------+ | BaseServer | +------------+ | v +-----------+ +------------------+ | TCPServer |------->| UnixStreamServer | +-----------+ +------------------+ | v +-----------+ +--------------------+ | UDPServer |------->| UnixDatagramServer | +-----------+ +--------------------+
RequestHandler类
所有requestHandler都继承BaseRequestHandler基类。
class BaseRequestHandler: """Base class for request handler classes. This class is instantiated for each request to be handled. The constructor sets the instance variables request, client_address and server, and then calls the handle() method. To implement a specific service, all you need to do is to derive a class which defines a handle() method. The handle() method can find the request as self.request, the client address as self.client_address, and the server (in case it needs access to per-server information) as self.server. Since a separate instance is created for each request, the handle() method can define arbitrary other instance variariables. """ def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() finally: self.finish() def setup(self): pass def handle(self): pass def finish(self): pass 源码
创建一个socketserver 至少分以下几步
- First, you must create a request handler class by subclassing the
BaseRequestHandler
class and overriding itshandle()
method; this method will process incoming requests. - Second, you must instantiate one of the server classes, passing it the server’s address and the request handler class.
- Then call the
handle_request()
orserve_forever()
method of the server object to process one or many requests. - Finally, call
server_close()
to close the socket.
import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): """ The request handler class for our server. It is instantiated once per connection to the server, and must override the handle() method to implement communication to the client. """ def handle(self): # self.request is the TCP socket connected to the client self.data = self.request.recv(1024).strip() print("{} wrote:".format(self.client_address[0])) print(self.data) # just send back the same data, but upper-cased self.request.sendall(self.data.upper()) if __name__ == "__main__": HOST, PORT = "localhost", 9999 # Create the server, binding to localhost on port 9999 server = socketserver.TCPServer((HOST, PORT), MyTCPHandler) # Activate the server; this will keep running until you # interrupt the program with Ctrl-C server.serve_forever()
让你的socketserver并发起来, 必须选择使用以下一个多并发的类
12
3
4
5
6
7
class
socketserver.ForkingTCPServer
class
socketserver.ForkingUDPServer
class
socketserver.ThreadingTCPServer
class
socketserver.ThreadingUDPServer
所以:
server = socketserver.TCPServer((HOST, PORT), MyTCPHandler) #替换为 server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)
思考:
1 tcp与udp的区别(三次握手后,建立连接,双向通道,一个收,一个发,tcp每次接到数据后都会有一个应答,有了应答,新的数据就会被覆盖掉)
2 粘包
原文地址:https://www.cnblogs.com/Mengchangxin/p/9413284.html