day11 队列、线程、进程、协程及Python使用缓存(redis/memcache)

上篇博客简单介绍了多进程和多线程分别是什么,及分别使用于那种场景。

这里再稍微聊聊线程和进程相关的东西以及协程

一、队列

import queue
import threading

# queue.Queue,先进先出队列
# queue.LifoQueue,后进先出队列
# queue.PriorityQueue,优先级队列
# queue.deque,双向对队

# queue.Queue(2) 先进先出队列
# put放数据,是否阻塞,阻塞时的超时时间
# get取数据(默认阻塞),是否阻塞,阻塞时的超时时间
# qsize()真实个数
# maxsize 最大支持的个数
# join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞

q = queue.Queue(2)   #先进先出队列(括号中的参数为maxsize,代表队列中最多能有几个值,默认为0,代表可以有无限个值)
print(q.qsize())      #查看目前队列中有几个值
q.put(11)      #put方法为给队列中放一个值
q.put(22)
q.put(block=False,timeout=2)     #block代表是否阻塞,默认为阻塞,为非阻塞时,当进入队列需要等待时直接报错。timeout代表最多等待时间,超过等待时间直接报错
print(q.get())   #get方法为取一个值
q.task_done()     #task_done方法为告诉队列我取完值了
print(q.get())
q.task_done()

q.join()     #阻塞进程,当队列中任务执行完时,不再阻塞
print(q.empty())   #若当前队列中有任务,返回False,否则返回True

#生产者消费者模型
#解决并发问题

q = queue.Queue(20)

def productor(arg):
    q.put(‘包子‘)

def customer(arg):
    q.get()

for i in range(10):
    t1 = threading.Thread(target=productor,args=(i,))
    t1.start()

    二、及其及其简易版线程池

#!/usr/bin/env python
# _*_ coding:utf-8_*_

import threading
import queue
import time

class threadpool:

    def __init__(self,maxsize=5):
        self.maxsize = maxsize    #默认最大线程为5
        self._q = queue.Queue(maxsize)     #创建队列,并设置队列中最多可以有5个值,既最大线程数为5
        for i in range(maxsize):
            self._q.put(threading.Thread)   #创建线程对象,将其放入到队列中
            #self._q为[threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread]
    def get_thread(self):
        """
        获取线程函数
        :return:    队列中实际存放的为一个一个的线程对象,故return的一个线程对象,即threading.Thread
        """
        return self._q.get()

    def add_thread(self):
        self._q.put(threading.Thread)    #线程池中要保持有5个线程,当使用掉一个线程时,再重新put到队列中一个线程对象

pool = threadpool()

def task(arg):
    print(arg)
    time.sleep(1)
    pool.add_thread()

for i in range(100):
    t = pool.get_thread()     #这里t为threading.Thread
    obj = t(target=task, args=(i,))
    obj.start()

三、线程锁

import threading
import time

#放行单个线程
NUM = 10
def func(l):
    l.acquire()     #上锁
    global NUM
    NUM -= 1
    time.sleep(1)
    print(NUM)
    l.release()   #解锁

lock = threading.RLock()    #创建线程锁    #threading.LOCK 和 threading.RLOCK的区别:RLOCK支持多层锁,LOCK不支持,所以通常情况下使用RLOCK即可

for i in range(10):
    t1 = threading.Thread(target=func,args=(lock,))
    t1.start()

#放行指定个数的线程------信号量
NUM = 20
def func(l):
    l.acquire()     #上锁   ---  一次放行五个线程
    global NUM
    NUM -= 1
    time.sleep(1)
    print(NUM)
    l.release()   #解锁

lock = threading.BoundedSemaphore(5)   #5代表依次放行几个线程

for i in range(20):
    t1 = threading.Thread(target=func,args=(lock,))
    t1.start()   #t1.start()仅代表线程创建完毕,等待CPU来调度执行,但是CPU调度是随机的,so.. 输出的结果没按照递减的顺序输出

#放行全部线程
NUM = 10
def func(e):
    global NUM
    NUM -= 1
    e.wait()    #检测是什么灯,(默认即为红灯),红灯停,绿灯行
    print(NUM)
    print(NUM+100)

event = threading.Event()    #事务,可比喻为红绿灯,红灯全部锁住,绿灯全部放行

for i in range(10):
    t1 = threading.Thread(target=func,args=(event,))
    t1.start()

event.clear()    #设置为红灯(默认即为红灯)

inp = input(">>: ")
if inp == ‘1‘:
    event.set()    #设置为绿灯

#条件锁 condition
#当某个条件成立,放行指定个数的锁
def func(i,conn):
    print(i)
    conn.acquire()
    conn.wait()
    print(i+100)
    conn.release()

c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func,args=(i,c))
    t.start()

while True:
    inp = input(">>: ")
    if inp == ‘q‘:
        break
    c.acquire()
    c.notify(int(inp))       #这三行为固定用法,notify的作用为告诉func里的函数acquire放行几个锁
    c.release()

#条件锁的另外一种方式
def condition():
    ret = False
    r = input(">>: ")
    if r == ‘true‘:
        ret = True
    return ret

def func(i,conn):
    print(i)
    conn.acquire()
    conn.wait_for(condition)    #condithon会return两种结果,一种为True,一种为False,返回值为True时,放行一个锁,为False时,不放行
    print(i+100)
    conn.release()

c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()

#Timer
from threading import Timer

def hello():
    print(‘hello world‘)

t = Timer(1,hello)
t.start()    #一秒后,执行hello函数

四、进程池

