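# Processes / threads / coroutines
# I/O: synchronous / asynchronous / blocking / non-blocking
# greenlet and gevent
# Event-driven programming and asynchronous I/O
# select / poll / epoll and the selectors module
# Python queues / RabbitMQ queues

##############################################################################################
1. What is a process? How does a process differ from a program?
Process: a running instance of a program.
Each process provides the resources needed to execute a program.
A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution.
Each process starts with a single thread, usually called the main thread, and any of its threads can create additional threads.
A program cannot run by itself. It runs only after it has been loaded into memory and the system has allocated resources to it; a program in that executing state is called a process.
The difference between the two: a program is a set of instructions, a static description of what the process will run; a process is one execution of the program, which is a dynamic concept.
With multiprogramming, several programs are loaded into memory at once and run concurrently under the operating system's scheduler, which greatly improves CPU utilization.
2. What is a thread?
Limitations of a process with a single flow of control:
It can do only one thing at a time; it cannot do two or more things simultaneously.
If it blocks during execution, for example while waiting for input, the whole process is suspended, and even work that does not depend on that input cannot proceed.
A thread is the smallest unit of execution that the operating system can schedule.
It lives inside a process and is the unit that actually does the process's work.
A thread is a single sequential flow of control within a process. One process can run several threads concurrently, each carrying out a different task.
A thread is an execution context: all the information a CPU needs to execute a stream of instructions.
3. How do processes and threads relate?
Threads share the address space of the process that created them; each process has its own address space (memory).
Threads have direct access to the data segment of their process; a process has its own copy of its parent's data segment.
Threads can communicate directly with other threads of the same process; processes must use inter-process communication to talk to sibling processes.
New threads are cheap to create; a new process requires duplicating the parent process.
Threads can exercise considerable control over other threads of the same process; a process can only control its child processes.
Changes to the main thread (cancellation, priority change, and so on) may affect the behavior of the other threads of the process; changes to a parent process do not affect its children.
4. The Python GIL (global interpreter lock)
No matter how many threads you start or how many CPUs you have, CPython lets only one thread execute Python bytecode at any given moment.
http://www.dabeaz.com/python/UnderstandingGIL.pdf
5. Using the Python threading module
Basic usage, form 1: pass a target function
import threading
import time


def sayhi(num):  # the function each thread will run
    print("running on number:%s" % num)
    time.sleep(3)


if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi, args=(1,))  # create a thread instance
    t2 = threading.Thread(target=sayhi, args=(2,))  # create another thread instance

    t1.start()  # start the thread
    t2.start()  # start the other thread

    print(t1.name)  # the thread's name
    print(t2.name)
Basic usage, form 2: subclass Thread
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # the code each thread will run
        print("running on number:%s" % self.num)
        time.sleep(3)


if __name__ == '__main__':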
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
6. Daemon threads
Once every non-daemon thread has exited, the program exits and any daemon threads still running are killed.
# _*_coding:utf-8_*_

import time
import threading


def run(n):
    print('[%s]------running----\n' % n)
    time.sleep(2)
    print('--done--')


def main():
    for i in range(5):
        t = threading.Thread(target=run, args=[i, ])
        t.start()
        t.join(1)
        print('starting thread', t.name)


m = threading.Thread(target=main, args=[])
# Mark m as a daemon thread. When the program's main thread exits, m is killed too.
# Threads started by m inherit the daemon flag, so they are terminated as well,
# whether or not they have finished their work.
m.daemon = True
m.start()
m.join(timeout=2)
print("---main thread done----")
7. Thread locks (mutexes)
A process can start many threads, and they all share the process's memory, so every thread can reach the same data. If two threads modify the same data at the same time, their updates can overwrite each other and the result comes out wrong (a lost update).
import time
import threading


def addNum():
    global num  # every thread uses this global variable
    print('--get num:', num)
    time.sleep(1)
    num -= 1  # decrement the shared variable


num = 100  # the shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
With a thread lock added
import time
import threading


def addNum():
    global num  # every thread uses this global variable
    print('--get num:', num)
    time.sleep(1)
    with lock:  # acquire the lock before modifying, release it afterwards
        num -= 1  # decrement the shared variable


num = 100  # the shared variable
thread_list = []
lock = threading.Lock()  # one lock shared by all threads
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
8. How do thread locks relate to the GIL?
The GIL was introduced mainly to keep the interpreter simple to develop.
When you write Python you do not manage memory yourself, because the interpreter reclaims it for you. In CPython this is done chiefly by reference counting: every object carries a count that is updated whenever a reference to it is created or dropped.
If two threads could update the same count at the same moment, an object might be freed while another thread is still using it, or might never be freed at all.
Rather than guard every object with its own lock, the interpreter takes the blunt approach of a single global lock: while one thread is executing Python bytecode, no other thread may.
The GIL protects only the interpreter's own state. A statement such as num -= 1 takes several bytecode steps, and a thread switch can happen between them, so your own shared data still needs a thread lock as in section 7.
9. Recursive locks (RLock). Acquiring a plain Lock again in a thread that already holds it blocks forever and the program hangs; use an RLock when locking is nested.
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()  # run1 and run2 acquire the same lock again while it is held here
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':

    num, num2 = 0, 0
    lock = threading.RLock()  # note: the recursive lock is RLock
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)
10. Semaphore
A semaphore lets at most a fixed number of threads hold it, and so work on the data, at the same time.
import threading, time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()


if __name__ == '__main__':

    num = 0
    semaphore = threading.BoundedSemaphore(5)  # at most 5 threads run at the same time
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print(threading.active_count())
else:
    print('----all threads done---')
    print(num)
11. Events: an Event lets two or more threads signal each other
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
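The three Event calls listed above can be tried in a few lines. This is a minimal sketch, not part of the original notes: the helper names wait_for_signal and demo_event are made up for illustration, and the 5 second timeout is an arbitrary safety margin so the waiter cannot hang forever.

```python
import threading


def wait_for_signal(event, results):
    # Blocks until some other thread calls event.set(), or until the timeout expires.
    # wait() returns True if the flag was set and False if it timed out.
    results.append(event.wait(timeout=5))


def demo_event():
    event = threading.Event()  # the flag starts out cleared
    results = []
    waiter = threading.Thread(target=wait_for_signal, args=(event, results))
    waiter.start()
    event.set()    # set the flag: the waiter wakes up
    waiter.join()
    event.clear()  # reset the flag so that later wait() calls block again
    return results, event.is_set()


print(demo_event())
```

The waiter records what wait() returned, so the caller can tell a real signal from a timeout.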
A traffic light example: one thread acts as the traffic light and several threads act as cars; cars stop on red and go on green.
import threading
import time
import random


def light():
    if not event.is_set():
        event.set()  # wait() would not block now: the light is green
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.is_set():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # switch back to green
        time.sleep(1)
        count += 1


def car(n):
    while True:
        time.sleep(random.randrange(10))
        if event.is_set():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)


if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()
12. The Python queue module (thread-safe queues)
class queue.Queue(maxsize=0)  # first in, first out
class queue.LifoQueue(maxsize=0)  # last in, first out
class queue.PriorityQueue(maxsize=0)  # a queue whose entries carry a priority; the lowest-valued entry is retrieved first
Queue.qsize()  # approximate number of items in the queue
Queue.empty()  # return True if empty
Queue.full()  # return True if full
Queue.put(item, block=True, timeout=None)  # Put item into the queue. If block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, block for at most timeout seconds and raise the queue.Full exception if no free slot became available within that time. Otherwise (block is false), put the item on the queue if a free slot is immediately available, else raise queue.Full (timeout is ignored in that case).
Queue.put_nowait(item)  # Equivalent to put(item, False).
Queue.get(block=True, timeout=None)  # Remove and return an item from the queue. If block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, block for at most timeout seconds and raise the queue.Empty exception if no item became available within that time. Otherwise (block is false), return an item if one is immediately available, else raise queue.Empty (timeout is ignored in that case).
Queue.get_nowait()  # Equivalent to get(False).
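The blocking and non-blocking behavior of put() and get() described above is easiest to see on a small bounded queue. The following is an illustrative sketch only: the function name demo_bounded_queue, the maxsize of 2, and the 0.1 second timeouts are choices made for this example.

```python
import queue


def demo_bounded_queue():
    q = queue.Queue(maxsize=2)
    q.put("a")
    q.put("b")  # the queue is now full
    outcomes = []

    try:
        q.put("c", block=True, timeout=0.1)  # waits up to 0.1 s for a free slot
    except queue.Full:
        outcomes.append("full")

    try:
        q.put_nowait("c")  # same as put("c", False): fails at once when full
    except queue.Full:
        outcomes.append("full")

    outcomes.append(q.get())         # FIFO order: the oldest item comes out first
    outcomes.append(q.get_nowait())  # same as get(False)

    try:
        q.get(timeout=0.1)  # the queue is empty now, so this times out
    except queue.Empty:
        outcomes.append("empty")

    return outcomes


print(demo_bounded_queue())
```

Catching queue.Full and queue.Empty is what lets a producer or consumer decide what to do when the other side is slow, instead of blocking indefinitely.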
Two methods support tracking whether enqueued tasks have been fully processed by daemon consumer threads:
Queue.task_done()  # called by a consumer to say that a previously fetched task is complete
Queue.join()  # block until every item in the queue has been fetched and processed
13. The producer-consumer model
The producer-consumer pattern uses a container to remove the tight coupling between producers and consumers.
Producers and consumers do not talk to each other directly; they communicate through a blocking queue.
A producer does not wait for a consumer to handle the data it has produced; it simply drops the data into the queue.
A consumer does not ask a producer for data; it takes it from the queue. The queue acts as a buffer that balances the processing capacity of the two sides.
import threading
import queue


def producer():
    for i in range(10):
        q.put("bone %s" % i)

    print("waiting for all the bones to be taken...")
    q.join()
    print("all the bones have been taken...")


def consumer(n):
    # Caveat: this loop ends as soon as the queue looks empty, so it relies on the
    # producer having queued its items before the consumer starts.
    while q.qsize() > 0:
        print("%s got" % n, q.get())
        q.task_done()  # report that this task is finished


q = queue.Queue()

p = threading.Thread(target=producer, )
p.start()

c1 = consumer("Li Chuang")


import time, random
import queue, threading

q = queue.Queue()


def Producer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1


def Consumer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eaten %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()
14. The multiprocessing module
from multiprocessing import Process
import time


def f(name):
    time.sleep(2)
    print('hello', name)


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
14.1 Showing process IDs
from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")


def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello',
          name)


if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
14.2 Inter-process communication
14.2.1 Queues
from multiprocessing import Process, Queue


def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
14.2.2 Pipes
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
14.2.3 Managers
from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  # a dict shared between processes

        l = manager.list(range(5))  # a list shared between processes
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)
14.3 Synchronizing processes
from multiprocessing import Process, Lock


def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
15. Process pools
Two Pool methods are used here:
apply: runs the call in a worker process and blocks until the result is ready;
apply_async: returns immediately; an optional callback is called in the parent process with the result.
from multiprocessing import Pool
import time


def Foo(i):
    time.sleep(2)
    return i + 100


def Bar(arg):
    print('-->exec done:', arg)


if __name__ == '__main__':  # needed wherever child processes are started by re-importing this module
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # pool.apply(func=Foo, args=(i,))
    print('end')
    pool.close()
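    pool.join()  # wait for the pool's workers to finish before exiting; if this line is commented out, the program exits right away


16. Coroutines
A coroutine is also called a micro-thread or fiber. In one sentence: a coroutine is a lightweight thread that lives in user space.
A coroutine has its own register context and stack. On a switch, the scheduler saves the register context and stack elsewhere, and restores them when it switches back. As a result:
a coroutine keeps the state of its previous invocation (a particular combination of all its local state). Each re-entry resumes that state; put differently, execution continues from the point in the logic flow where it last left off.
Advantages of coroutines:
No thread context-switch overhead.
No overhead for locking and synchronizing around atomic operations.
An atomic operation needs no synchronization. It is an operation that the thread scheduler cannot interrupt: once it starts, it runs to completion with no context switch (to another thread) in between.
An atomic operation may be a single step or several steps, but their order cannot be disturbed and they cannot be cut short so that only part is executed. Being treated as one indivisible whole is the essence of atomicity.
Switching control flow is easy, which simplifies the programming model.
High concurrency, high scalability, low cost: tens of thousands of coroutines on one CPU is no problem, so they suit highly concurrent workloads well.
Disadvantages:
They cannot use multiple cores. Coroutines run in a single thread, so they cannot use several cores of a CPU at once; they must be combined with processes to run on multiple CPUs. Most everyday applications do not need this, unless they are CPU-bound.
A blocking operation (such as blocking I/O) blocks the entire program.
16.1 A pseudo-coroutine built with yield
import time


def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield  # pause here until the producer send()s a value
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    next(con)  # advance each generator to its first yield
    next(con2)
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
16.2 Characteristics of coroutines
Concurrency must be achieved within a single thread.
Shared data can be modified without locks.
The user program itself keeps the context stacks of the multiple control flows.
When a coroutine hits an I/O operation, it switches automatically to another coroutine.
16.3 Coroutines with greenlet (manual switching)
# -*- coding:utf-8 -*-
from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
16.4 gevent: automatic coroutine switching
import gevent


def func1():
    print('\033[31;1mLi Chuang is working with Haitao...\033[0m')
    gevent.sleep(2)
    print('\033[31;1mLi Chuang is back working with Haitao...\033[0m')


def func2():
    print('\033[32;1mLi Chuang switched to working with Hailong...\033[0m')
    gevent.sleep(1)
    print('\033[32;1mLi Chuang finished with Haitao and is back working with Hailong...\033[0m')


gevent.joinall([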
    gevent.spawn(func1),
    gevent.spawn(func2),
    # gevent.spawn(func3),
])
16.5 Comparing synchronous and asynchronous performance
from gevent import monkey

monkey.patch_all()  # patch the standard library so that blocking I/O yields to other greenlets
import gevent
from urllib.request import urlopen


def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))


gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
17. Many concurrent sockets in a single thread with gevent
17.1 Server side:
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)  # one greenlet per client connection


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            if not data:  # an empty read means the client closed the connection
                conn.shutdown(socket.SHUT_WR)
                break
            conn.send(data)

    except Exception as ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)
17.2 Client side:
import socket

HOST = 'localhost'  # The remote host
PORT = 8001  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)

    print('Received', repr(data))
s.close()
18. Event-driven programming and asynchronous I/O
Approach 1: create a thread that loops forever, checking whether the mouse has been clicked. This approach has several drawbacks:
1. It wastes CPU. Mouse clicks may be rare, yet the scanning thread keeps polling, burning CPU for nothing. And what if the call that checks for a click is blocking?
2. If it is blocking, a different problem appears. Suppose we must watch not only for mouse clicks but also for key presses: while blocked on the mouse, the loop may never get around to checking the keyboard.
3. If one loop has to scan a large number of devices, response time becomes a problem.
So this approach is a poor one.

Approach 2: the event-driven model
Most UI programming today is event-driven. Many UI platforms provide an onClick() event, for example, which represents a mouse press. The general idea of the event-driven model is:
1. There is an event (message) queue.
2. When the mouse is pressed, a click event (message) is added to this queue.
3. A loop keeps taking events off the queue and calls a different function depending on the event, such as onClick() or onKeyDown().
4. Each event (message) usually carries a pointer to its own handler function, so every message has an independent handler.

The event-driven model is usually a good choice in the following circumstances:
1. The program has many tasks, and...
2. the tasks are largely independent (so they do not need to communicate with or wait for each other), and...
3. some tasks block while waiting for events to arrive.
4. It is also a good choice when the application needs to share mutable data between tasks, because no synchronization is required.
Network applications usually have exactly these characteristics, which makes them a good fit for the event-driven programming model.
19. Many concurrent sockets with select
#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(False)

server_addr = ('localhost', 10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ]  # the server socket must be watched too, because it is itself an fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    # If no fd is ready, the program blocks here.
    readable, writeable, exeptional = select.select(inputs, outputs, inputs)

    for s in readable:  # each s is a socket

        if s is server:
            # Remember that server itself was put into inputs and handed to select.
            # If s is server, the listening fd is ready,
            # i.e. it has activity. When does a listening socket have activity?
            # When a new connection arrives, of course.
            # A new connection has come in; accept it.
            conn, client_addr = s.accept()
            print("new connection from", client_addr)
            conn.setblocking(False)
            # To avoid blocking the whole program we do not start receiving from the client here.
            # The connection goes into inputs, so on the next loop select watches it as well.
            # When the client sends data, the connection's fd becomes ready on the server side and
            # select returns it in the readable list; we then take it from that list and receive
            # the data, which is what the else branch below does.
            inputs.append(conn)

            # Data received from the client is not sent back at once; it is held in a queue and sent later.
            message_queues[conn] = queue.Queue()

        else:  # if s is not server, it can only be the fd of an established client connection
            # data from the client has arrived; receive it here
            data = s.recv(1024)
            if data:
                print("received data from [%s]:" % s.getpeername()[0], data)
                message_queues[s].put(data)  # queue the data; it is returned to the client shortly
                if s not in outputs:
                    # To avoid holding up other client connections, the data is not returned right here.
                    outputs.append(s)

            else:  # no data means the client has disconnected
                print("client disconnected", s)

                if s in outputs:
                    outputs.remove(s)  # clean up the closed connection

                inputs.remove(s)  # clean up the closed connection
                s.close()

                del message_queues[s]  # clean up the closed connection

    for s in writeable:
        try:
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" % s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]" % s.getpeername()[0], next_msg)
            s.send(next_msg.upper())

    for s in exeptional:
        print("handling exception for ", s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

The matching client:
#_*_coding:utf-8_*_
__author__ = 'Alex Li'


import socket
import sys

messages = [b'This is the message. ',
            b'It will be sent ',
            b'in parts.',
            ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM),
         socket.socket(socket.AF_INET, socket.SOCK_STREAM),
         ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message))
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print('%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print('closing socket', s.getsockname(), file=sys.stderr)
            s.close()

20. The selectors module
import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data  # the function passed to register(): accept or read
        callback(key.fileobj, mask)

21. RabbitMQ queues
Install RabbitMQ: http://www.rabbitmq.com/install-standalone-mac.html
Install the Python RabbitMQ client module:
pip install pika
or
easy_install pika
or from source:
https://pypi.python.org/pypi/pika

21.1 Sender
# !/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

21.2 Receiver
# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# You may ask why we declare the queue again when we have already declared it in the previous code.
# We could avoid that if we were sure that the queue already exists, for example if the send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# pika >= 1.0 signature; older releases took basic_consume(callback, queue=..., no_ack=True)
channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

21.3 Work Queues
21.3.1 Producer code
import sys
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()
21.3.2 Consumer code
# _*_coding:utf-8_*_

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# No automatic acknowledgement here: the callback acknowledges each message itself with basic_ack.
# pika >= 1.0 signature; older releases took basic_consume(callback, queue=...)
channel.basic_consume(queue='task_queue',
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
21.3.3 Message persistence
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
    channel.queue_declare(queue='hello', durable=True)
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround: declare a queue with a different name, for example task_queue:
    channel.queue_declare(queue='task_queue', durable=True)
This queue_declare change needs to be applied to both the producer and consumer code.
At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent by supplying a delivery_mode property with a value of 2.
    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
21.3.4 Fair dispatch
The call below tells RabbitMQ not to give a worker more than one unacknowledged message at a time, so a busy worker is not handed new work while an idle one waits.
    channel.basic_qos(prefetch_count=1)

21.3.5 Complete code with message persistence and fair dispatch
Producer side
# !/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
Consumer side
# !/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per dot in the message
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
# pika >= 1.0 signature; older releases took basic_consume(callback, queue=...)
channel.basic_consume(queue='task_queue',
                      on_message_callback=callback)

channel.start_consuming()
21.3.5 Publish/Subscribe
The examples so far were essentially one-to-one: a message goes to one named queue. Sometimes you want a message to reach every queue, like a broadcast. That is what exchanges are for.

An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.

An exchange is declared with a type, which determines which queues qualify to receive a message.


fanout: every queue bound to this exchange receives the message
direct: queues whose binding key exactly equals the message's routing key receive the message
topic: queues whose binding key (which may be a pattern here) matches the message's routing key receive the message

    Pattern symbols: a routing key is a list of words separated by dots; # stands for zero or more words and * stands for exactly one word
        Example: #.a matches a.a, aa.a, aaa.a, and also b.c.a
                 *.a matches a.a, b.a, c.a, but not b.c.a
        Note: a topic exchange with the binding key # behaves like a fanout exchange

headers: the message headers decide which queues receive the message

21.3.6 Publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# pika >= 1.0 names this parameter exchange_type (older releases used type)
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

21.3.7 Subscriber
# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# With an empty queue name RabbitMQ assigns a random name; exclusive=True deletes the
# queue automatically once the consumer using it disconnects.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()
21.3.8 Receiving messages selectively (exchange type=direct)
RabbitMQ can also route by keyword: queues are bound with a key, the sender publishes to the exchange with a routing key, and the exchange uses that key to decide which queues get the message.
publisher:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

subscriber:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:  # one binding per severity we want to receive
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()

21.3.9 Finer-grained message filtering
Although using the direct exchange improved our system, it still has limitations: it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to logs based not only on severity, but also on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility: we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

publisher:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

------------------------------------------------------------------
subscriber:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()

---------------------------------------------------------------
To receive all the logs run:

python receive_logs_topic.py "#"
To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"
You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"
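To see why the binding keys in the commands above select the messages they do, the topic matching rule can be reproduced locally without a broker. This is only an illustrative sketch of the rule (words separated by dots, * for exactly one word, # for zero or more words); the function topic_matches is a made-up helper, not part of pika, and it is not how RabbitMQ implements matching internally.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key satisfies the topic pattern binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        # p indexes the pattern, w indexes the routing key's words
        if p == len(pattern):
            return w == len(words)  # both must be used up together
        if pattern[p] == "#":
            # '#' may absorb zero or more words: try every possible split point
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False  # pattern still expects a word but none is left
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)  # '*' or a literal word consumes one word
        return False

    return match(0, 0)


for key in ("#", "kern.*", "*.critical"):
    print(key, topic_matches(key, "kern.critical"))
```

Checking a few keys this way before binding a queue can help catch patterns that are broader or narrower than intended, for example that kern.* does not cover a three-word key such as kern.disk.critical.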
Time: 2024-10-03 10:48:37