python 之进程

什么是进程process:

运行中的程序就称为进程,我们写好的程序,如果不运行的话就是一堆普通的字符,没有任何意义。

并行与并发:

并发:伪并行,看起来是同时运行,实际是cpu+多道技术实现,cpu在多个任务间快速切换,给我们造成的感觉就好像是在同时运行。

并行:只有具备多个cpu才能实现真正意义上的并行。多个任务同时进行,但是最大运行数为cpu个数。

multiprocessing  模块:

python中的多线程无法利用cpu的多核优势(GIL锁约束一个进程内同一时间内,只能运行一个线程)。如果想要充分利用cpu的多核资源,就要使用多进程。python中的multiprocessing模块为我们提供了开启子进程的方法,并在子进程中执行我们定制的任务。

multiprocessing 模块提供Process,Queue ,Pipe,Lock等组件。

与线程不同,进程的初始化完全copy其父进程内存地址,进程间数据不共享,进程数据的修改仅限于该进程内部。

Process类

class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        ‘‘‘参数介绍:group(这个应该是不用的)target:要调用的对象,即要执行的任务(函数)name:子进程的名字
        args:表示要要调用对象的位置参数,kwargs:表示要调用对象的字典。‘‘‘
        self.name = ‘‘#进程名
        self.daemon = False#默认为False设置为True 表示为守护进程,在start()之前设置,随着主进程结束而结束。
        self.authkey = None#进程的身份验证键,默认为os.urandom()随机生成的32为字符串
        self.exitcode = None#进程运行时为None,如果为-N,表示信号N结束
        self.ident = 0
        self.pid = 0#进程的pid
        self.sentinel = None

    def run(self):#进程启动,自定义进程时必须实现
        pass

    def start(self):#启动进程
        pass

    def terminate(self):#强制终止进程,不推荐使用,容易造成僵尸进程和死锁
        pass

    def join(self, timeout=None):#设置超时时间
        pass

    def is_alive(self):#判断进程是否正在运行,正在运行返回True
        return False

Process类介绍

创建线程的两种方式:

import time
import random
from multiprocessing import Process
def piao(name):
    print(‘%s piaoing‘ %name)
    time.sleep(random.randrange(1,5))
    print(‘%s piao end‘ %name)

p1=Process(target=piao,args=(‘egon‘,)) #必须加,号
p2=Process(target=piao,args=(‘alex‘,))
p3=Process(target=piao,args=(‘wupeqi‘,))
p4=Process(target=piao,args=(‘yuanhao‘,))

p1.start()
p2.start()
p3.start()
p4.start()
print(‘主线程‘)

方式一

import time
import random
from multiprocessing import Process
class MyProcess(Process):
    def __init__(self,name,money):
        super().__init__()
        self.name = name
        self.money = money
    def run(self):
        print("%s is piaoing"%self.name)
        time.sleep(random.randint(1,3))
        print("%s is piao end"%self.name)
        print("%s  pay %s yuan for piao"%(self.name,self.money))
if __name__ == ‘__main__‘:

    p1= MyProcess("egon",200)
    p2= MyProcess("alex",None)
    p3= MyProcess("yuanhao",500)
    p1.start()#start 会自动调用run方法
    p2.start()
    p3.start()
    print("主进程".center(30,"*"))

方式二

进程实现socket并发

