python(十)、进程

一、基本概念

  进程和线程是编程中非常重要的概念,它是并发和异步的基础性知识。因此,这里从网上摘录一些资料以加深理解。

  1.进程

  概念:程序在并发环境中的执行过程。进程作为资源分配和独立运行的基本单位,决定了操作系统的四大特性:并发、异步、共享和虚拟。并发:在同一时段内执行多个任务。异步:执行多个任务时,任务彼此独立互不干扰。共享:系统资源可以供多个任务共享,而不被独占。虚拟:把多个物理设备绑定到一起,建立多个逻辑对应上的应用。

  基本特征:动态性、并发性、调度性。动态性是指,进程是执行着的程序,是程序的运行状态和运行实体,进程随任务的创建而开始,随任务的结束而消失;而程序则是有序指令集合,能够作为一种资源进行存储,程序可以有多个进程。并发性是指,进程是可以并发执行的,系统中的多进程可以独立、不可预知地执行。调度性是指,进程是系统进行资源分配的最小单位。

  程序并发执行时的特征:间断性、失去封闭性、不可再现性。

  基本状态:新建状态、就绪状态、运行状态、阻塞状态、终止状态。它的转换关系如下图所示。就绪、运行和阻塞是常见的三种状态。处于阻塞状态时,如果程序可以继续运行,则需要转为就行状态,再进入运行状态。

  进程调度的队列图:

  PCB(Processing Control Block):进程控制块,通常是系统内存占用区中的一个连续存区(数据结构),主要表示进程状态。它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息。操作系统是根据PCB来对并发执行的进程进行控制和管理的。PCB和进程的关系是一对一的。进程控制块主要包括该进程的以下信息:进程名、特征信息、进程状态信息、调度优先权、通信信息、现场保护区、资源需求、进程实体信息、族系关系等。每个进程有唯一的PCB,PCB是进程存在的唯一标志。

  进程队列:系统把所有PCB组织起来的方式大致分三种:线性方式、链式方式、索引方式。

  进程创建与终止:进程创建原语:创建进程的主要任务是为其建立一个PCB(程序、数据、用户栈的进程映像模型),主要包含四个步骤:申请一个空闲的PCB;为新进程分配资源;初始化PCB;将新PCB插入进程队列。进程终止原语也分四个步骤:找到进程对应的PCB,终止该进程的运行;回收该进程所占用的全部资源;终止其所有子孙进程,回收它们占用的所有资源;将PCB从原来队列中清除。

  进程间关系:互斥-竞争关系、同步-协作关系、通信-信息交流。互斥:逻辑上相互独立的进程由于竞争同一个资源而产生的相互竞争关系;同步:进程间共同完成一项任务时发生的相互作用的关系。互斥是一种同步。通信-进程间的数据交换。

  进程互斥:

    - 一次仅允许一个进程使用的资源称为临界资源(Critical Resource)。临界资源可以是硬件也可以是软件。如打印机只能同时由一个进程使用,那么它就是临界资源。

    - 在每个进程中访问临界资源的那段程序称为临界区(Critical Section)。对于不同临界资源的临界区,它们之间不互斥。

    - 进程互斥,就是对进程进入时临界区加以控制。进入前要申请,获准后进入,执行临界区程序后退出, 然后才可以执行其它代码。

    - 临界区进入准则

      - 空闲让进:如果若干进程要求进入空闲的临界区,一次仅允许一个进程进入。

      - 忙则等待:任何时候,处于临界区的进程不可多于一个。

      - 有限等待:进入临界区的进程要在有限时间内退出。

      - 让权等待:如果进程不能进入自己的临界区,则应让出CPU,避免进程出现“忙等”现象。

  信号量:信号量是一种数据操作锁,与进程互斥相反,信号量规定多个进程可以同时操作同一资源。

