Python学习【第21篇】:进程池以及回调函数

python并发编程之多进程2-------------数据共享及进程池和回调函数

一、数据共享

1.进程间的通信应该尽量避免共享数据的方式

2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的。

虽然进程间数据独立,但可以用过Manager实现数据共享,事实上Manager的功能远不止于此。

?


1

2

3

4

命令就是一个程序,按回车就会执行(这个只是在windows情况下)

tasklist 查看进程

tasklist | findstr  pycharm   #(findstr是进行过滤的),|就是管道(tasklist执行的内容就放到管道里面了,

管道后面的findstr  pycharm就接收了)

3.(IPC)进程之间的通信有两种实现方式:管道和队列

 1 from multiprocessing import Manager,Process,Lock
 2 def work(dic,mutex):
 3     # mutex.acquire()
 4     # dic[‘count‘]-=1
 5     # mutex.release()
 6     # 也可以这样加锁
 7     with mutex:
 8         dic[‘count‘] -= 1
 9 if __name__ == ‘__main__‘:
10     mutex = Lock()
11     m = Manager()  #实现共享,由于字典是共享的字典,所以得加个锁
12     share_dic = m.dict({‘count‘:100})
13     p_l = []
14     for i in range(100):
15         p = Process(target=work,args=(share_dic,mutex))
16         p_l.append(p)  #先添加进去
17         p.start()
18     for i in p_l:
19         i.join()
20     print(share_dic)
21 # 共享就意味着会有竞争,

二、进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  1. 很明显需要并发执行的任务通常要远大于核数
  2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

那么什么是进程池呢?进程池就是控制进程数目

ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

进程池的结构:

创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

1.创建进程池

?


1

Pool([numprocess  [,initializer [, initargs]]]):创建进程池

2.参数介绍

?


1

2

3

numprocess:要创建的进程数,如果省略,将默认为cpu_count()的值,可os.cpu_count()查看

initializer:是每个工作进程启动时要执行的可调用对象,默认为None

initargs:是要传给initializer的参数组

3.方法介绍

?


1

2

3

4

5

6

7

8

9

10

11

12

13

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行

func(*args,**kwargs),然后返回结果。

需要强调的是:此操作并不会在所有池工作进程中并执行func函数。

如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()

函数或者使用p.apply_async()

p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,

callback是可调用对象,接收输入参数。当func的结果变为可用时,

将理解传递给callback。callback禁止执行任何阻塞操作,

否则将接收其他异步操作中的结果。

   

p.close():关闭进程池,防止进一步操作。禁止往进程池内在添加任务(需要注意的是一定要写在close()的上方)

?


1

P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

应用1:

 1 from multiprocessing import Pool
 2 import os,time
 3 def task(n):
 4     print(‘[%s] is running‘%os.getpid())
 5     time.sleep(2)
 6     print(‘[%s] is done‘%os.getpid())
 7     return n**2
 8 if __name__ == ‘__main__‘:
 9     # print(os.cpu_count())  #查看cpu个数
10     p = Pool(4) #最大四个进程
11     for i in range(1,7):#开7个任务
12         res = p.apply(task,args=(i,))  #同步的,等着一个运行完才执行另一个
13         print(‘本次任务的结束:%s‘%res)
14     p.close()#禁止往进程池内在添加任务
15     p.join() #在等进程池
16     print(‘主‘)

 1 # ----------------
 2 # 那么我们为什么要用进程池呢?这是因为进程池使用来控制进程数目的,
 3 # 我们需要几个就开几个进程。如果不用进程池实现并发的话,会开很多的进程
 4 # 如果你开的进程特别多,那么你的机器就会很卡,所以我们把进程控制好,用几个就
 5 # 开几个,也不会太占用内存
 6 from multiprocessing import Pool
 7 import os,time
 8 def walk(n):
 9     print(‘task[%s] running...‘%os.getpid())
10     time.sleep(3)
11     return n**2
12 if __name__ == ‘__main__‘:
13      p = Pool(4)
14      res_obj_l = []
15      for i in range(10):
16          res = p.apply_async(walk,args=(i,))
17          # print(res)  #打印出来的是对象
18          res_obj_l.append(res)  #那么现在拿到的是一个列表,怎么得到值呢?我们用个.get方法
19      p.close() #禁止往进程池里添加任务
20      p.join()
21      # print(res_obj_l)
22      print([obj.get() for obj in res_obj_l])  #这样就得到了