###服务端
from socket import *
from multiprocessing import Process
s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(("192.168.20.45",8800))
s.listen(5)
def talk(conn,addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg:continue
            conn.send(msg.upper())
        except Exception:
            break
if __name__ == ‘__main__‘:
    while True:
        conn,addr = s.accept()
        p = Process(target = talk,args = (conn,addr))
        p.start()

####客户端

from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(("192.168.20.45",8800))
while True:
    msg = input(">>>>:").strip()
    if not msg:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(data.decode("utf-8"))

socket 并发

Process 对象的join方法:

from multiprocessing import Process
import time,random
def piaochang(name,money):
    print("%s is piaoing"%name)
    time.sleep(random.randrange(5))
    print("%s is piao end"%name)
    print("%s pay %s yuan for piaochang"%(name,money))
if __name__ == ‘__main__‘:
    start = time.time()
    p1 = Process(target=piaochang,args = (‘egon‘,200))
    p2 = Process(target=piaochang,args = (‘alex‘,None))
    p3 = Process(target=piaochang,args = (‘wupeiqi‘,500))

    p_l = [p1,p2,p3]
    for p in  p_l:
        p.start()
        # p.join()#如果将join放在这里,就是串行的结果  主线程 9.459348678588867
    for p in p_l:
        p.join()#join的意思是等待所有上面的程序执行完后再往下执行代码   主线程 4.645843029022217
                #join是让主进程等待子进程运行结束,卡住的是主进程
    print("主线程",time.time()-start)

Process join 方法

Process 其它属性或方法:

from multiprocessing import Process,current_process
import time,random
class Piao(Process):
    def __init__(self,name,money):
        super().__init__()
        self.name = name
        self.money = money
    def run(self):
        print("%s is piaoing"%self.name)
        time.sleep(random.randrange(5))
        print("%s is piao end,pay %s yuan"%(self.name,self.money))
if __name__ == ‘__main__‘:
    p = Piao("egon",200)
    p.start()
    print(p.is_alive())#查看进程p是否存活
    print(p.pid)#子进程ip
    print(p.name)#查看进程名,这里为"egon",默认为Process1
    p.terminate()#强制关闭子进程
    time.sleep(0.1)#这里需要注意,关闭进程不会立马关闭,所以is_alive 有可能还存活,这里让它停小会
    print(p.is_alive())#存活状态为False

守护进程:

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

from  multiprocessing import Process
import time,random
def piao(name):
    print("%s is piaoing"%name)
    time.sleep(random.randint(1,3))
    print("%s is piao done"%name)
if __name__ == ‘__main__‘:
    p = Process(target=piao,args = ("egon",))
    p.daemon = True
    p.start()
    print("主")#执行完这里意味着p这个守护进程已经结束,它后面的代码也将不会运行

守护进程

同步锁:

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,,竞争带来的结果就是错乱,如何控制,就是加锁处理

# #不加锁,并发,效率高,数据错乱
from multiprocessing import Process,Lock
import time,os
def work():
    print("%s is running"%os.getpid())
    time.sleep(1)
    print("%s is done"%os.getpid())
if __name__ == ‘__main__‘:
    for i in range(5):
        p = Process(target=work)
        p.start()
# 加锁,变并发伪串行,效率低,数据安全
def work(mutex):
    mutex.acquire()
    print("%s is running "%os.getpid())
    time.sleep(1)
    print(‘%s is done‘%os.getpid())
    mutex.release()
if __name__ == ‘__main__‘:
    mutex = Lock()
    for i in range(5):
        p = Process(target=work,args = (mutex,))
        p.start()

同步锁

模拟抢票对同一文件操作:

#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open(‘db.txt‘))
    print(‘\033[43m剩余票数%s\033[0m‘ %dic[‘count‘])

def get():
    dic=json.load(open(‘db.txt‘))
    time.sleep(0.1) #模拟读数据的网络延迟
    if dic[‘count‘] >0:
        dic[‘count‘]-=1
        time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(‘\033[43m购票成功\033[0m‘)

def task(lock):
    search()
    get()
if __name__ == ‘__main__‘:
    lock=Lock()
    for i in range(100): #模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()

并发效率高,数据错乱

from multiprocessing import Process,Lock
import time,json,os
def search():
    dic = json.load(open(‘db.txt‘))
    print("\033[40m 剩余票数%s\033[0m"%dic["count"])
def get_ticket():
    dic = json.load(open(‘db.txt‘))
    time.sleep(0.5)#模拟网络延迟
    if dic["count"]>0:
        dic["count"]-=1
        time.sleep(0.2)
        json.dump(dic,open("db.txt","w"))
        print("\033[43m%s购票成功\033[0m"%os.getpid())
def task(mutex):
    search()
    mutex.acquire()
    get_ticket()
    mutex.release()
if __name__ == ‘__main__‘:
    mutex = Lock()
    for i in range(100):
        p = Process(target=task,args = (mutex,))
        p.start()

加锁,变并发为串行,效率低,数据安全

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低
2.需要自己加锁处理

为此mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

对列:

进程相互隔离,multiprocessing 模块为我们提供了两种进程间的通信方式:对列和管道

class Queue(object):
    def __init__(self, maxsize=-1):
        self._maxsize = maxsize  #对列中允许的最大项数,省略则无限制

    def qsize(self):#没什么卵用,返回队列中目前项目的正确数量
        return 0

    def empty(self):#判断对列是否为空
        return False

    def full(self):#判断对列是否满了
        return False

    def put(self, obj, block=True, timeout=None):#插入数据到对列  timeout设置超时时间

        pass

    def put_nowait(self, obj):#同put
        pass

    def get(self, block=True, timeout=None):#从对列中获取数据
        pass

    def get_nowait(self):#同get
        pass

    def close(self):#关闭对列,防止对列中加入更多的数据
        pass

    def join_thread(self):#连接后台线程
        pass

    def cancel_join_thread(self):#不会在进程退出时自动连接后台线程
        pass

对列属性和方法

应用:

from multiprocessing import Process,Queue

q = Queue(3)
for i in range(3):
    q.put(i)
print(q.full())#True
for i in range(3):
    q.get(i)
print(q.empty())#True

生产者消费者模型:

什么是生产者消费者模型:

生产者消费者模型是通过一个容器来解决生产者和消费者强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞对列来进行通信,所以生产者生产完数据后,直接扔给阻塞对列,消费者不直接找生产者要数据,而是直接从阻塞对列里取,阻塞对列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

为什么要使用生产者消费者模型:

在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理的速度很快,而消费者的处理速度很慢,那么生产者就得等待消费者处理处理完,才能继续生产数据。同样,如果消费者处理速度大于生产者,那么消费者就必须等待消费者。为了解决生产者和消费者这种强耦合的问题,就引入了这个模型。

from multiprocessing import Process,Queue
import time,os,random
#消费者
def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,2))
        print("\033[45m %s 吃 %s\033[0m"%(os.getpid(),res))
#生产者
def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,2))
        res = "包子%s"%i
        q.put(res)
        print("\033[44m %s 生产了 %s\033[0m"%(os.getpid(),res))
if __name__ == ‘__main__‘:
    q = Queue()
    #生产者
    p = Process(target=producer,args = (q,))
    #消费者
    c = Process(target= consumer,args=(q,))
    #开始进程
    p.start()
    c.start()
    print("主")
##此时的进程会陷入死循环中,因为生产者将数据生产完后,消费者
#在取到空之后,一直会循环q.get()这一步。
#要解决这个问题,就得生产者在生产完成后往对列中加一个结束的信号,这样消费者在接收到结束信号
#后就可以break出死循环。
from multiprocessing import Process,Queue
import time,os,random
#消费者
def consumer(q):
    while True:
        res = q.get()
        if res is None:break####收到结束后,结束循环
        time.sleep(random.randint(1,2))
        print("\033[45m %s 吃 %s\033[0m"%(os.getpid(),res))
#生产者
def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,2))
        res = "包子%s"%i
        q.put(res)
        print("\033[44m %s 生产了 %s\033[0m"%(os.getpid(),res))
    q.put(None)####发送结束信号