二、python进程

  根据multiprocessing的官方文档介绍,multiprocessing is a package that supports spawning processes using an API similar to the threading module,多进程是根据多线程仿写的,它没有GIL(Global Interpreter Lock)的限制,可以充分利用多核资源。

  1.开启进程

  开启进程有两种方式:函数式和类式。

from multiprocessing import Process
import os, time

def say():
    time.sleep(2)
    print("子进程: ", os.getpid())
    print("This is a subprocess.")
    print("子进程结束.")

if __name__ == ‘__main__‘:
    print("主进程: ", os.getpid())
    p = Process(target=say, args=())
    p.start()
    print("主进程结束.")
from multiprocessing import Process
import os, time

class Say(Process):
    def __init__(self):
        super().__init__()
    def run(self):  # 重写类方法
        time.sleep(2)
        print("子进程: ", os.getpid())
        print("This is a subprocess.")
        print("子进程结束.")

if __name__ == ‘__main__‘:
    print("主进程: ", os.getpid())
    p = Say()
    p.start()
    print("主进程结束.")

  当类继承定义开启子类时,run()方法最终调用target方法,并将args和kwargs传递进去,从而开启一个线程。

  multiprocessing supports three ways to start a process。spawnforkforkserver。spawn只会使用部分父进程的资源(重开python解释器进程,速度慢),fork会继承父进程的全部资源(容易泄密),相较于前两个,forkserver会在需要启动子进程时由父进程向forserver请求创建新的子进程,它效率高并且不必继承所有资源。

import multiprocessing as mp

def foo(q):
    q.put(‘hello‘)

if __name__ == ‘__main__‘:
    ctx = mp.get_context(‘forkserver‘)
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()  # 守护进程

  2.进程的几种类型

  守护进程setDaemon(True):父进程执行结束,子进程立刻结束(被强制终止),那么该子进程被称为守护进程。

  孤儿进程(不做任何设置):父进程结束,子进程仍在继续执行,那么子进程被称为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。孤儿进程没有危害。

  阻塞进程(join):父进程在所有子进程执行结束后,再执行结束。

  僵尸进程:使用os.folk()创建的子进程,在其父进程退出时,如果没有调用wait或waitpid获取子进程的状态信息,那么子进程在退出后仍会将文件描述符遗留在系统内存里。这时它不能被init进程回收,会占用系统内存。僵尸进程需要避免。

  3.进程通信

  multiprocessing supports two types of communication channel between processes: Queue and Pipe。进程间通信的方式有两种,即通过队列或者管道。

  管道是进程间直接通信,队列则借助共享存储器空间来实现。远程通信则通过socket实现。

  multiprocessing.Queue是仿写的queue.Queue。它是线程和进程安全的。

  举个例子: 开启两个进程一个队列,一个进程不断地往队列中添加url,另一个进程从队列中取出url,下载天龙八部章节。

from multiprocessing import Process, Queue
import urllib
from bs4 import BeautifulSoup

def url_maker(q):
    num = 2024334
    for i in range(5):
        url = "https://www.ybdu.com/xiaoshuo/10/10237/{}.html".format(num)
        q.put(url)
        print(url)
        num += 1
    q.put("over")

def url_parse(q):
    count = 0
    while True:
        url = q.get()  # 从列表中获取url
        if url == "over":
            break
        else:
            count += 1
            html = urllib.request.urlopen(url)  # 请求html
            html_bytes = html.read() # 读取字节数据
            soup = BeautifulSoup(html_bytes, "html.parser")
            string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # 获取小说内容
            lines = string.split()
            with open("天龙八部.txt", mode="a", encoding="utf-8") as f:  # 写入文件
                f.write("第 {} 章".format(count) + "\r\n")
                print("已下载第{}章...".format(count))
                for i, line in enumerate(lines[: -6]):
                    f.write("   " + line + "\r\n")

