进程数据共享-进程池

数据共享

Manager  内部管理了很多数据类型,并不是所有的数据类型都是用来做数据分享,只是顺便包含了能够处理数据共享问题的数据类型 list dict

列表/字典  自带的方法基本都是数据安全的,但是对其中的元素进行+= -= *=  /=   都是数据不安全的

from multiprocessing import Manager,Process,Lock

def func(dic,lock):
    with lock:
        dic[‘count‘] -= 1

if __name__ == ‘__main__‘:
    m = Manager()
    lock = Lock()
    dic = m.dict({‘count‘:100})
    p_l = []
    for i in range(100):
        p = Process(target=func,args=(dic,lock))
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(dic)

数据共享:速度很慢,牵扯到锁的问题,一般使用数据库进行数据共享,利用第三方工具--消息中间件(消息队列)进行通信

进程池

在一台四核计算机上,如果有上百个没有IO阻塞的计算任务,开启上百个进程会使效率降低,可以利用进程池,开启几个进程,用过异步提交执行任务

异步提交

import time
from multiprocessing import Pool,Process

def func(i):
    i * i

if __name__ == ‘__main__‘:
    start = time.time()
    p = Pool()    #括号里的参数默认是cpu的个数,开启cpu个数的进程
    for i in range(20):
        p.apply_async(func,(i,))   #apply提交任务,async异步
    p.close()           #关闭进程池,不允许再继续向这个池子中添加任务了
    p.join()            #阻塞  直到已经被提交到进程池中的任务全部结束
    print(time.time() - start)

获取返回值

import time
from multiprocessing import Pool

def func(i):
    i * i
    time.sleep(1)
    return ‘i‘* i

if __name__ == ‘__main__‘:
    p = Pool()
    ret_l = []
    for i in range(50):
        ret = p.apply_async(func,(i,))
        ret_l.append(ret)    # ret接收返回值,并存放在列表里
    for ret in ret_l:
        print(ret.get())

map方法

import time
from multiprocessing import Pool

def func(i):
    i * i
    time.sleep(1)
    return ‘i‘* i

if __name__ == ‘__main__‘:
    p = Pool()
    ret_l = p.map(func,range(5))   #map就是一种简便的apply_async的方式,并且内置了close和join的功能,参数必须规定个数
    for ret in ret_l:
        print(ret)

同步提交

按照顺序一个一个执行,而且还有关于进程的开销,反而降低效率,不推荐使用

import os
import time
from multiprocessing import Pool

def func(i):
    time.sleep(1)
    print(i,os.getpid())

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(20):
        p.apply(func,(i,))     # 同步提交

回调函数

# 利用网址得到网页信息
import os
from urllib import request
from multiprocessing import Pool
def parser_page(content):
    print(os.getpid())
    print(‘len : ‘,len(content))

def get_url(url):
    ret = request.urlopen(url)
    content = ret.read().decode(‘utf-8‘)
    return content

if __name__ == ‘__main__‘:
    print(os.getpid())
    url_lst = [
        ‘http://www.cnblogs.com/Eva-J/articles/8253549.html‘,
        ‘http://www.cnblogs.com/Eva-J/articles/8306047.html‘,
        ‘http://www.baidu.com‘,
        ‘http://www.sogou.com‘,
        ‘https://www.cnblogs.com/Eva-J/p/7277026.html‘
    ]
    p = Pool()
    # ret_l = []
    for url in url_lst:
        ret = p.apply_async(get_url,(url,),callback=parser_page)  # 异步的方式提交任务
        # callback 将get_url的返回值给parser_page,并且立即执行parser_page
        # 如果异步的任务执行完毕之后需要立即做另外的操作,推荐使用collback和生产者消费者模型
        # ret_l.append(ret)
    p.close()              # 不获取结果就这么写
    p.join()
    # for ret in ret_l:    # 要获取结果就这么写
    #     res = ret.get()
    #     parser_page(res)

利用多进程实现并发的socketserver

# server端:
from socket import *
from multiprocessing import Pool

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((‘127.0.0.1‘,8080))
server.listen()

def talk(conn):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == ‘__main__‘:
    p = Pool()
    while True:
        conn,addr = server.accept()
        p.apply_async(talk,args=(conn,))

# client端:
import socket

client = socket.socket()
client.connect((‘127.0.0.1‘,8080))

while True:
    msg = input(‘>>>‘).strip()
    if not msg: continue
    client.send(msg.encode(‘utf-8‘))
    msg = client.recv(1024)
    print(msg.decode(‘utf-8‘))