if __name__ == ‘__main__‘:
    q = Queue()
    #生产者
    p = Process(target=producer,args = (q,))
    #消费者
    c = Process(target= consumer,args=(q,))
    #开始进程
    p.start()
    c.start()
    # p.join()####结束信号不一定由生产者发送,主进程也可以发送
    # p.put(None)####但是主进程需要确认生产者结束后才发送信号,这里用join方法确认
    print("主")
####但是如果有多个生产者和消费者我们就得发送很多次的None,这样也太low了
###don‘t worry multiprocessing 模块为我们提供了JoinableQueue方法,允许项目的使用者
###通知生成者项目已经被成功处理。通知进程是使用共享信号和条件变量来实现的。
###JoinableQueue除了和Queue相同的方法外还新增了q.task.done()使用者使用此方法发出信号。表示q.get()
###的返回项目已经被处理。
###q.join()生产者调用此方法进行阻塞,知道对列中所有的项目被处理。阻塞持续到对列中的每个项目均调用q.task_done()方法为止
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print("\033[40m %s eat %s\033[0m"%(os.getpid(),res))

        q.task_done()

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res = "%s%s"%(name,i)
        q.put(res)
        print("\033[44m %s 生产了 %s\033[0m"%(os.getpid(),res))
    q.join()
if __name__ == ‘__main__‘:
    q = JoinableQueue()

    p1 = Process(target=producer,args=("包子",q))
    p2 = Process(target=producer,args=("骨头",q))
    p3 = Process(target=producer,args=("泔水",q))

    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    c3 = Process(target=consumer,args=(q,))

    c1.daemon = True
    c2.daemon = True
    c3.daemon = True

    p_l = [p1,p2,p3,c1,c2,c3]
    for p in p_l:
        p.start()
    p1.join()
    p2.join()
    p3.join()

    print("主")
    #p1,p2,p3结束了,证明c1,c2,c3肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程