if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=url_maker, args=(q,))
    p2 = Process(target=url_parse, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

  改写成子类式:

from multiprocessing import Process, Queue
import urllib
from bs4 import BeautifulSoup

class UrlMaker(Process):   # 传参的时候也可以global,甚至target都可以另写
    def __init__(self):
        super().__init__()
    def run(self):
        num = 2024334
        for i in range(5):
            url = "https://www.ybdu.com/xiaoshuo/10/10237/{}.html".format(num)
            q.put(url)
            print(url)
            num += 1
        q.put("over")

class UrlParser(Process):
    def __init__(self, file):
        super().__init__()
        self.file = file
    def run(self):
        count = 0
        while True:
            url = q.get()  # 从列表中获取url
            if url == "over":
                break
            else:
                count += 1
                html = urllib.request.urlopen(url)  # 请求html
                html_bytes = html.read()  # 读取字节数据
                soup = BeautifulSoup(html_bytes, "html.parser")
                string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # 获取小说内容
                lines = string.split()
                with open(self.file, mode="a", encoding="utf-8") as f:  # 写入文件
                    f.write("第 {} 章".format(count) + "\r\n")
                    print("已下载第{}章...".format(count))
                    for i, line in enumerate(lines[: -6]):
                        f.write("   " + line + "\r\n")

if __name__ == ‘__main__‘:
    q = Queue()
    p1 = UrlMaker()
    p2 = UrlParser("天龙八部1.txt")
    p1.start()
    p2.start()
    p1.join()
    p2.join()

  官网给定的Pipe的例子:  貌似这货不能用while True循环以及I/O操作。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello‘])
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()   # Pipe性能高于Queue
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, ‘hello‘]"
    p.join()

  再看官网给的详细介绍:

  When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.

  (多进程一般会使用消息传递进行彼此间通信,并且避免使用像锁这样的同步原语),进程互斥有自己的原语(取得锁 -- do something -- 释放锁)。

  For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).

  (可以使用Pipe在一对一进程间传递消息,使用queue多对多进程间传递消息)

  The QueueSimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library.

  (基于标准库中的queue.Queue类,Queue, SimpleQueue和JoinableQueue都是多对多的先进先出的队列模型)

  If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

  JoinableQueue 队列仅有两个方法:task_done()和join(),以生产者消费者模型说明这个队列的用法。

from multiprocessing import Process, JoinableQueue
import os, time
import numpy as np

def func1(q):
    while True:
        num = np.random.randint(1, 4)
        for i in range(num):
            q.get("包子")
            q.task_done()  # 每一次取数据是一个task,都需要调用task_done()回调给队列,告诉队列进程已获取到该值
        print("消费者%s消费了 --%d-- 个包子." % (os.getpid(), num))
        time.sleep(1)

def func2(q):
    num = np.random.randint(5, 15)
    for i in range(num):
        q.put("包子")
    print("生产者%s生产了  --%d--  个包子." % (os.getpid(), num))
    time.sleep(2)
    q.join()       # 如果写q.join(),会使生产者进程一直阻塞到队列中所有的数据都被取走位置;不写的话生产完就结束了

if __name__ == ‘__main__‘:
    q = JoinableQueue()
    processes = []
    for i in range(3):
        p = Process(target=func2, args=(q,), name="生产者")
        processes.append(p)
    for i in range(5):
        p = Process(target=func1, args=(q,), name="消费者")
        processes.append(p)
    for p in processes:
        p.start()
        p.join()

  Note that one can also create a shared queue by using a manager object – see Managers.

  Managers实例化时创建了一个服务进程,以供其它两个或多个进程间的通信。 manager returned by Manager() will support types list, dictNamespaceLockRLockSemaphore,

BoundedSemaphoreConditionEventBarrierQueueValue and Array等。

from multiprocessing import Process, Manager
import time

def func1(dic):
    for i in range(5):
        dic.get("包子")
        print("消费者正在消费第%d个包子." % i)
        time.sleep(1)

def func2(dic):
    for i in range(5):
        dic[i] = "包子 %d " % i
    print("--生产者生产了5个包子.--")