from multiprocessing import Pool
import time

def foo(arg):
    time.sleep(0.1)
    print(arg)

if __name__ == ‘__main__‘:
    pool = Pool(5)

    for i in range(20):
        pool.apply_async(func=foo,args=(i,))

    pool.close()    #所有任务执行完毕后关闭进程池
    time.sleep(0.2)
    pool.terminate()   #立即终止正在执行的任务,关闭进程池
    pool.join()
    print(‘end‘)

五、实现进程间数据共享

"""
默认情况下进程之间数据是不共享的
这里分别使用queues,Array,Manager来实现进程间数据共享
"""

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
    arg.put(i)
    print(‘say hi‘,i,arg.qsize())

if __name__ == "__main__":
    li = queues.Queue(20,ctx=multiprocessing)   #类似queue队列
    for i in range(10):
        p = Process(target=foo,args=(i,li))
        p.start()
        # p.join()

from multiprocessing import Process
from  multiprocessing import Array
#Array 数组,类似列表,但必须在创建时指定数组中存放的数据类型及数据的个数

def foo(i,arg):
    arg[i] = i + 100
    for item in arg:
        print(item)
    print(‘===============‘)

if __name__ == ‘__main__‘:
    li = Array(‘i‘,10)
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        # p.join()

from multiprocessing import Process
from  multiprocessing import Manager

def foo(i,arg):
    arg[i] = i + 100
    print(arg.values())

if __name__ == ‘__main__‘:
    obj = Manager()
    li = obj.dict()    #Manager.dict相当于Python中的字典
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()    #li属于主进程,p是子进程,主进程与子进程之间交互时会创建一个连接,这里不使用join时会报错,因为主进程执行完后主进程与子进程之间的连接就会断开,so,这里要让主进程等待子进程执行完毕
时间: 2025-01-02 15:38:21

day11 队列、线程、进程、协程及Python使用缓存(redis/memcache)的相关文章

python全栈开发 * 线程队列 线程池 协程 * 180731

一.线程队列 队列:1.Queue 先进先出 自带锁 数据安全 from queue import Queue from multiprocessing import Queue (IPC队列)2.LifoQueue后进先出 后进先出 自带锁 数据安全 from queue import LifoQueue lq=LifoQueue(5) lq.put(123) lq.put(666) lq.put(888) lq.put(999) lq.put("love") print(lq.pu

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

Python的线程&进程&协程[0] -> 基本概念

基本概念 / Basic Concept 0 简介与动机 / Why Multi-Thread/Multi-Process/Coroutine 在多线程(multithreaded, MT)编程出现之前,计算机程序的执行是由单个步骤序列组成的,该序列在主机的CPU中按照同步顺序执行.即无论任务多少,是否包含子任务,都要按照顺序方式进行. 然而,假定子任务之间相互独立,没有因果关系,若能使这些独立的任务同时运行,则这种并行处理方式可以显著提高整个任务的性能,这便是多线程编程. 而对于Python而

关于进程、线程、协程在python中的使用问题

描述 最近在python中开发一个人工智能调度平台,因为计算侧使用python+tensorflow,调度侧为了语言的异构安全性,也选择了python,就涉及到了一个调度并发性能问题,因为业务需要,需要能达到1000+个qps的业务量需求,对python调度服务的性能有很大挑战.具体的架构如下面所示: 补充:架构中使用的python为cpython,解释执行的语言,并非jpython或者pypython,cpython的社区环境比较活跃,很多开发包都是现在cpython下实现的,比如项目中计算模

第九天 线程 进程 协程 队列

详细链接http://www.cnblogs.com/alex3714/articles/5230609.html 1.线程:包含在进程中,是操作系统运算调度的最小单位,是一串指令的集合,直接与cpu交互 2进程:进程是一个程序各种资源的集合.操作系统通过管理这个集合进而运行程序,进程本身并不执行,进程通过调用线程来调度cpu. 3.不同点: 一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程 创建新线程很简单,但是创建一个子进程需要对父进程进行拷贝 线程共享内存,进程的内存是独

Python的线程&进程&协程[0] -> 线程 -> 多线程的建立与使用

常用的多线程功能实现 目录 生成线程的三种方法 单线程与多线程对比 守护线程的设置 1 生成线程的三种方法 三种方式分别为: 创建一个Thread实例,传给它一个函数 创建一个Thread实例,传给它一个可调用的类实例 派生Thread的子类,并创建子类的实例 # There are three ways to create a thread # The first is create a thread instance, and pass a function # The second one

线程队列 线程池 协程

1 . 线程队列 from multiprocessing Queue , JoinableQueue  #进程IPC队列 from queue import Queue  #线程队列  先进先出 from queue import LifoQueue  #后进先出的 方法都是一样的 : put , get , put_nowait , get_nowait , full , empty , qsize 队列 Queue : 先进先出 , 自带锁 , 数据安全 栈 LifoQueue : 后进先

11 线程进程协程

1 线程 1.1 基本应用 1.1.1 标准线程(常用) import threading def f1(arg): print(arg) t = threading.Thread(target=f1, args=(123,)) t.start() 1.1.2 自定义线程 自定义线程类既threading.Thread流程,自定义run方法 import threading class MyThread(threading.Thread): #自定义类,继承threading.Thread类 d

线程 进程 协程

一.什么是线程? 线程是操作系统能够进行运算调度的最小单位(程序执行流的最小单元).它被包含在进程之中,是进程中的实际运作单位.一条线程指的是一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务. 一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成.另外,线程是进程的中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源.一个线程可以创建和撤销另一