生产者消费者模型

管道:

from multiprocessing import Process,Pipe
def consumer(p,name):
    left,right = p
    left.close()
    while True:
        try:
            baozi = right.recv()
            print("%s 收到包子:%s"%(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right = p
    right.close()
    for i in seq:
        left.send(i)
    else:
        left.close()
if __name__ == ‘__main__‘:
    left,right = Pipe()
    c1 = Process(target=consumer,args=((left,right),"c1") )
    c1.start()

    seq = (i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()
    c1.join()
    print("主")

管道,了解即可

共享数据:

 进程间的数据是独立的,但是可以借助对列或管道实现通信,二者都是基于消息传递的。

虽然进程间数据独立,但是可以通过Manager实现数据共享。

from multiprocessing import Manager,Process,Lock

def work(d,mutex):
    with mutex:#加锁处理
        d["count"]-= 1
if __name__ == ‘__main__‘:
    mutex = Lock()

    m = Manager()
    share_dic = m.dict({"count":100})
    p_l = []
    for i in range(100):
        p = Process(target = work,args = (share_dic,mutex))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(share_dic)#不加锁{"count":90}发生数据错乱

共享数据 Manager

信号量:

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁

    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random
def go_wc(sem,user):
    sem.acquire()
    print("%s 上厕所"%user)
    time.sleep(random.randint(1,3))
    print("%s 上完厕所"%user)
    sem.release()

if __name__ == ‘__main__‘:
    sem = Semaphore(5)
    p_l = []
    for i in range(10):
        p = Process(target =go_wc,args = (sem,"user%s"%i))
        p.start()
        p_l.append(p)
    for i in p_l:
        i.join()
    print("===========>")

信号量

Event 事件:

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

from multiprocessing import Process ,Event
import time,random
def car(e,n):
    while True:
        if not e.is_set():
            print("\033[31m 红灯亮\033[0m,car%s等着"%n)
            e.wait()
            print("\033[32m车%s 看见绿灯亮了\033[0m"%n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print("走你,car",n)
            break
def police_car(e,n):
    while True:
        if not e.is_set():
            print("\033[31m红灯亮\033[0m,car%s等着"%n)
            e.wait(1)
            print("灯是%s,警车走了,car %s"%(e.is_set(),n))
            break
def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear()
        else:
            e.set()
if __name__ == ‘__main__‘:
    e = Event()
    for i in range(5):

        p = Process(target=police_car,args = (e,i))
        p.start()
    t = Process(target=traffic_lights,args = (e,10))
    t.start()
    print("==========>")

Event

进程池:

进程不能无限开启,开启进程数目越多,效率反而会下降,需要一个机制来约束,这就是进程池。

主要方法:

p.apply() 同步提交任务

p.apply_async() 异步提交任务

p.close()关闭进程池

p.join()等待所有工作进程退出,此方法只能在close()或teminate()后调用

实例:

from multiprocessing import Pool
import os,time
def work(n):
    print("%s run"%os.getpid())
    time.sleep(3)
    return n**2
if __name__ == ‘__main__‘:
    p = Pool(3)#可以指定参数,也可以默认,默认为cpu的个数,进程池从无到有创建三个进程,以后一直都是这三个进程

    res_l = []
    for i in range(10):
        res = p.apply(work,args =(i,))#同步运行,阻塞,直到本次任务执行完毕拿到res
        res_l.append(res)
    print(res_l)

进程池,apply同步执行

from multiprocessing import Pool
import os,time
def work(n):
    print("%s run"%os.getpid())
    time.sleep(3)
    return n**2
if __name__ == ‘__main__‘:
    p = Pool(3)
    res_l = []
    for i in range(10):
        res = p.apply_async(work,args = (i,))
        res_l.append(res)
    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,
    # 等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get())#apply么有get方法,因为apply是同步执行,立刻可以获得结果,不需要get方法

进程池,apply_async()异步提交

使用进程池实现socket并发:

####服务端
from socket import *
from multiprocessing import Pool
import os
s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(("192.168.20.45",8888))
s.listen(5)

def talk(conn,addr):
    print("进程pid:%s"%os.getpid())
    while True:
        try:
            msg = conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break
    conn.close()
if __name__ == ‘__main__‘:
    p = Pool()
    while True:
        conn,addr = s.accept()
        p.apply_async(talk,args = (conn,addr))

###客户端
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(("192.168.20.45",8888))
while True:
    msg = input(">>>:")
    if not msg:continue
    c.send(msg.encode("utf-8"))
    msg = c.recv(1024)
    print(msg.decode("utf-8"))

并发多个客户端,服务端同一时间只有4个不同的pid,关闭一个经常后,后面进程才会进来。

回调函数:

需要回调函数的场景:

进程池中的任何一个任务一旦处理完成,就立即通知主进程,主进程调用一个函数去调用处理该结果,该函数 就是回调函数

我们可以把耗时的任务放到进程池中,然后指定回调函数负责处理,这样 进程就 省掉了I/O的过程,直接拿到任务结果。

def get_page(url):
    print(‘<%s> is getting [%s]‘ %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)
    print(‘<%s> is done [%s]‘ % (os.getpid(), url))
    return {‘url‘:url,‘text‘:response.text}
def parse_page(res):
    print(‘<%s> parse [%s]‘ %(os.getpid(),res[‘url‘]))
    with open(‘db.txt‘,‘a‘) as f:
        parse_res=‘url:%s size:%s\n‘ %(res[‘url‘],len(res[‘text‘]))
        f.write(parse_res)
if __name__ == ‘__main__‘:
    p=Pool(4)
    urls = [
            ‘https://www.baidu.com‘,
            ‘http://www.openstack.org‘,
            ‘https://www.python.org‘,
            ‘https://help.github.com/‘,
            ‘http://www.sina.com.cn/‘
            ]
    for url in urls:
        p.apply_async(get_page,args=(url,),callback=parse_page)
    p.close()
    p.join()
    print(‘主‘)

回调函数

from multiprocessing import Pool
import time
import requests
import re
def get_page(url,pattern):
    response = requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)
def parse_page(info):
    print(info)
    page_content,pattern = info
    res = re.findall(pattern,page_content)
    for item in res:
        dic = {
            ‘index‘: item[0],
            ‘title‘: item[1],
            ‘actor‘: item[2].strip()[3:],
            ‘time‘: item[3][5:],
            ‘score‘: item[4] + item[5]

        }
        print(dic)

if __name__ == ‘__main__‘:
    pattern1 = re.compile(r‘<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<‘,re.S)
    url_dic = {"http://maoyan.com/board/7":pattern1,}
    p =Pool()
    res_l = []
    for url,pattern in url_dic.items():
        res = p.apply_async(get_page,args = (url,pattern),callback = parse_page)
        res_l.append(res)
    for i in res_l:
        i.get()

回调函数 应用爬虫

如果进程中等待进程池中所有任务都执行完毕后,再同一处理则无需回调函数

from multiprocessing import Pool
import time,random,os
def work(n):
    time.sleep(1)
    return n**2
if __name__ == ‘__main__‘:
    p = Pool()
    res_l = []
    for i in range(10):
        res = p.apply_async(work,args = (i,))
        res_l.append(res)
    p.close()
    p.join()
    nums = []
    for res in res_l:
        nums.append(res.get())
    print(nums)

时间: 2024-10-10 07:16:34

python 之进程的相关文章

Python实例浅谈之五Python守护进程和脚本单例运行

一.简介 守护进程最重要的特性是后台运行:它必须与其运行前的环境隔离开来,这些环境包括未关闭的文件描述符.控制终端.会话和进程组.工作目录以及文件创建掩码等:它可以在系统启动时从启动脚本/etc/rc.d中启动,可以由inetd守护进程启动,也可以有作业规划进程crond启动,还可以由用户终端(通常是shell)执行. Python有时需要保证只运行一个脚本实例,以避免数据的冲突. 二.Python守护进程 1.函数实现 #!/usr/bin/env python #coding: utf-8

#python#守护进程的实现

找了整天,终于找到一个可以用的代码 #! /usr/bin/env python2.7 #encoding:utf-8 #@description:一个python守护进程的例子 #@tags:python,daemon import sys import os import time import atexit from signal import SIGTERM      class Daemon:   """   A generic daemon class.     

Python 3 进程池与回调函数

Python 3 进程池与回调函数 一.进程池 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间.多进程是实现并发的手段之一,需要注意的问题是: 很明显需要并发执行的任务通常要远大于核数 一个操作系统不可能无限开启进程,通常有几个核就开几个进程 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行) 例如当被操作对象数目不大时,可以直接利用multiprocessing中的Proces

python 守护进程

daemon进程 守护进程 1.fork子进程,而后父进程退出,此时子进程会被init进程接管 2.修改子进程的工作目录,创建新进程组和新会话,修改umask 3.子进程再次fork一个进程,这个进程可以称为孙子进程,而后子进程退出 4.重定向孙子进程标准输入流,标准输出等 atexit程序退出 调用某个函数 kill  级别 python 守护进程,布布扣,bubuko.com

【Python】使用Supervisor来管理Python的进程

1.问题描述 需要一个python的服务程序在后台一直运行,不能让该进程被杀死,即使被杀死也要能及时自动重启.如:有一个python的程序:test.py ,通过命令:python test.py来运行程序,但是它会受命令行的中断而中断.所以我们需要一个方法来保证该程序一直在后台运行. 2.解决方法 以前经常用命令:nohup python test.py & 来保证其在后台运行不中断,但是这也不能保证一直运行. 下面介绍用supervisor来管理python的进程,保证其在后台一直运行不中断

Python守护进程(多线程开发)

#!/usr/bin/python import sys,time,json,logging import Queue, threading, datetime from lib.base.daemon import Daemon from lib.queue.httpsqs.HttpsqsClient import HttpsqsClient from lib.db.DbMongodb import DbMongodb logging.basicConfig(level=logging.DEB

Python守护进程命令,为何被黑客钟意?整蛊、木马都用得上它!

考虑一下利用Python制作一个整蛊.木马软件,我提供思路.(清楚到没学过编程的人也理解) 1.首先一个黑客做一个整蛊或者木马软件,一定不会让你能够关闭它. 2.里面经常会附带欺骗的方法. 3.最终实现某种目的. 前段时间在抖音上看到个有趣的程序,看样子是VB写的,首先就要用到欺骗的方法,利用软件名称欺骗目标点击软件,打开后出现一个标签.两个按钮. 标签上写的是:我爱你,你爱我吗? 按钮:一个写爱,一个是不爱. 怎么办?一般人看到这种情况就直接点叉关闭软件了.然而人家程序员早就在代码里写了 窗口

doraemon的python 守护进程和Process

### 9.4 Process模块 进程 ```python from multiprocess import Process p = Process(target=函数名,args=(参数1,)) 1.如何创建一个进程对象 对象和进程之间的关系: a.进程对象和进程并没有直接的关系 b.只是存储了一些和进程相关的内容 c.此时此刻,操作系统还没有接收到创建进程的指令 2.如何开启一个进程 通过p.start()开启一个进程,这个方法相当于给操作系统一个开启指令指令 start方法的异步非阻塞的

Python使用进程池管理进程和进程间通信

与线程池类似的是,如果程序需要启动多个进程,也可以使用进程池来管理进程.程序可以通过 multiprocessing 模块的 Pool() 函数创建进程池,进程池实际上是 multiprocessing.pool.Pool 类. 进程池具有如下常用方法: 1.apply(func[, args[, kwds]]):将 func 函数提交给进程池处理.其中 args 代表传给 func 的位置参数,kwds 代表传给 func 的关键字参数.该方法会被阻塞直到 func 函数执行完成. 2.app

Python学习进程(13)文件与IO

    本节介绍基本的IO函数和文件的读写操作.     (1)读取键盘输入: Python用于读取键盘输入的函数有两个:raw_input与input. 1)raw_input函数 从标准输入读取一个行,并返回一个字符串(去掉结尾的换行符) >>> str=raw_input("请输入一个字符串") 请输入一个字符串 >>> str=raw_input("请输入一个字符串:\n") 请输入一个字符串: my name is Me