原文地址:https://www.cnblogs.com/sandy-123/p/10458402.html

时间: 2024-08-10 10:26:14

进程数据共享-进程池的相关文章

老男孩学习DAY11-1 进程、进程池、协程

python 进程 优点:可以处理大量的并发操作,使用IO计算型 缺点:由于进程之间的数据都是独立,所以创建一个进程,就得消耗一份内存 (进程和cpu核数相同的情况最好) Process :进程 (让我想到了40个人,要烧40壶水,要弄40个炉子,但是效率高) 进程中有 join (2)   阻塞住啦,最多阻塞2秒钟:demaon(true)  可以设置不阻塞,直接运行. 都说进程之间的数据是独立,那么我们你能将进程之间的数据共享吗,聪明的人类,当然可以,那就用到了mange和array arr

线程,线程池|进程,进程池

线程 import threading 锁 lock=threading.Lock() # lock=threading.RLock() #递归锁,多层锁定,多层解锁 lock.acquire() lock.release() import threading import time v = 10 #lock = threading.Lock() # 只能开一把 lock = threading.RLock()# 可以开多把 def task(arg): time.sleep(2) # 申请使用

python的学习之旅---进程和进程池

为了能让任务并发我们需要开启进程 开启进程的两种方法 一 是基于函数实现的 二 是基于类实现的 我们开启进程的数量也不是无穷的 所以需要引入进程池的概念 1 #程序的执行方式: 2 #一:串行执行 糖葫芦 一个一个来 3 #二:并行执行 一起走大横排 4 5 6 #同步调用:提交一个任务后,在原地等着,等到该任务运行完毕,拿到结果以后,再执行下一行代码 7 #异步调用:提交一个任务后,不用在原地等着,直接执行下一行代码,结果呢? shutdown代表不允许再往进程池里提交任务,wait=True

python进程、进程池(二)代码部分

第一种创建进程的方式: from multiprocessing import Process def f(name): print(name,"在子进程") if __name__ == "__main__": p = Process(target=f,args=("aaa",)) p.start() print("执行主进程内容") # 打印内容如下 执行主进程内容 aaa 在子进程 从打印结果我们可以看出程序先执行了主进

Python程序中的进程操作-进程池(multiprocess.Pool)

Python程序中的进程操作-进程池(multiprocess.Pool) 一.进程池 为什么要有进程池?进程池的概念. 在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务.那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间.第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率.因此我们不能无限制的根据任务开启或者结束进程.那么我们要怎么做呢? 在这里,要给大家介

进程篇—进程整理(转)

一.概括 系统启动架构图: 上图在Android系统-开篇中有讲解,是从Android系统启动的角度来分析,本文是从进程/线程的视角来分析该问题. 1.1 父进程 在所有进程中,以父进程的姿态存在的进程(即图中的浅红色项),如下: kthreadd进程: 是所有内核进程的父进程 init进程 : 是所有用户进程的父进程(或者父父进程) zygote进程 : 是所有上层Java进程的父进程,另外zygote的父进程是init进程. 1.2 重量级进程 在Android进程中,有3个非常重要的进程(

python并发编程(守护进程,进程锁,进程队列)

进程的其他方法 P = Process(target=f,) P.Pid 查看进程号  查看进程的名字p.name P.is_alive()  返回一个true或者False P.terminate()  给操作系统发送一个结束进程的信号 验证进程之间是空间隔离的 from multiprocessing import Process num = 100 def f1(): global num num = 3 print(num) # 结果 3 if __name__ == '__main__

进程(二) —— 进程控制块

进程(二) —— 进程控制块 进程控制块(PCB, Process Control Block) 定义: 操作系统管理和控制进程运行所用的信息集合 操作系统用 PCB 来 描述进程的 基本情况 以及 运行变化 的过程 PCB是进程存在的唯一标志 每个进程都在 操作系统 中有一个对应的PCB 进程控制块的使用 进程创建 生成 该进程的 PCB 进程终止 回收它的PCB 进程的组织管理 通过对PCB的组织管理来实现 进程控制块内容 进程标志信息 处理机现场保存 进程控制信息 进程控制信息 调度和状态

孤儿进程 && 僵尸进程

background: unix: 每个子进程退出,内核释放该进程所有资源,打开的文件,占用的内存 保留的信息:the process ID,the termination status of the process,the amount of CPU time taken by the process 父进程用wait()/waitpid()释放子进程的保留信息 父进程不调用wait()/waitpid()进程号一直被占用,系统所能提供的进程号有限,没有可用的进程号导致系统不能产生新的进程 Z