自动化运维Python系列之进程、线程、协程

进程 线程 协程

1)进程是具有一定独立功能的的程序关于某个数据集合上的一次运行活动,是系统进行资源分配和调度的独立单位

2)线程是进程的一个实体,是CPU调度和分派的基本单位

3)协程是程序自身产生的一种线程复用机制,作用是让一个线程重复利用,减少系统资源开销,提高程序效率

由于进程、线程都是操作系统的基本概念,比较抽象,我们可以将CPU看作是一个时刻在运行中的大型工厂,车间就是工厂里具有独立工作能力的程序进程,每个车间里工作的机器人就是线程:

系统工作模式:

同一时间工厂只能为一个车间供电,供电期间CPU调度线程,完成他们自己的工作,一旦供电时间到,即使有线程工作未完成,也会立即停止,各线程会保存当前工作的进度(系统的上下文切换),等待下一次供电时间,继续完成上一次工作;其实我们系统里同一时刻确实只能运行一个程序进程,只是由于CPU在各个程序之间切换的速度够快,我们感知不到,所有表面上看所有程序都是同时运行的。

Python线程

import threading
import time
 
def show(arg):
    time.sleep(1)
    print(‘thread‘ + str(arg))
 
# 创建10个线程 start表示创建成功 等待CPU调度运行
for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()
 
print(‘main thread stop‘)
 
# 输出:
main thread stop
thread3
thread2
thread1
thread0
...
 
# Thread      线程的其他方法
# start      线程准备就绪,等待CPU调度
# setName    为线程设置名称
# getName    获取线程名称
# setDaemon  设置为后台线程或前台线程(默认)
#            如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
#            如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
# join      逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
# run       线程被cpu调度后自动执行线程对象的run方法

自定义创建线程类

# 线程被创建准备好后 被cpu调度会去执行threading.Thread里面的 run 方法
 
class MyThread(threading.Thread):
 
    def __init__(self, func, args):
        self.func = func
        self.args = args
        super(MyThread, self).__init__()
 
    def run(self):
        self.func(self.args)
 
def f2(arg):
    print(arg)
 
obj = MyThread(f2, 123)
obj.start()

线程锁

由于CPU调用线程是随机性的,也就是线程被执行顺序不固定,在创建了很多线程同时去执行修改同一数据时就会发生操作顺序混乱,导致出现脏数据

import threading
import time
 
NUM = 10
 
def func():
    global NUM
    NUM -= 1
    # 程序等待1秒钟 10个线程基本都执行到此处
    time.sleep(1)
    # 此时打印出来的 NUM 值就是 0 了
    print(NUM)
  
for i in range(10):
    t = threading.Thread(target=func,)
    t.start()
 
# 输出
0
0
0
...

加线程锁以后

import threading
import time
 
NUM = 10
 
def func(l):
    global NUM
    l.acquire()
    NUM -= 1
    time.sleep(1)
    print(NUM)
    l.release()
 
# RLock 可以加多层线程锁
# lock = threading.Lock()
lock = threading.RLock()
 
for i in range(10):
    t = threading.Thread(target=func, args=(lock, ))
    t.start()
 
# 输出
9
8
7
6
...

信号量(Semaphore)

线程锁同时只允许一个线程更改数据,而semaphore可以允许同时一定数量的线程通过,后面的线程将等待

import threading, time
 
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" % n)
    semaphore.release()
 
if __name__ == ‘__main__‘:
    num = 0
    # 最多允许5个线程同时运行
    semaphore = threading.BoundedSemaphore(5)
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

事件(event)

Python线程的事件用于主线程控制子线程的执行,事件主要提供3中方法:set、wait、clear

有点像红绿灯:clear红灯停 set绿灯行 wait黄灯等

import threading
 
def func(i, e):
    print(i)
    # 出现wait后 所有线程将在此阻塞 直到用户输入1设置event.set放行
    e.wait()
    print(i + 100)
 
event = threading.Event()
 