那么什么是同步,什么是异步呢?

同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去

异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。

什么是串行,什么是并行呢?

举例:能并排开几辆车的就可以说是“并行”,只能一辆一辆开的就属于“串行”了。很明显,并行的速度要比串行的快得多。(并行互不影响,串行的等着一个完了才能接着另一个)

应用2:

使用进程池维护固定数目的进程(以前客户端和服务端的改进)

 1 from socket import *
 2 from multiprocessing import Pool
 3 s = socket(AF_INET,SOCK_STREAM)
 4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用
 5 s.bind((‘127.0.0.1‘,8081))
 6 s.listen(5)
 7 print(‘start running...‘)
 8 def talk(coon,addr):
 9     while True:  # 通信循环
10         try:
11             cmd = coon.recv(1024)
12             print(cmd.decode(‘utf-8‘))
13             if not cmd: break
14             coon.send(cmd.upper())
15             print(‘发送的是%s‘%cmd.upper().decode(‘utf-8‘))
16         except Exception:
17             break
18     coon.close()
19 if __name__ == ‘__main__‘:
20     p = Pool(4)
21     while True:#链接循环
22         coon,addr = s.accept()
23         print(coon,addr)
24         p.apply_async(talk,args=(coon,addr))
25     s.close()
26 #因为是循环,所以就不用p.join了

 1 from socket import *
 2 c = socket(AF_INET,SOCK_STREAM)
 3 c.connect((‘127.0.0.1‘,8081))
 4 while True:
 5     cmd = input(‘>>:‘).strip()
 6     if not cmd:continue
 7     c.send(cmd.encode(‘utf-8‘))
 8     data = c.recv(1024)
 9     print(‘接受的是%s‘%data.decode(‘utf-8‘))
10 c.close()

三、回调函数

?


1

2

3

4

5

6

7

回调函数什么时候用?(回调函数在爬虫中最常用)

造数据的非常耗时

处理数据的时候不耗时

你下载的地址如果完成了,就自动提醒让主进程解析

谁要是好了就通知解析函数去解析(回调函数的强大之处)

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 import time
 5 def get_page(url):
 6     print(‘<%s> is getting [%s]‘ %(os.getpid(),url))
 7     response = requests.get(url)  #得到地址
 8     time.sleep(2)
 9     print(‘<%s> is  done [%s]‘%(os.getpid(),url))
10     return {‘url‘:url,‘text‘:response.text}
11 def parse_page(res):
12     ‘‘‘解析函数‘‘‘
13     print(‘<%s> parse [%s]‘%(os.getpid(),res[‘url‘]))
14     with open(‘db.txt‘,‘a‘) as f:
15         parse_res = ‘url:%s size:%s\n‘ %(res[‘url‘],len(res[‘text‘]))
16         f.write(parse_res)
17 if __name__ == ‘__main__‘:
18     p = Pool(4)
19     urls = [
20         ‘https://www.baidu.com‘,
21         ‘http://www.openstack.org‘,
22         ‘https://www.python.org‘,
23         ‘https://help.github.com/‘,
24         ‘http://www.sina.com.cn/‘
25     ]
26     for url in urls:
27         obj = p.apply_async(get_page,args=(url,),callback=parse_page)
28     p.close()
29     p.join()
30     print(‘主‘,os.getpid())  #都不用.get()方法了

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 def get_page(url):
 5     print(‘<%os> get [%s]‘ %(os.getpid(),url))
 6     response = requests.get(url)  #得到地址  response响应
 7     return {‘url‘:url,‘text‘:response.text}
 8 if __name__ == ‘__main__‘:
 9     p = Pool(4)
10     urls = [
11         ‘https://www.baidu.com‘,
12         ‘http://www.openstack.org‘,
13         ‘https://www.python.org‘,
14         ‘https://help.github.com/‘,
15         ‘http://www.sina.com.cn/‘
16     ]
17     obj_l= []
18     for url in urls:
19         obj = p.apply_async(get_page,args=(url,))
20         obj_l.append(obj)
21     p.close()
22     p.join()
23     print([obj.get() for obj in obj_l])

原文地址:https://www.cnblogs.com/kcwxx/p/10145352.html

时间: 2024-11-09 04:49:38

