Author: 楚格
2018-11-17 17:34:58
IDE: Pycharm2018.02 Python 3.7
KeyWord : 线程 threading Thread
Explain:
-----------------------------------------------------
--
1 # coding=utf-8 2 #--------------------------------- 3 ‘‘‘ 4 # Author : chu ge 5 # Function: 线程 thread 6 # 7 ‘‘‘ 8 #--------------------------------- 9 ‘‘‘ 10 # -------------------------------- 11 # 导入模块 12 # 1.系统库 13 # 2.第三方库 14 # 3.相关定义库 15 # -------------------------------- 16 ‘‘‘ 17 # 1.系统库 18 import sys 19 import os 20 import time 21 import random 22 23 #2.第三方库 24 25 #进程导入模块 26 # from multiprocessing import Process 27 from multiprocessing import Pool 28 # from multiprocessing import Queue 29 # from multiprocessing import Manager 30 31 #进程导入模块 32 import threading 33 from threading import Thread 34 from threading import Lock 35 from queue import Queue 36 37 38 39 40 41 42 # 43 ‘‘‘ 44 ============================================================================ 45 #》》》》》》》》》》 46 47 线程 48 49 ----------------------- 50 python的thread模块比较底层, 51 threading模块做了些包装,可以更加方便的被使用 52 53 进程 VS 线程 54 功能的不同 55 进程,能够完成多任务,如一台电脑上可以同时运行多个微信 56 线程,能够完成多任务,如一个微信界面中可以运行多个窗口 57 定义的不同 58 进程,是系统进行资源分配和调度的一个独立单位。 59 线程,是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的 60 能独立运行的基本单位。 61 线程自己基本不拥有系统资源,只拥有一点在运行中必不可少的资源, 62 如:程序计数器,一组寄存器和栈,但是它可与同属一个进程的 63 其他线程共享进程所拥有的全部的资源。 64 65 区别 66 一个程序至少有一个进程,一个进程至少有一个线程。 67 线程的划分尺度小于进程,资源比进程少,使得多线程程序的并发性高。 68 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大提高程序的运行效率 69 线程不能够独立执行,必须依存在进程中 70 优缺点 71 线程和进程在使用上各有优缺点:线程执行开销小但不利于资源管理和保护 72 而进程正相反。 73 74 ---------------------------------------------- 75 1. 线程 使用threading 模块 76 77 ----------------------- 78 1.1 单 / 多线程执行 79 e.g: 80 #1如果多个线程执行同一函数,各是各的,互不影响 81 def Function_Say_Sorry(): 82 print("A say: Sorry !") 83 time.sleep(1) 84 if __name__ == "__main__": 85 # 单线程 86 for var in range(10): 87 Function_Say_Sorry() 88 89 # 多线程执行 90 for var in range(10): 91 # 创建线程 92 thread_name = threading.Thread(target = Function_Say_Sorry) 93 thread_name.start() 94 result: 95 1)可以看出使用了多线程并发操作,花费时间要短很多 96 2)创建好的线程,需要调用start()方法来启动 97 98 ----------------------- 99 e.g: 100 101 if __name__ == "__main__": 102 103 result: 104 105 ----------------------- 106 1.2 主线程会等待所有的子线程结束后才结束 107 e.g: 108 def Function_Sing(): 109 for var in range(10): 110 print("正在唱歌 %d" % var) 111 time.sleep(1) 112 113 def Function_Dance(): 114 for var in range(5): 115 print("正在跳舞 %d" % var) 116 time.sleep(1) 117 118 if __name__ == "__main__": 119 # 主线程等待子线程结束而结束 120 print("--- start ---:\n %s" % (time.time())) 121 122 time_A = threading.Thread(target = Function_Sing) 123 time_B = threading.Thread(target = Function_Dance) 124 time_A.start() 125 time_B.start() 126 time.sleep(5) # 127 print("--- finish ---:\n%s" % (time.time())) 128 # 查看线程数量 129 while True: 130 length = len(threading.enumerate()) 131 print("当前运行的线程数为: [%d] " % (length)) 132 if length <= 1 : 133 break 134 time.sleep(0.5) 135 result: 136 137 ----------------------- 138 1.3 threading 139 1.3.1 线程执行代码的封装 140 通过使用threading模块能完成多任务的程序开发,为了每个线程更完美 141 所有使用threading模块时,往往定义新的子类class, 142 只要继承threading.Thread就可以了,然后重写run方法 143 144 e.g: 145 #线程执行代码的封装 146 class Class_MyThread(threading.Thread): 147 def run(self): 148 for var in range(10): 149 time.sleep(1) 150 message = "I am " + self.name + " @ " + str(var) 151 print("message: %s" % (message)) 152 153 154 if __name__ == "__main__": 155 thread_name = Class_MyThread() 156 thread_name.start() 157 158 result: 159 python的threading.Thread类有一个run方法, 160 用于定义线程的功能函数,可以在自己的线程类中覆盖还方法。 161 而创建自己线程实例后,通过thread类start方法,可以启动该线程, 162 交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。 163 164 ----------------------- 165 1.3.2 线程的执行顺序 166 e.g: 167 # 线程的执行顺序 168 #重复封装类 169 def Function_Test_A(): 170 for var in range(10): 171 thread_name_local = Class_MyThread() 172 thread_name_local.start() 173 if __name__ == "__main__": 174 Function_Test_A() 175 176 result: 177 从执行结果可以看出,多线程程序的执行顺序是不确定的 178 当执行到sleep语句时,线程将会被阻塞(Blocked),到sleep结束后, 179 线程进入就绪(Runnable)状态,等待调度,而线程调度就讲自行选择一个线程执行。 180 上面代码可以保证运行完成的run,但线程启动顺序和run函数中每次循环的执行顺序都不确定。 181 182 总结 183 1) 每个线程一定会有一个name,尽管示例中没有指定线程对象的name, 184 但是python会自动为线程指定一个名字。 185 2)当线程的run函数结束时,该线程完成 186 3)无法控制线程调度程序,但是可以通过别的方式来影响调度的方式 187 4)线程的几种状态 188 189 启动 调度 结束 190 新建 -----> 就绪 <-----> 运行 -----> 死亡 191 \ / 192 满足条件 等待条件 193 \ / 194 等待(阻塞) 195 196 197 ----------------------- 198 1.4.1 多线程-共享全局变量 199 200 e.g: 201 def Function_Work_A(): 202 global global_var_number_A 203 204 for var in range(10): 205 global_var_number_A += 1 206 print("work A ,number is %d" % (global_var_number_A)) 207 208 209 def Function_Work_B(): 210 global global_var_number_A 211 212 print("work B ,number is %d" % (global_var_number_A)) 213 214 if __name__ == "__main__": 215 global_var_number_A = 100 216 217 print("创建线程之前,number is %d" %(global_var_number_A)) 218 thread_name_A = threading.Thread(target = Function_Work_A ) 219 thread_name_A.start() 220 time.sleep(1) 221 222 thread_name_B = threading.Thread(target = Function_Work_B ) 223 thread_name_B.start() 224 225 result: 226 创建线程之前,number is 100 227 work A ,number is 110 228 work B ,number is 110 229 230 ----------------------- 231 1.4.2 列表当做实参传递到线程中 232 e.g: 233 # 列表当做实参传递到线程中 234 def Function_Work_C(nums): 235 local_var_number = nums 236 237 local_var_number.append("CC") 238 print("work C ,number is %s" % (local_var_number)) 239 240 241 def Function_Work_D(nums): 242 local_var_number = nums 243 time.sleep(1) 244 print("work D ,number is %s" % (local_var_number)) 245 246 if __name__ == "__main__": 247 global_nums = [11,22,33,44,55] 248 thread_name_C = threading.Thread(target=Function_Work_C, args=(global_nums,)) 249 thread_name_C.start() 250 thread_name_D = threading.Thread(target=Function_Work_D, args=(global_nums,)) 251 thread_name_D.start() 252 253 result: 254 work C ,number is [11, 22, 33, 44, 55, ‘CC‘] 255 work D ,number is [11, 22, 33, 44, 55, ‘CC‘] 256 总结: 257 在一线程内的所有线程共享全局变量,能够在不适用其他方式的前提下, 258 完成多线程之间的数据共享,这点要比多进程要好。 259 缺点就是,线程是对全局变量随意遂改可能造成多线程之间, 260 对全局变量的混乱,即线程非安全。 261 262 ----------------------- 263 1.5 线程不安全 同步 264 265 e.g: 266 # 同步 267 def Function_Test_B(): 268 global global_var_number 269 for var in range(1000000): 270 global_var_number += 1 271 print("Test B ,number is %d" % (global_var_number)) 272 273 def Function_Test_C(): 274 global global_var_number 275 for var in range(1000000): 276 global_var_number += 1 277 print("Test C ,number is %d" % (global_var_number)) 278 279 if __name__ == "__main__": 280 global_var_number = 0 281 thread_name_E = threading.Thread(target = Function_Test_B) 282 thread_name_E.start() 283 time.sleep(3) 284 thread_name_F = threading.Thread(target = Function_Test_C) 285 thread_name_F.start() 286 287 print("number: %s" % (global_var_number)) 288 289 result: 290 Test B ,number is 1000000 291 number: 1059644 292 Test C ,number is 2000000 293 294 同步:就是协同步调,按照预定的先后次序进行运行。 295 同指协同,协助,相互配合 296 如,进程/线程同步,可以理解为进程/线程A和B一块配合,A执行到一定程度时 297 要依靠B的某个结果,于是停下来,示意B运行,在将结果给A,A继续执行操作。 298 299 解决问题:线程同步 300 1.系统调用thread_name_E,然后获取到num的值为0, 301 此时上一把锁,即不允许其他现在操作num 302 2.对num的值进行+1 303 3.解锁,此时num的值为1,其他的线程就可以使用num了, 304 而且num的值是0而不是1 305 4.同理,其他线程在对num进行修改时,都要先上锁,处理完成后再解锁。 306 在上锁的整个过程中不允许其他线程访问,就保证了数据的正确性。 307 308 ----------------------- 309 1.6 互斥锁 310 311 当多个线程几乎同时修改某个共享数据的时候,需要进行同步控制。 312 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。 313 314 互斥锁为资源引入一个状态: 锁定/非锁定 315 316 某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定, 317 其他线程不能更改,直到该线程释放资源,将资源的状态变成非锁定, 318 其他的线程才能再次锁定该资源。 319 互斥锁保证每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。 320 321 322 threading模块定义Lock类,方便处理锁定: 323 #创建锁 324 mutex = threading.Lock() 325 #锁定 326 mutex.acquire([blocking]) 327 #释放 328 mutex.release() 329 330 在锁定方法中,acquire可以有个blocking参数 331 如果设定blocking为True,则当前线程会阻塞,直到获得这个锁为止,如果没有指定,那么默认为True 332 如果设定blocking为Fasle,则当前线程不会阻塞。 333 334 335 上锁解锁过程: 336 当一个线程调用锁的acpuire()方法获得解锁时,锁就进入Locked状态。 337 每次只有一个线程可以获得锁。如果此时另外一个线程试图获得这个锁, 338 该线程就会变为blocked状态,称为阻塞,直到拥有锁的线程调用锁release()方法释放后,锁进入unlocked状态 339 线程调度程序从处于同步阻塞状态的线程中选择一个获得锁,并使得该线程进入运行(running) 340 341 总结: 342 锁的好处: 343 确保了某段关键代码只能由一个线程从头到尾完整的执行 344 锁的坏处: 345 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率降低了。 346 由于可以存在多个锁,不同的线程持有不同的锁,并试图获得对方持有的锁,可能造成死锁。 347 348 349 e.g: 350 # 互斥锁 351 def Function_Test_D(): 352 global global_var_number 353 # Test_D和Test_F线程都在抢着上锁,对这个锁,如果一方上锁成功, 354 # 导致另一方会堵塞(一直等待)到这个锁被解锁为止 355 mutexFlag = mutex.acquire(True) 356 for var in range(1000000): 357 # True 表示堵塞,如果这个锁在上锁之前已经被上锁了,那么这个线程会在 358 #False 表示非堵塞,即不管本次调用能够成功上锁,都会卡在这,而继续 359 if mutexFlag: 360 global_var_number += 1 361 # 用来对mutex指向的这个锁,进行解锁,只要解锁了,那么接下来会让所有 362 # 因为这个锁,被上了锁而堵塞的线程进行抢着上锁 363 mutex.release() 364 365 print("Test D ,number is %d" % (global_var_number)) 366 367 def Function_Test_E(): 368 global global_var_number 369 370 for var in range(1000000): 371 mutexFlag = mutex.acquire(True) 372 if mutexFlag: 373 global_var_number += 1 374 mutex.release() 375 376 print("Test E ,number is %d" % (global_var_number)) 377 378 def Function_Test_F(): 379 global global_var_number 380 mutexFlag = mutex.acquire(True) 381 for var in range(1000000): 382 if mutexFlag: 383 global_var_number += 1 384 mutex.release() 385 386 print("Test F ,number is %d" % (global_var_number)) 387 388 if __name__ == "__main__": 389 global_var_number = 0 390 # 创建锁 391 mutex = threading.Lock() 392 393 thread_name_D = threading.Thread(target=Function_Test_D) 394 thread_name_D.start() 395 396 thread_name_E = threading.Thread(target = Function_Test_E) 397 thread_name_E.start() 398 399 thread_name_F = threading.Thread(target = Function_Test_F) 400 thread_name_F.start() 401 402 print("number: %s" % (global_var_number)) 403 404 result: 405 406 ----------------------- 407 1.7 多线程-非共享数据 408 409 对于全局变量,在多线程中要格外小心,否则容易造成数据错乱的情况发生 410 在多线程开发中,全局变量是多个线程都是共享的数据,而局部变量等是各自线程,是非共享的。 411 412 e.g: 413 class Class_My_Thread(threading.Thread): 414 # 重写 构造方法 415 def __init__(self,number,SleepTime): 416 threading.Thread.__init__(self) 417 self.num = number 418 self.SleepTime = SleepTime 419 420 def run(self): 421 self.num += 1 # 局部变量 422 time.sleep(self.SleepTime) 423 print("线程 (%s), number = (%s)" % (self.name,self.num)) 424 425 if __name__ == "__main__": 426 # 创建锁 427 mutex = threading.Lock() 428 thread_name_G = Class_My_Thread(100, 3) 429 thread_name_G.start() 430 431 thread_name_H = Class_My_Thread(200, 1) 432 thread_name_H.start() 433 434 thread_name_I = Class_My_Thread(300, 2) 435 thread_name_I.start() 436 437 result: 438 线程 (Thread-2), number = (201) 439 线程 (Thread-3), number = (301) 440 线程 (Thread-1), number = (101) 441 局部变量是不共享数据的。 442 443 ----------------------- 444 1.8 死锁 445 446 在线程共享多个资源的时候,如果二个线程分别占有一部分资源 447 并且同时等待对方的资源,就会造成死锁。 448 尽管死锁很少发生,但一旦发生就会造成应用的停止响应。 449 450 e.g: 451 # 死锁 452 class Class_My_Thread_A(threading.Thread): 453 def run(self): 454 if mutexA.acquire(): 455 print(self.name + "--do 1 up --") 456 time.sleep(1) 457 458 if mutexB.acquire(): 459 print(self.name + "--do 1 down --") 460 mutexB.release() 461 mutexA.release() 462 463 464 class Class_My_Thread_B(threading.Thread): 465 def run(self): 466 if mutexB.acquire(): 467 print(self.name + "--do 2 up --") 468 time.sleep(1) 469 470 if mutexA.acquire(): 471 print(self.name + "--do 2 down --") 472 mutexA.release() 473 mutexB.release() 474 475 if __name__ == "__main__": 476 # 死锁 不能执行的效果 477 mutexA = threading.Lock() 478 mutexB = threading.Lock() 479 thread_name_J = Class_My_Thread_A() 480 thread_name_J.start() 481 thread_name_K = Class_My_Thread_B() 482 thread_name_K.start() 483 484 print("...") 485 486 result: 487 #避免死锁 488 # 程序设计时,要尽量避免,银行家算法 489 # 添加超时时间等 acquire(timeout)很重要!!! 490 491 492 ----------------------- 493 1.9.1 同步应用 494 多线程有序执行 495 496 可以使用互斥锁完成多个任务,有序的进程工作,这就是线程的同步 497 498 e.g: 499 # 多线程有序执行 500 class Class_Task_C(threading.Thread): 501 def run(self): 502 while True: 503 if mutexC.acquire(): 504 print("--- Task C ---[%s]" % (self.name)) 505 time.sleep(1) 506 mutexD.release() 507 508 class Class_Task_D(threading.Thread): 509 def run(self): 510 while True: 511 if mutexD.acquire(): 512 print("--- Task D ---[%s]" % (self.name)) 513 time.sleep(1) 514 mutexE.release() 515 516 class Class_Task_E(threading.Thread): 517 def run(self): 518 while True: 519 if mutexE.acquire(): 520 print("--- Task F ---[%s]" % (self.name)) 521 time.sleep(1) 522 mutexC.release() 523 524 if __name__ == "__main__": 525 # 同步应用 526 # 创建锁 527 mutexC = threading.Lock() 528 529 mutexD = threading.Lock() 530 mutexD.acquire() 531 mutexE = threading.Lock() 532 mutexE.acquire() 533 534 thread_name_L = Class_Task_C() 535 thread_name_L.start() 536 537 thread_name_M = Class_Task_D() 538 thread_name_M.start() 539 540 thread_name_N = Class_Task_E() 541 thread_name_N.start() 542 result: 543 --- Task C ---[Thread-1] 544 --- Task D ---[Thread-2] 545 --- Task F ---[Thread-3] 546 --- Task C ---[Thread-1] 547 --- Task D ---[Thread-2] 548 --- Task F ---[Thread-3] 549 --- Task C ---[Thread-1] 550 --- Task D ---[Thread-2] 551 552 ----------------------- 553 2.0 Queue 队列 554 555 Queue使用说明 556 1) 对于Queue,在多线程通信之间扮演重要的角色 557 2) 添加数据队列中,使用put()方法 558 3) 从队列中获取数据,使用get()方法 559 4) 判断队列中是否还有数据,使用qsize()方法 560 561 生产者消费者模式说明: 562 在线程世界里。生成者就是生产数据的线程,消费者就是消费数据的线程。 563 在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢, 564 那么生成者就必须等待消费者处理完,才能继续生产数据。同样道理 565 如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。 566 为了解决这个问题引入了生产者和消费者模式。 567 568 生产者消费者模式,是通过一个容器来解决生产者和消费者的强耦合问题。 569 生产者和消费者之间彼此之间不直接通讯,而通过阻塞队列来进行通讯, 570 所有生产者生产数据之后不用等待消费者处理,直接扔给阻塞队列, 571 消费者不找生产者要数据,而是直接从阻塞队列里取出数据, 572 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 573 这个阻塞队列就是用来给生产者和消费者解耦的。 574 纵观大多数设计模式,都会找到一个第三者出来进行解耦。 575 576 577 e.g: 578 class Class_Producer_F(threading.Thread): 579 def run(self): 580 # global queue 581 var_count = 0 582 583 while True: 584 if queue_name.qsize() < 1000 : 585 # 每次就生产100个商品,二个线程做个事 586 for var in range(100): 587 var_count += 1 588 message = "生成产品" + str(var_count) 589 queue_name.put(message) 590 # print("-- Producer F --[%s]" % (self.name)) 591 time.sleep(0.5) 592 593 class Class_Consumer_G(threading.Thread): 594 def run(self): 595 # global queue 596 597 while True: 598 if queue_name.qsize() > 100: 599 # 若队列数量大于100,每个线程就取出3个商品,五个线程做这个事 600 for var in range(3): 601 message = self.name + "消费了" + queue_name.get() 602 print("message: (%s)",message) 603 # print("-- Consumer G --[%s]" % (self.name)) 604 time.sleep(1) 605 606 if __name__ == "__main__": 607 # #----------------------- 608 # Queue 提供了同步、线程安全的队列类 609 # 队列原则: 先进先出 FIFO 610 # 栈的原则: 先进后出 LIFO 611 # 这些队列都实现了锁原语,原子操作 612 #创建队列 ,这个队列只能用于线程,而进程中不能使用 613 queue_name = Queue() # 队列 614 # 队列初创 615 for var in range(500): 616 queue_name.put("》》初始产品"+ str(var)) 617 # 创建二次线程 producer 618 for var in range(2): 619 producer_name = Class_Producer_F() 620 producer_name.start() 621 # 创建二次线程 consumer 622 for var in range(5): 623 producer_name = Class_Consumer_G() 624 producer_name.start() 625 # 2 + 5 +1(主线程) = 8 (总线程) 626 result: 627 628 ----------------------- 629 2.1 ThreadLocal 630 631 ThreadLocal不用查找dict,ThreadLocal帮你自动的做这个事: 632 全局变量Local_school就是一个ThreadLocal对象, 633 每个Thread对他都可以读写student属性,但互不影响。 634 你可以把ThreadLocal看出全局变量,但每个属性, 635 如,Local_school.student都是线程的局部变量, 636 可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。 637 638 可以理解为全局变量local_school是dict,不但可以用Local_school.student, 639 还可以绑定其他变量,如Local_school.teacher等。 640 641 ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接, 642 HTTP请求,用户身份信息,这样一个线程的所有调用到的处理函数 643 都可以非常方便的访问这些资源。 644 645 总结: 646 ThreadLocal变量虽然是全局变量,但是每个线程都只能读写自己线程的独立副本, 647 互不干扰,ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。 648 649 650 e.g: 651 # ThreadLocal 652 def Function_Process_student(): 653 # 获取当前线程关联的student 654 student_name = local_school.student 655 print("%s: (in %s)"% (student_name,threading.current_thread())) 656 657 def Function_Process_thread(name): 658 local_var_name =name 659 660 # 绑定ThreadLocal的student 661 local_school.student = local_var_name 662 Function_Process_student() 663 664 if __name__ == "__main__": 665 # ThreadLocal 666 # 创建全局ThreadLocal对象 667 local_school = threading.local() 668 669 thread_name_O = threading.Thread(target=Function_Process_thread, args=(‘图形分析‘,), name=‘Thread-A‘) 670 thread_name_O.start() 671 thread_name_O.join() 672 673 thread_name_P = threading.Thread(target = Function_Process_thread, args=(‘数据处理‘,), name=‘Thread-B‘) 674 thread_name_P.start() 675 thread_name_P.join() 676 677 thread_name_Q = threading.Thread(target = Function_Process_thread, args=(‘信号处理‘,), name=‘Thread-C‘) 678 thread_name_Q.start() 679 thread_name_Q.join() 680 result: 681 682 ----------------------- 683 2.2 异步 684 685 e.g: 686 def Function_Test_G(): 687 print("进程池中的进程PID=[%s],PPID=[%s]" % (os.getpid(),os.getppid())) 688 for var in range(3): 689 print("-- [%s]" % (var)) 690 time.sleep(1) 691 return "Function_Test_G" 692 693 def Function_Test_H(args): 694 print("callback func PID =[%s]" % (os.getpid())) 695 print("callback func args=[%s]" % (args)) 696 697 if __name__ == "__main__": 698 # 异步 699 #创建进程池 700 pool_name = Pool(3) 701 # callback 回调函数 子线程结束后,立刻执行回调函数 702 pool_name.apply_async(func = Function_Test_G,callback= Function_Test_H ) 703 704 # 异步的理解:主进程正在做某件事情, 705 # 突然来了一件更需要立刻去做的事情, 706 # 那么这种,在父进程去做某件事情的时候 707 # 并不知道是什么时候去做,的模式 就称为异步 708 while True: 709 time.sleep(1) 710 print("主进程 PID = [%s]" % (os.getpid())) 711 result: 712 进程池中的进程PID=[69792],PPID=[71624] 713 -- [0] 714 主进程 PID = [71624] 715 -- [1] 716 主进程 PID = [71624] 717 -- [2] 718 主进程 PID = [71624] 719 callback func PID =[71624] 720 callback func args=[Function_Test_G] 721 主进程 PID = [71624] 722 主进程 PID = [71624] 723 主进程 PID = [71624] 724 725 ----------------------- 726 727 ---------------------------------------------- 728 729 ============================================================================ 730 ‘‘‘ 731 732 # 733 ‘‘‘ 734 # ============================================================================ 735 # Function: 736 # Explain : 输入参数 737 # : 输出参数 738 # ============================================================================ 739 ‘‘‘ 740 # #----------------------- 741 # 单 / 多 线程执行 742 743 def Function_Say_Sorry(): 744 print("A say: Sorry !") 745 time.sleep(1) 746 747 # #----------------------- 748 def Function_Sing(): 749 for var in range(10): 750 print("正在唱歌 %d" % var) 751 time.sleep(1) 752 753 def Function_Dance(): 754 for var in range(5): 755 print("正在跳舞 %d" % var) 756 time.sleep(1) 757 758 # #----------------------- 759 #线程执行代码的封装 760 class Class_MyThread(threading.Thread): 761 def run(self): 762 for var in range(10): 763 time.sleep(1) 764 message = "I am " + self.name + " @ " + str(var) 765 print("message: %s" % (message)) 766 767 # #----------------------- 768 # 线程的执行顺序 769 #重复封装类 770 def Function_Test_A(): 771 for var in range(10): 772 thread_name_local = Class_MyThread() 773 thread_name_local.start() 774 775 # #----------------------- 776 # 多线程-共享全局变量 777 778 def Function_Work_A(): 779 global global_var_number_A 780 781 for var in range(10): 782 global_var_number_A += 1 783 print("work A ,number is %d" % (global_var_number_A)) 784 785 786 def Function_Work_B(): 787 global global_var_number_A 788 789 print("work A ,number is %d" % (global_var_number_A)) 790 791 # #----------------------- 792 # 列表当做实参传递到线程中 793 def Function_Work_C(nums): 794 local_var_number = nums 795 796 local_var_number.append("CC") 797 print("work C ,number is %s" % (local_var_number)) 798 799 800 def Function_Work_D(nums): 801 local_var_number = nums 802 time.sleep(1) 803 print("work D ,number is %s" % (local_var_number)) 804 805 806 # #----------------------- 807 # 同步 808 def Function_Test_B(): 809 global global_var_number 810 for var in range(1000000): 811 global_var_number += 1 812 print("Test B ,number is %d" % (global_var_number)) 813 814 def Function_Test_C(): 815 global global_var_number 816 for var in range(1000000): 817 global_var_number += 1 818 print("Test C ,number is %d" % (global_var_number)) 819 820 # #----------------------- 821 # 互斥锁 822 def Function_Test_D(): 823 global global_var_number 824 # Test_D和Test_F线程都在抢着上锁,对这个锁,如果一方上锁成功, 825 # 导致另一方会堵塞(一直等待)到这个锁被解锁为止 826 mutexFlag = mutex.acquire(True) 827 for var in range(1000000): 828 # True 表示堵塞,如果这个锁在上锁之前已经被上锁了,那么这个线程会在 829 #False 表示非堵塞,即不管本次调用能够成功上锁,都会卡在这,而继续 830 if mutexFlag: 831 global_var_number += 1 832 # 用来对mutex指向的这个锁,进行解锁,只要解锁了,那么接下来会让所有 833 # 因为这个锁,被上了锁而堵塞的线程进行抢着上锁 834 mutex.release() 835 836 print("Test D ,number is %d" % (global_var_number)) 837 838 def Function_Test_E(): 839 global global_var_number 840 841 for var in range(1000000): 842 mutexFlag = mutex.acquire(True) 843 if mutexFlag: 844 global_var_number += 1 845 mutex.release() 846 847 print("Test E ,number is %d" % (global_var_number)) 848 849 def Function_Test_F(): 850 global global_var_number 851 mutexFlag = mutex.acquire(True) 852 for var in range(1000000): 853 if mutexFlag: 854 global_var_number += 1 855 mutex.release() 856 857 print("Test F ,number is %d" % (global_var_number)) 858 859 # #----------------------- 860 # 多线程-非共享数据 861 class Class_My_Thread(threading.Thread): 862 # 重写 构造方法 863 # 1. 全局变量在多个线程中 共享,为了保证正确运行需要锁 864 # 2. 非全局变量在每个线程中 各有一份,不会共享,当然了不需要加锁 865 def __init__(self,number,SleepTime): 866 threading.Thread.__init__(self) 867 self.num = number 868 self.SleepTime = SleepTime 869 870 def run(self): 871 self.num += 1 # 局部变量 872 time.sleep(self.SleepTime) 873 print("线程 (%s), number = (%s)" % (self.name,self.num)) 874 875 # #----------------------- 876 # 死锁 877 class Class_My_Thread_A(threading.Thread): 878 879 def run(self): 880 if mutexA.acquire(): 881 print(self.name + "--do 1 up --") 882 time.sleep(1) 883 884 if mutexB.acquire(True,3): 885 print(self.name + "--do 1 down --") 886 mutexB.release() 887 mutexA.release() 888 889 class Class_My_Thread_B(threading.Thread): 890 891 def run(self): 892 if mutexB.acquire(True,4): 893 print(self.name + "--do 2 up --") 894 time.sleep(1) 895 896 if mutexA.acquire(): 897 print(self.name + "--do 2 down --") 898 mutexA.release() 899 mutexB.release() 900 901 # #----------------------- 902 # 多线程有序执行 903 class Class_Task_C(threading.Thread): 904 def run(self): 905 while True: 906 if mutexC.acquire(): 907 print("--- Task C ---[%s]" % (self.name)) 908 time.sleep(1) 909 mutexD.release() 910 911 class Class_Task_D(threading.Thread): 912 def run(self): 913 while True: 914 if mutexD.acquire(): 915 print("--- Task D ---[%s]" % (self.name)) 916 time.sleep(1) 917 mutexE.release() 918 919 class Class_Task_E(threading.Thread): 920 def run(self): 921 while True: 922 if mutexE.acquire(): 923 print("--- Task E ---[%s]" % (self.name)) 924 time.sleep(1) 925 mutexC.release() 926 # #----------------------- 927 # FIFO 队列 生产者消费模式 928 class Class_Producer_F(threading.Thread): 929 def run(self): 930 # global queue 931 var_count = 0 932 933 while True: 934 if queue_name.qsize() < 1000 : 935 # 每次就生产100个商品,二个线程做个事 936 for var in range(100): 937 var_count += 1 938 message = "生成产品" + str(var_count) 939 queue_name.put(message) 940 # print("-- Producer F --[%s]" % (self.name)) 941 time.sleep(0.5) 942 943 class Class_Consumer_G(threading.Thread): 944 def run(self): 945 # global queue 946 947 while True: 948 if queue_name.qsize() > 100: 949 # 若队列数量大于100,每个线程就取出3个商品,五个线程做这个事 950 for var in range(3): 951 message = self.name + "消费了" + queue_name.get() 952 print("message: (%s)",message) 953 # print("-- Consumer G --[%s]" % (self.name)) 954 time.sleep(1) 955 956 # #----------------------- 957 # ThreadLocal 958 def Function_Process_student(): 959 # 获取当前线程关联的student 960 student_name = local_school.student 961 print("%s: (in %s)"% (student_name,threading.current_thread())) 962 963 def Function_Process_thread(name): 964 local_var_name =name 965 966 # 绑定ThreadLocal的student 967 local_school.student = local_var_name 968 Function_Process_student() 969 970 # #----------------------- 971 # 异步 972 def Function_Test_G(): 973 print("进程池中的进程PID=[%s],PPID=[%s]" % (os.getpid(),os.getppid())) 974 for var in range(3): 975 print("-- [%s]" % (var)) 976 time.sleep(1) 977 return "Function_Test_G" 978 979 def Function_Test_H(args): 980 print("callback func PID =[%s]" % (os.getpid())) 981 print("callback func args=[%s]" % (args)) 982 983 # #----------------------- 984 985 # #----------------------- 986 987 # ============================================================================ 988 ‘‘‘ 989 # ============================================================================ 990 # 测试专用 991 # ============================================================================ 992 ‘‘‘ 993 if __name__ == "__main__": 994 # #----------------------- 995 # # 单线程执行 996 # for var in range(10): 997 # Function_Say_Sorry() 998 999 1000 1001 # # ----------------------- 1002 # # 多线程执行 1003 # for var in range(10): 1004 # # 创建线程 1005 # thread_name = threading.Thread(target = Function_Say_Sorry) 1006 # thread_name.start() 1007 1008 1009 1010 # # # ----------------------- 1011 # # 主线程等待子线程结束而结束 1012 # print("--- start ---:\n %s" % (time.time())) 1013 # 1014 # time_A = threading.Thread(target = Function_Sing) 1015 # time_B = threading.Thread(target = Function_Dance) 1016 # time_A.start() 1017 # time_B.start() 1018 # # time.sleep(5) # 1019 # print("--- finish ---:\n%s" % (time.time())) 1020 # 1021 # # 查看线程数量 1022 # while True: 1023 # length = len(threading.enumerate()) 1024 # print("当前运行的线程数为: [%d] " % (length)) 1025 # if length <= 1 : 1026 # break 1027 # time.sleep(0.5) 1028 1029 1030 1031 # # # ----------------------- 1032 # #线程执行代码的封装 1033 # thread_name = Class_MyThread() 1034 # thread_name.start() 1035 1036 1037 1038 # # # ----------------------- 1039 # #线程执行顺序 1040 # Function_Test_A() 1041 1042 1043 1044 # # ----------------------- 1045 # 多线程-共享全局变量 1046 # global_var_number_A = 100 1047 # 1048 # print("创建线程之前,number is %d" %(global_var_number_A)) 1049 # thread_name_A = threading.Thread(target = Function_Work_A ) 1050 # thread_name_A.start() 1051 # time.sleep(1) 1052 # 1053 # thread_name_B = threading.Thread(target = Function_Work_B ) 1054 # thread_name_B.start() 1055 1056 1057 1058 # # ----------------------- 1059 # 列表当做实参传递到线程中 1060 # global_nums = [11,22,33,44,55] 1061 # thread_name_C = threading.Thread(target=Function_Work_C, args=(global_nums,)) 1062 # thread_name_C.start() 1063 # thread_name_D = threading.Thread(target=Function_Work_D, args=(global_nums,)) 1064 # thread_name_D.start() 1065 1066 1067 1068 # # # ----------------------- 1069 # # 同步 1070 # global_var_number = 0 1071 # thread_name_E = threading.Thread(target = Function_Test_B) 1072 # thread_name_E.start() 1073 # time.sleep(3) 1074 # thread_name_F = threading.Thread(target = Function_Test_C) 1075 # thread_name_F.start() 1076 # 1077 # print("number: %s" % (global_var_number)) 1078 1079 1080 1081 # # ----------------------- 1082 # 互斥锁 1083 # global_var_number = 0 1084 # # 创建锁 1085 # mutex = threading.Lock() 1086 # 1087 # thread_name_D = threading.Thread(target=Function_Test_D) 1088 # thread_name_D.start() 1089 # 1090 # thread_name_E = threading.Thread(target = Function_Test_E) 1091 # thread_name_E.start() 1092 # 1093 # thread_name_F = threading.Thread(target = Function_Test_F) 1094 # thread_name_F.start() 1095 # 1096 # print("number: %s" % (global_var_number)) 1097 1098 1099 # # # ----------------------- 1100 # # 多线程-非共享数据 1101 # # 创建锁 1102 # mutex = threading.Lock() 1103 # thread_name_G = Class_My_Thread(100, 3) 1104 # thread_name_G.start() 1105 # 1106 # thread_name_H = Class_My_Thread(200, 1) 1107 # thread_name_H.start() 1108 # 1109 # thread_name_I = Class_My_Thread(300, 2) 1110 # thread_name_I.start() 1111 1112 1113 1114 # # # ----------------------- 1115 # # 死锁 不能执行的效果 1116 # mutexA = threading.Lock() 1117 # mutexB = threading.Lock() 1118 # thread_name_J = Class_My_Thread_A() 1119 # thread_name_J.start() 1120 # thread_name_K = Class_My_Thread_B() 1121 # thread_name_K.start() 1122 # 1123 # print("...") 1124 # #避免死锁 1125 # # 程序设计时,要尽量避免,银行家算法 1126 # # 添加超时时间等 acquire(timeout)很重要!!! 1127 1128 1129 # # #----------------------- 1130 # # 同步应用 1131 # # 创建锁 1132 # mutexC = threading.Lock() 1133 # 1134 # mutexD = threading.Lock() 1135 # mutexD.acquire() 1136 # mutexE = threading.Lock() 1137 # mutexE.acquire() 1138 # 1139 # thread_name_L = Class_Task_C() 1140 # thread_name_L.start() 1141 # 1142 # thread_name_M = Class_Task_D() 1143 # thread_name_M.start() 1144 # 1145 # thread_name_N = Class_Task_E() 1146 # thread_name_N.start() 1147 1148 1149 # # #----------------------- 1150 # # Queue 提供了同步、线程安全的队列类 1151 # # 队列原则: 先进先出 FIFO 1152 # # 栈的原则: 先进后出 LIFO 1153 # # 这些队列都实现了锁原语,原子操作 1154 # #创建队列 ,这个队列只能用于线程,而进程中不能使用 1155 # queue_name = Queue() # 队列 1156 # # 队列初创 1157 # for var in range(500): 1158 # queue_name.put("》》初始产品"+ str(var)) 1159 # # 创建二次线程 producer 1160 # for var in range(2): 1161 # producer_name = Class_Producer_F() 1162 # producer_name.start() 1163 # # 创建二次线程 consumer 1164 # for var in range(5): 1165 # producer_name = Class_Consumer_G() 1166 # producer_name.start() 1167 # # 2 + 5 +1(主线程) = 8 (总线程) 1168 1169 1170 1171 # # #----------------------- 1172 # # ThreadLocal 1173 # # 创建全局ThreadLocal对象 1174 # local_school = threading.local() 1175 # thread_name_O = threading.Thread(target=Function_Process_thread, args=(‘图形分析‘,), name=‘Thread-A‘) 1176 # thread_name_O.start() 1177 # thread_name_O.join() 1178 # thread_name_P = threading.Thread(target = Function_Process_thread, args=(‘数据处理‘,), name=‘Thread-B‘) 1179 # thread_name_P.start() 1180 # thread_name_P.join() 1181 # 1182 # thread_name_Q = threading.Thread(target = Function_Process_thread, args=(‘信号处理‘,), name=‘Thread-C‘) 1183 # thread_name_Q.start() 1184 # thread_name_Q.join() 1185 1186 1187 # # #----------------------- 1188 # # 异步 1189 # #创建进程池 1190 # pool_name = Pool(3) 1191 # # callback 回调函数 子线程结束后,立刻执行回调函数 1192 # pool_name.apply_async(func = Function_Test_G,callback= Function_Test_H ) 1193 # 1194 # # 异步的理解:主进程正在做某件事情, 1195 # # 突然来了一件更需要立刻去做的事情, 1196 # # 那么这种,在父进程去做某件事情的时候 1197 # # 并不知道是什么时候去做,的模式 就称为异步 1198 # while True: 1199 # time.sleep(1) 1200 # print("主进程 PID = [%s]" % (os.getpid())) 1201 1202 print("learn finish") 1203
--
-----------------------------------------------------
原文地址:https://www.cnblogs.com/caochucheng/p/9974843.html
时间: 2024-10-10 10:31:43