if __name__ == ‘__main__‘:
    with Manager() as manager:
        dic = manager.dict()
        p1 = Process(target=func2, args=(dic,), name="生产者")
        p2 = Process(target=func1, args=(dic,), name="消费者")
        p1.start()
        p2.start()
        p1.join()
        p2.join()

  3.进程同步(Lock、RLock、Condition、Sepmhore)

  先看一段代码:

from multiprocessing import Process
def desc():
    global num
    f = open("test.txt", )
    for i in range(1000):
        # print("desc-----: ", num)
        num -= 1
    print("----------num------------", num)
def add():
    global num
    for i in range(1000):
        print("add: ", num)
        num += 1
    print("----------num------------", num)
if __name__ == ‘__main__‘:
    num = 0
    p1 = Process(target=desc)
    p1.start()
    p1.join()
    print(num)

# 打印结果为:
----------num------------ -1000
0

  可以看出: 在创建子进程时,将global资源给了子进程,然后两个进程就彼此独立了。主进程有个num,子进程有个num,各自计算num值没有交互。印证了进程间的独立性。另一方面,也说明了进程间通信必须依赖第三方内存(硬盘)空间(Queue、Pipe、Manager等)。

  同样地,下面代码最后打印了三个进程各自的num值,并且从循环打印中可以看出,除了创建进程时的复制资源,进程运行时各自的循环没有交互。

from multiprocessing import Process

def desc():
    global num
    for i in range(10):
        print("desc-----: ", num)
        num -= 1
    print("----------num------------", num)

def add():
    global num
    for i in range(10):
        print("add: ", num)
        num += 1
    print("----------num------------", num)

if __name__ == ‘__main__‘:
    num = 0
    p1 = Process(target=desc)
    p2 = Process(target=add)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)

  进程同步,实质上是让彼此独立的进程被顺序执行,即由并发转成串行。假如要顺序的写一个文件,那么可以用互斥锁的方式实现(实现了串行),当然它也可以一个进程实现。

from multiprocessing import Process, Lock

lock = Lock()     # lock是全局变量,它对每个创建的进程进行管理
class Write1(Process):
    def __init__(self):
        super().__init__()   # 这里可以self.lock = lock,但是不能用self.lock = Lock()
    def run(self):
        f = open("test.txt", mode="a", encoding="utf8")
        lock.acquire()
        for i in range(1000):
            f.write("1111\r\n")
        lock.release()
        f.close()
class Write2(Process):
    def __init__(self):
        super().__init__()
    def run(self):
        f = open("test.txt", mode="a", encoding="utf8")
        lock.acquire()
        for i in range(1000):
            f.write("22222222222222222\r\n")
        lock.release()
        f.close()
if __name__ == ‘__main__‘:
    t1 = Write1()
    t2 = Write2()
    t1.start()
    t2.start()
    t1.join()
    t2.join()

  Condition是建立在RLock和Lock之上的进程同步类。它使得进程同步看起来扩展性更好一些。

from multiprocessing import Process, Condition
import time