Python学习【第21篇】:进程池以及回调函数的相关文章

python学习四十天(进程池)

今日主要内容: 1.队列 2.管道 3.数据共享 4.进程池 5.回调函数 一.队列 队列: 创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. Queue([maxsize]) 创建共享的进程队列. 参数 :maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制. 底层队列使用管道和锁定实现 Queue([maxsize]) 创建共享的进程队列.maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制.底层队列使用管道和锁定实现

Python 3 进程池与回调函数

Python 3 进程池与回调函数 一.进程池 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间.多进程是实现并发的手段之一,需要注意的问题是: 很明显需要并发执行的任务通常要远大于核数 一个操作系统不可能无限开启进程,通常有几个核就开几个进程 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行) 例如当被操作对象数目不大时,可以直接利用multiprocessing中的Proces

W10_Pipe_Manager数据共享_进程池和回调函数

[TOC] #管道 ``` from multiprocessing import Pipe,Process def func(conn2): print(conn2.recv()) conn1,conn2 = Pipe() conn1.send("Hello pipe") p = Process(target=func, args=(conn2,)) p.start() ``` **多进程中管道异常EOFError** ``` from multiprocessing import

进程间的数据共享、进程池的回调函数和线程初识、守护线程

一.进程的数据共享 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的 虽然进程间数据独立,但可以通过Manager实现数据共享. 把所有实现了数据共享的比较便捷的类都重新又封装了一遍,并且在原有的multiprocessing基础上增加了新的机制 list dict等 数据共享的机制 支持数据类型非常有限 list dict都不是数据安全的,你需要自己加锁来保证数据安全 Manager用法: Manager().dict() # 创建共享的字典 Manager().lis

python全栈开发基础【第二十二篇】进程池和回调函数

一.数据共享 1.进程间的通信应该尽量避免共享数据的方式 2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的. 虽然进程间数据独立,但可以用过Manager实现数据共享,事实上Manager的功能远不止于此. 命令就是一个程序,按回车就会执行(这个只是在windows情况下) tasklist 查看进程 tasklist | findstr pycharm #(findstr是进行过滤的),|就是管道(tasklist执行的内容就放到管道里面了, 管道后面的findst

进程池与回调函数

一.进程池(重点) 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间.多进程是实现并发的手段之一,需要注意的问题是: 1.很明显需要并发执行的任务通常要远大于核数 2.一个操作系统不可能无限开启进程,通常有几个核就开几个进程 3.进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行) 例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进

4月27日 python学习总结 GIL、进程池、线程池、同步、异步、阻塞、非阻塞

一.GIL:全局解释器锁 1 .GIL:全局解释器锁 GIL本质就是一把互斥锁,是夹在解释器身上的, 同一个进程内的所有线程都需要先抢到GIL锁,才能执行解释器代码 2.GIL的优缺点: 优点:  保证Cpython解释器内存管理的线程安全 缺点:同一进程内所有的线程同一时刻只能有一个执行,也就说Cpython解释器的多线程无法实现并行 二.GIL与多线程 有了GIL的存在,同一时刻同一进程中只有一个线程被执行 听到这里,有的同学立马质问:进程可以利用多核,但是开销大,而python的多线程开销

第36篇 多进程的数据共享,进程池的回调函数,线程 什么是GIL锁,Threading模块记

内容概览: 进程 数据共享 进程池--回调函数 线程 线程的基础理论 什么是线程? 线程与进程的关系 GIL锁 线程的开启: Threading模块1,用多进程开启socket创建聊天 server端写了input函数会报错?因为服务器是高速运行的,自动化的为来访问的客户端提供服务, 不可能停下来等待管理员的输入,然后发送给客户.这就失去了自动化的意义. 2,进程池Pool()方法创建的进程,map()方法是否有返回值? p.map()得到的是迭代对象 import time from mult

Python网编_进程池的回调函数

将n个任务交给n个进程去执行每一个进程在执行完毕之后会有一个返回值,这个返回值交给callback函数指定的那个函数去处理这样的话所有的进程哪一个执行的最后快,哪一个就可以先进性统计工作这样就能在最短的时间内得到我们想要的结果 import time import random from multiprocessing import Pool def get(i): # 使用i模拟网站地址 在子进程中执行 time.sleep(random.random()) # 模拟不同的网站返回数据的时间