for i in range(3):
    t = threading.Thread(target=func, args=(i, event,))
    t.start()
# 默认为设置为clear阻塞
event.clear()
 
inp = input(‘>>>‘)
# 手动放行
if inp == ‘1‘:
    event.set()
 
# 输出:
1
2
3
>>> 1
100
200
300

Condition条件

condition使得在等待的线程 满足一定条件才会被放行设定数量的线程

import threading
 
def func(i, con):
    print(i)
    con.acquire()
    con.wait()
    print(i+100)
    con.release()
 
c = threading.Condition()
 
for i in range(10):
    t = threading.Thread(target=func, args=(i, c, ))
    t.start()
 
while True:
    inp = input(‘>>>‘)
    if inp == ‘q‘:
        break
    c.acquire()
    c.notify(int(inp))
    c.release()

Timer

指定n秒后 执行某操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()

Python自定义线程池

Python中在3之前没有线程池 3中已有的线程池也是功能非常少的低级线程池

自定义一个线程池

import queue
import threading
import contextlib
import time
 
# 空任务标识 终止进程
StopEvent = object()
 
class ThreadPool(object):
 
    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            # 装任务的队列q
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        # 允许的最大大线程数
        self.max_num = max_num
        # 终止线程
        self.cancel = False
        self.terminal = False
        # 当前已创建的线程数量
        self.generate_list = []
        # 当前空闲的线程数量
        self.free_list = []
 
    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        # 判断已经创建的线程数量是否小于最大线程数 表示所有的线程在忙 多余任务不再创建线程
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        # 任务元组放到队列
        w = (func, args, callback,)
        self.q.put(w)
 
    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()
 
    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread()
        # 将已经创建的线程加入线程列表
        self.generate_list.append(current_thread)
 
        # 取任务元组
        event = self.q.get()
        while event != StopEvent:
            # 得到任务元组的内容
            func, arguments, callback = event
            try:
                # 执行 action 函数
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None
 
            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass
 
            # 将线程标记为空闲
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    # 如果取到的是空任务执行下面的else
                    event = self.q.get()
        else:
 
            self.generate_list.remove(current_thread)
 
    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        # 创建了多少线程就传几个空任务
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1
 
    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.queue.clear()
 
    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
 
# How to use
 
pool = ThreadPool(5)
 
def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass
 
def action(i):
    print(i)
 
# 30个任务
for i in range(30):
    ret = pool.run(action, (i,), callback)
 
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
# pool.terminate()

Python进程

创建进程类似于创建线程

不同的是,创建进程需要消耗系统不小的开销 而且各进程间数据不共享

from multiprocessing import Process
import threading
import time
   
def foo(i):
    print ‘say hi‘,i
   
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

进程间数据共享方式

1)queues 队列方式

2)Array 数组方式

3)dict 字典方式

# 方式一 queues
 
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
 
def foo(i, arg):
    arg.put(i)
    print(‘say hi‘, i, arg.qsize())
 
