1 #!/usr/bin/env python 2 # coding: utf-8 3 4 from daemon import Daemon 5 import socket 6 import select 7 import time 8 import pdb 9 10 __all__ = ["nbNet", "sendData_mh"] 11 #DEBUG = True 12 13 from nbNetUtils import * 14 15 class nbNetBase: 16 ‘‘‘non-blocking Net‘‘‘ 17 def setFd(self, sock): 18 """sock is class object of socket""" 19 #dbgPrint("\n -- setFd start!") 20 tmp_state = STATE() 21 tmp_state.sock_obj = sock 22 self.conn_state[sock.fileno()] = tmp_state 23 #self.conn_state[sock.fileno()].printState() 24 #dbgPrint("\n -- setFd end!") 25 26 def accept(self, fd): 27 """fd is fileno() of socket""" 28 #dbgPrint("\n -- accept start!") 29 sock_state = self.conn_state[fd] 30 sock = sock_state.sock_obj 31 conn, addr = sock.accept() 32 # set to non-blocking: 0 33 conn.setblocking(0) 34 return conn 35 36 def close(self, fd): 37 """fd is fileno() of socket""" 38 #pdb.set_trace() 39 print "closing", fd, self.conn_state 40 try: 41 # cancel of listen to event 42 sock = self.conn_state[fd].sock_obj 43 self.epoll_sock.unregister(fd) 44 sock.close() 45 self.conn_state.pop(fd) 46 tmp_pipe = self.popen_pipe 47 self.popen_pipe = 0 48 tmp_pipe.close() 49 except: 50 #dbgPrint("Close fd: %s abnormal" % fd) 51 pass 52 #@profile 53 def read(self, fd): 54 """fd is fileno() of socket""" 55 #pdb.set_trace() 56 try: 57 sock_state = self.conn_state[fd] 58 conn = sock_state.sock_obj 59 if sock_state.need_read <= 0: 60 raise socket.error 61 62 one_read = conn.recv(sock_state.need_read) 63 #dbgPrint("\tread func fd: %d, one_read: %s, need_read: %d" % (fd, one_read, sock_state.need_read)) 64 if len(one_read) == 0: 65 raise socket.error 66 # process received data 67 sock_state.buff_read += one_read 68 sock_state.have_read += len(one_read) 69 sock_state.need_read -= len(one_read) 70 #sock_state.printState() 71 72 # read protocol header 73 if sock_state.have_read == 10: 74 header_said_need_read = int(sock_state.buff_read) 75 if header_said_need_read <= 0: 76 raise socket.error 77 sock_state.need_read += header_said_need_read 78 sock_state.buff_read = ‘‘ 79 # call state machine, current state is read. 80 # after protocol header haven readed, read the real cmd content, 81 # call machine instead of call read() it self in common. 82 #sock_state.printState() 83 return "readcontent" 84 elif sock_state.need_read == 0: 85 # recv complete, change state to process it 86 return "process" 87 else: 88 return "readmore" 89 except (socket.error, ValueError), msg: 90 try: 91 if msg.errno == 11: 92 #dbgPrint("11 " + msg) 93 return "retry" 94 except: 95 pass 96 return ‘closing‘ 97 98 99 #@profile 100 def write(self, fd): 101 sock_state = self.conn_state[fd] 102 conn = sock_state.sock_obj 103 #pdb.set_trace() 104 105 if isinstance(sock_state.popen_pipe, file): 106 try: 107 output = sock_state.popen_pipe.read() 108 #print output 109 except (IOError, ValueError), msg: 110 pass 111 #have_send = conn.send("%010d%s" % (len(output), output)) 112 #todo 113 114 else: 115 last_have_send = sock_state.have_write 116 try: 117 # to send some Bytes, but have_send is the return num of .send() 118 have_send = conn.send(sock_state.buff_write[last_have_send:]) 119 sock_state.have_write += have_send 120 sock_state.need_write -= have_send 121 if sock_state.need_write == 0 and sock_state.have_write != 0: 122 # send complete, re init status, and listen re-read 123 #sock_state.printState() 124 #dbgPrint(‘\n write data completed!‘) 125 return "writecomplete" 126 else: 127 return "writemore" 128 except socket.error, msg: 129 return "closing" 130 131 132 def run(self): 133 while True: 134 #dbgPrint("\nrun func loop:") 135 # print conn_state 136 #for i in self.conn_state.iterkeys(): 137 #dbgPrint("\n - state of fd: %d" % i) 138 #self.conn_state[i].printState() 139 140 epoll_list = self.epoll_sock.poll() 141 for fd, events in epoll_list: 142 #dbgPrint(‘\n-- run epoll return fd: %d. event: %s‘ % (fd, events)) 143 print self.conn_state 144 print fd, events 145 sock_state = self.conn_state[fd] 146 if select.EPOLLHUP & events: 147 #dbgPrint("EPOLLHUP") 148 sock_state.state = "closing" 149 elif select.EPOLLERR & events: 150 #dbgPrint("EPOLLERR") 151 sock_state.state = "closing" 152 self.state_machine(fd) 153 154 def state_machine(self, fd): 155 #time.sleep(0.1) 156 #dbgPrint("\n - state machine: fd: %d, status: %s" % (fd, self.conn_state[fd].state)) 157 sock_state = self.conn_state[fd] 158 self.sm[sock_state.state](fd) 159 160 class nbNet(nbNetBase): 161 def __init__(self, addr, port, logic): 162 #dbgPrint(‘\n__init__: start!‘) 163 self.conn_state = {} 164 self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 165 self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 166 self.listen_sock.bind((addr, port)) 167 self.listen_sock.listen(10) 168 self.setFd(self.listen_sock) 169 self.epoll_sock = select.epoll() 170 # LT for default, ET add ‘ | select.EPOLLET ‘ 171 self.epoll_sock.register(self.listen_sock.fileno(), select.EPOLLIN ) 172 self.logic = logic 173 self.sm = { 174 "accept" : self.accept2read, 175 "read" : self.read2process, 176 "write" : self.write2read, 177 "process": self.process, 178 "closing": self.close, 179 } 180 #dbgPrint(‘\n__init__: end, register no: %s‘ % self.listen_sock.fileno() ) 181 182 #@profile 183 def process(self, fd): 184 sock_state = self.conn_state[fd] 185 response = self.logic(fd, sock_state.buff_read) 186 #pdb.set_trace() 187 if response == None: 188 conn = sock_state.sock_obj 189 self.setFd(conn) 190 self.conn_state[fd].state = "read" 191 self.epoll_sock.modify(fd, select.EPOLLIN) 192 else: 193 sock_state.buff_write = "%010d%s" % (len(response), response) 194 sock_state.need_write = len(sock_state.buff_write) 195 #sock_state.printState() 196 #self.state_machine(fd) 197 sock_state.state = "write" 198 self.epoll_sock.modify(fd, select.EPOLLOUT) 199 200 201 202 #@profile 203 def accept2read(self, fd): 204 conn = self.accept(fd) 205 self.epoll_sock.register(conn.fileno(), select.EPOLLIN) 206 # new client connection fd be initilized 207 self.setFd(conn) 208 self.conn_state[conn.fileno()].state = "read" 209 # now end of accept, but the main process still on ‘accept‘ status 210 # waiting for new client to connect it. 211 #dbgPrint("\n -- accept end!") 212 213 #@profile 214 def read2process(self, fd): 215 """fd is fileno() of socket""" 216 #pdb.set_trace() 217 read_ret = "" 218 try: 219 read_ret = self.read(fd) 220 except (Exception), msg: 221 #dbgPrint(msg) 222 read_ret = "closing" 223 if read_ret == "process": 224 # recv complete, change state to process it 225 #sock_state.state = "process" 226 self.process(fd) 227 elif read_ret == "readcontent": 228 pass 229 elif read_ret == "readmore": 230 pass 231 elif read_ret == "retry": 232 pass 233 elif read_ret == "closing": 234 self.conn_state[fd].state = ‘closing‘ 235 # closing directly when error. 236 self.state_machine(fd) 237 else: 238 raise Exception("impossible state returned by self.read") 239 240 #@profile 241 def write2read(self, fd): 242 try: 243 write_ret = self.write(fd) 244 except socket.error, msg: 245 write_ret = "closing" 246 247 if write_ret == "writemore": 248 pass 249 elif write_ret == "writecomplete": 250 sock_state = self.conn_state[fd] 251 conn = sock_state.sock_obj 252 self.setFd(conn) 253 self.conn_state[fd].state = "read" 254 self.epoll_sock.modify(fd, select.EPOLLIN) 255 elif write_ret == "closing": 256 #dbgPrint(msg) 257 self.conn_state[fd].state = ‘closing‘ 258 # closing directly when error. 259 self.state_machine(fd) 260 261 counter = 0 262 if __name__ == ‘__main__‘: 263 264 def logic(d_in): 265 global counter 266 counter += 1 267 if counter % 100000 == 0: 268 print counter, time.time() 269 return("a") 270 271 reverseD = nbNet(‘0.0.0.0‘, 9099, logic) 272 reverseD.run()
时间: 2024-11-06 14:35:26