cond = Condition()
class P1(Process):
    def __init__(self, cond):
        super().__init__()
        self.cond = cond
        self.name = "孙先生"
    def run(self):
        with self.cond:
            print("{}: 哟,美女呦。".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{}: 红了好像更好呀。".format(self.name))

class P2(Process):
    def __init__(self, cond):
        super().__init__()
        self.cond = cond
        self.name = "段小姐"
    def run(self):
        with self.cond:
            self.cond.wait()
            print("{}: 呦,哟,哟,脸都红了啦。".format(self.name))
            time.sleep(5)
            self.cond.notify()

            self.cond.wait()
            print("{}: 讨厌啦。".format(self.name))
            self.cond.notify()

if __name__ == ‘__main__‘:
    p1 = P1(cond)
    p2 = P2(cond)
    print("p1和p2开始处理同一个事情")
    p2.start()
    p1.start()

  4.进程池

  进程池创建耗费CPU时间,并且为了维持系统稳定性,就有了进程池。进程池维持固定数量的进程数,当有任务时,进程池来调配进程处理task。它支持函数式数据的并行(串行)处理,也可以调配阻塞和非阻塞模式。所谓阻塞,即用户态向内核态发送处理请求没有立刻得到相应。所谓非阻塞,即不管内核态有没有处理完用户态请求,都要在请求超时之前返回一个状态码给用户态,用户态得到状态码可以继续执行其它任务,当内核态处理完数据时,会激活用户态事先传过来的回调函数,从而能够通知用户态超时任务已经处理完毕。用户态就可以继续执行之前的超时任务了。

  multiprocess.Pool模块使用一个队列和一组线程来管理进程池。

  同样以爬取天龙八部章节为例。

from multiprocessing import Process, Queue, Pool
import urllib
from bs4 import BeautifulSoup

class UrlMaker(Process):   # 传参的时候也可以global,甚至target都可以另写
    def __init__(self, number):
        super().__init__()
        self.number = number
    def run(self):
        num = 2024334
        for i in range(self.number):
            url = "https://www.ybdu.com/xiaoshuo/10/10237/{}.html".format(num)
            q.put(url)
            print(url)
            num += 1
        q.put("over")

def urlParser(file):
    url = q.get()  # 从列表中获取url
    if url == "over":
        return {
            "code": False,
        }
    else:
        html = urllib.request.urlopen(url)  # 请求html
        html_bytes = html.read()  # 读取字节数据
        soup = BeautifulSoup(html_bytes, "html.parser")
        title = soup.find("div", attrs={"class": "h1title"}).h1.get_text()
        string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # 获取小说内容
        lines = string.split()
        with open(file, mode="a", encoding="utf-8") as f:  # 写入文件
            f.write(title + "\r\n")
            for i, line in enumerate(lines[: -6]):
                f.write("   " + line + "\r\n")
    return {
        "code": True,
        "url": url,
        "title": title,
    }

def callback(msg):
    if msg["code"]:
        print("Process handled url: {}, title: {}.".format(msg["url"], msg["title"]))
    else:
        print("All urls had parsed.")

if __name__ == ‘__main__‘:
    q = Queue()            # 创建一个队列,用于存放要请求的url
    pool = Pool(3)         # 创建一个进程池,用于取出url并解析和下载正文内容
    p1 = UrlMaker(10)      # 创建一个进程,用于向url队列中添加url
    for i in range(20):
        pool.apply_async(func=urlParser, args=("天龙八部1.txt",), callback=callback)    # pool.apply_async: 异步非阻塞模式, 如果用pool.apply,则是阻塞模式(等价于pool.map)
      # func支持函数,和args一起会创建一个进程    # callback参数是回调函数,回调函数的参数就是func执行的结果    # 它本身是一个ApplyResult对象,即 result=pool.apply_async(func, args, callback);result有自己的一些状态码    # result.get():返回结果,如果有必要则等待结果到达。    # result.ready():如果调用完成,返回True。    # result.successful():如果调用完成且没有引发异常,返回True。    # result.wait([timeout]):等待结果变为可用。     # result.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。  p1.start()
    p1.join()
    pool.close()     # 等待进程执行结束时关闭进程
    pool.join()      # 进程池对象pool常用的方法: apply_async、apply、close、join

  

  

原文地址:https://www.cnblogs.com/kuaizifeng/p/9173877.html

时间: 2024-10-12 20:26:06

python(十)、进程的相关文章

Python(十) 进程、线程、协程篇

本节内容 1.操作系统发展史    2.进程.与线程区别    3.Python GIL全局解释器锁    4.线程        1.语法        2.join        3.线程锁 Lock.RLock.信号量        4.将线程变为守护进程        5.Event事件        6.queue队列        7.生产者消费者模型        8.Queue队列        9.开发一个线程池    5.进程        1.语法        2.进程间

Python 3 进程池与回调函数

Python 3 进程池与回调函数 一.进程池 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间.多进程是实现并发的手段之一,需要注意的问题是: 很明显需要并发执行的任务通常要远大于核数 一个操作系统不可能无限开启进程,通常有几个核就开几个进程 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行) 例如当被操作对象数目不大时,可以直接利用multiprocessing中的Proces