if __name__ == ‘__main__‘:
    li = queues.Queue(20, ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        p.start()
 
# 方式二 Array数据
 
from multiprocessing import Process
from multiprocessing import Array
 
def foo(i, arg):
    arg[i] = i + 100
    for item in arg:
        print(item)
    print(‘=========‘)
 
if __name__ == ‘__main__‘:
    # 使用数据Array必须指定数据类型 和长度
    li = Array(‘i‘, 10)
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        p.start()
        p.join()
 
# 方式三 dict
 
from multiprocessing import Process
from multiprocessing import Manager
import time
 
def foo(i, arg):
    arg[i] = 100 + i
    print(arg.values())
 
if __name__ == ‘__main__‘:
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        p.start()
        # 主进程结束后 子进程会被强行终止
        # 可以使用join等待所有子进程全部执行完成 或者 time.sleep
        p.join()

进程锁

from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time
 
def foo(i, lis, lc):
    lc.acquire()
    # 将列表中的数字递减1
    lis[0] = lis[0] - 1
    time.sleep(1)
    print(‘say hi‘, lis[0])
    lc.release()
 
if __name__ == ‘__main__‘:
    li = Array(‘i‘, 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo, args=(i, li, lock))
        p.start()

进程池

进程池内部维护一个进程序列 但使用时 这去进程池中获取一个进程,如果进程中没有,则需要等待,知道进程池中有进程为止

import time
from multiprocessing import Pool
 
def f1(arg):
    time.sleep(1)
    print(arg)
 
if __name__ == ‘__main__‘:
    pool = Pool(5)
  
    for i in range(30):
        # pool.apply(func=f1, args=(i, ))
        # 异步执行 一部到位
        pool.apply_async(func=f1, args=(i, ))
 
    # close 表示所有的子进程任务执行完毕
    pool.close()
    # time.sleep(1)
    # terminate 表示立即终止所有子进程
    # pool.terminate()
    pool.join()

协程

进程和协程的操作是系统,而协程的创建和操作是程序的编写者

协程存在的意义:对于多协程的应用,CPU通过切片的方式来切换协程间的执行,线程切换时需要耗时(保存状态,下次继续)协程则只使用一个线程,在一个线程中规定某个代码块执行顺序

协程使用场景:当程序中存在大量不需要使用CPU的操作时,即多IO操作适用协程

greelet

from greenlet import greenlet
 
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
 
def test2():
    print(56)
    gr1.switch()
    print(78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

gevent

import gevent
  
def foo():
    print(‘Running in foo‘)
    gevent.sleep(0)
    print(‘Explicit context switch to foo again‘)
  
def bar():
    print(‘Explicit context to bar‘)
    gevent.sleep(0)
    print(‘Implicit context switch back to bar‘)
  
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

通过IO自动切换来完成网站请求

from gevent import monkey; monkey.patch_all()
import gevent
import requests
 
def f(url):
    print(‘GET: %s‘ % url)
    resp = requests.get(url)
    data = resp.text
    print(‘%d bytes received from %s.‘ % (len(data), url))
 
gevent.joinall([
        gevent.spawn(f, ‘https://www.python.org/‘),
        gevent.spawn(f, ‘https://www.yahoo.com/‘),
        gevent.spawn(f, ‘https://github.com/‘),
])
 
# 输出
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
462078 bytes received from https://www.yahoo.com/.
25689 bytes received from https://github.com/.
47373 bytes received from https://www.python.org/.

队列queue

import queue
 
# queue.Queue 先进先出
# q = queue.LifoQueue() 后进先出
 
# 权重队列
# q = queue.PriorityQueue()
# q.put((1, "alex"))
# print(q.get())
 
# 双向队列
# q = queue.deque()
# q.append(123)
# q.append(456)
# q.appendleft(789)
# q.pop()
# q.popleft()
 
# 先进先出队列
# 参数10表示最多只接收10个数据
# q = queue.Queue(10)
# q.put(11)
# q.put(22)
# 超时时间timeout block是否阻塞
# q.put(33, timeout=2)
# q.put(33, block=False)
 
# print(q.qsize())
# get默认阻塞 有数据才能取
# print(q.get())
# print(q.get(block=False))
 
# join task_done 阻塞进程 当队列中的任务执行完毕后 不再阻塞
q = queue.Queue()
q.put(123)
q.put(456)
q.get()
q.task_done()
q.get()
q.task_done()
 
q.join()
时间: 2024-10-05 01:00:13

自动化运维Python系列之进程、线程、协程的相关文章

自动化运维Python系列(一)之基础篇

Python介绍 Python是由创始人吉多·范罗苏姆(Guido van Rossum)在1989年圣诞节假期期间,为了打发时间,构思出来的一个新的脚本解释器.由于Guido在开发Python语言过程中,借鉴了很多ABC语言特性,所有后来包括Guido自己也那么认为,Python语言的前身就是ABC语言. Python是一门面向对象的.动态解释型强定义语言:Python崇尚简洁.优美.清晰,是一门优秀的被广泛使用的语言. 在2015年以前,最流行的Python版本还是2.4,但是由于Pytho

自动化运维Python系列之ForeignKey、relationship联表查询

一对多和多对多 数据库表结构设计是程序项目开发前的重要环节,后期数据库操作都是围绕着这个已经设计好的表结构进行,如果表结构设计有问题,整个程序项目就有存在需要整个推翻重构的风险... 数据库表结构除了简单的单表操作以外,还有一对多.多对多等. 一对多 基于SQLAlchemy我们可以先创建如下结构的2张表,然后来看看具体怎样通过外键ForeignKey或者relationship联表操作 创建表 from sqlalchemy.ext.declarative import declarative

Python并发编程-进程 线程 协程

一.进程 进程:就是一个程序在一个数据集上的一次动态执行过程. 进程由三部分组成: 1.程序:我们编写的程序用来描述进程要完成哪些功能以及如何完成 2.数据集:数据集则是程序在执行过程中所需要使用的资源 3.进程控制块:进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感 知进程存在的唯一标志. 二.线程                                                                        

自动化运维Python系列(七)之Socket编程

了解知识点TCP\IP 要想理解socket首先得熟悉一下TCP/IP协议族, TCP/IP(Transmission Control Protocol/Internet Protocol)即传输控制协议/网间协议,定义了主机如何连入因特网及数据如何再它们之间传输的标准, 从字面意思来看TCP/IP是TCP和IP协议的合称,但实际上TCP/IP协议是指因特网整个TCP/IP协议族.不同于ISO模型的七个分层,TCP/IP协议参考模型把所有的TCP/IP系列协议归类到四个抽象层中(数据链路层和物理

自动化运维Python系列之IO多路复用、SocketServer源码分析

IO多路复用 IO多路复用是指:通过一种机制,可以监视多个描述符,一旦某个系统描述符就绪(一般是读就绪或者写就绪)能够通知程序进行相应的读写操作 实例化例子就是在SocketServer模块中,客户端和服务端建立好连接,此时服务端通过监听conn这条链路,一旦客户端发送了数据,conn链路状态就发生变化,服务端就知道有数据要接收... Linux系统中同时存在select.pull.epoll三种IO多路复用机制 windows中只有select机制 1)select select本质上是通过设

自动化运维Python系列之Memcache、Redis操作

Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memcached安装 wget http://memcached.org/latest tar -zxvf memcach

自动化运维Python系列(五)之常用模块

模块 用一坨代码实现了某个功能的代码集合 模块分为三种 · 自定义模块 · 第三方模块 · 内置模块 1)自定义模块 自己编写并存在在某个路径下的python程序,实现了某个功能,可以被其他程序调用 2)第三方模块 网络上下载并能兼容当前Python版本的第三方程序模块,比如支持HTTP测试的requests库 3)内置模块 C:\Python3.5\Lib目录下的py文件大部分都是Python的内置模块,如sys.os.time等 导入模块 import module from module.

自动化运维Python系列(四)之装饰器和生成器

装饰器 在理解什么事装饰器之前,我们需要理解:函数也是一个对象,可以赋值给变量,通过变量来调用 def f1():     print('2016') d = f1 d() 输出: 2016 那么装饰器的作用就是在不改变原函数的前提下,调用这些函数,并且为函数增加我们需要的新功能. 我们平时在编写好很多独立函数模块以后,突然需要在每个模块内添加一个功能,比如: def f1():     print('F1') def f2():     print('F2') def f3():     pr

自动化运维Python系列之Django进阶操作

FBV && CBV FBV.CBV是Django视图路由处理模型,当用户请求送达路由系统URL后,由其转发给视图view来分析并处理 // FBV    function base views  // CBV    class base views 区别就是一个直接用函数驱动,一个用类驱动,两者在使用上存在一些区别 1)FBV URL中根据路由匹配直接转发给视图中的某一个处理函数 urlpatterns = [     url(r'^home/', views.home), ] 视图函数