Python实例浅谈之五Python守护进程和脚本单例运行

一.简介 守护进程最重要的特性是后台运行:它必须与其运行前的环境隔离开来,这些环境包括未关闭的文件描述符.控制终端.会话和进程组.工作目录以及文件创建掩码等:它可以在系统启动时从启动脚本/etc/rc.d中启动,可以由inetd守护进程启动,也可以有作业规划进程crond启动,还可以由用户终端(通常是shell)执行. Python有时需要保证只运行一个脚本实例,以避免数据的冲突. 二.Python守护进程 1.函数实现 #!/usr/bin/env python #coding: utf-8

#python#守护进程的实现

找了整天,终于找到一个可以用的代码 #! /usr/bin/env python2.7 #encoding:utf-8 #@description:一个python守护进程的例子 #@tags:python,daemon import sys import os import time import atexit from signal import SIGTERM      class Daemon:   """   A generic daemon class.     

python 守护进程

daemon进程 守护进程 1.fork子进程,而后父进程退出,此时子进程会被init进程接管 2.修改子进程的工作目录,创建新进程组和新会话,修改umask 3.子进程再次fork一个进程,这个进程可以称为孙子进程,而后子进程退出 4.重定向孙子进程标准输入流,标准输出等 atexit程序退出 调用某个函数 kill  级别 python 守护进程,布布扣,bubuko.com

【Python】使用Supervisor来管理Python的进程

1.问题描述 需要一个python的服务程序在后台一直运行,不能让该进程被杀死,即使被杀死也要能及时自动重启.如:有一个python的程序:test.py ,通过命令:python test.py来运行程序,但是它会受命令行的中断而中断.所以我们需要一个方法来保证该程序一直在后台运行. 2.解决方法 以前经常用命令:nohup python test.py & 来保证其在后台运行不中断,但是这也不能保证一直运行. 下面介绍用supervisor来管理python的进程,保证其在后台一直运行不中断

selenium-webdriver(python) (十四) -- webdriver原理(转载虫师自动化)

selenium-webdriver(python) (十四) -- webdriver原理 2013-08-22 12:55 by 虫师, 13926 阅读, 12 评论, 收藏, 编辑 之前看乙醇视频中提到,selenium 的ruby 实现有一个小后门,在代码中加上$DEBUG=1 ,再运行脚本的过程中,就可以看到客户端请求的信息与服务器端返回的数据:觉得这个功能很强大,可以帮助理解webdriver的运行原理. 后来查了半天,python并没有提供这样一个方便的后门,不过我们可以通过代理

Python守护进程(多线程开发)

#!/usr/bin/python import sys,time,json,logging import Queue, threading, datetime from lib.base.daemon import Daemon from lib.queue.httpsqs.HttpsqsClient import HttpsqsClient from lib.db.DbMongodb import DbMongodb logging.basicConfig(level=logging.DEB

Python守护进程命令,为何被黑客钟意?整蛊、木马都用得上它!

考虑一下利用Python制作一个整蛊.木马软件,我提供思路.(清楚到没学过编程的人也理解) 1.首先一个黑客做一个整蛊或者木马软件,一定不会让你能够关闭它. 2.里面经常会附带欺骗的方法. 3.最终实现某种目的. 前段时间在抖音上看到个有趣的程序,看样子是VB写的,首先就要用到欺骗的方法,利用软件名称欺骗目标点击软件,打开后出现一个标签.两个按钮. 标签上写的是:我爱你,你爱我吗? 按钮:一个写爱,一个是不爱. 怎么办?一般人看到这种情况就直接点叉关闭软件了.然而人家程序员早就在代码里写了 窗口