队列、线程、进程、协程


一、线程  

  1、基本使用

创建线程的两种方式:
import threading
def f1(arg):
   print(arg)

t = threading.Thread(target=f1, args=(123,))
t.start()

#以下为执行结果
123


import threading
class MyThread(threading.Thread):
   def __init__(self, func,args):
      self.func = func
      self.args = args
      super(MyThread, self).__init__()

   def run(self):
      self.func(self.args)

def f2(arg):
   print(arg)

obj = MyThread(f2,123)
obj.start()

#以下为执行结果
123
  2、队列

#put get 存取数据
import queue
q = queue.Queue() #创建队列对象,该对象为先进先出对象
q.put(123)
q.put(234)
q.put(345)
print(q.get())
print(q.get())
print(q.get())
#输出:
123
234
345

#判断 阻塞

import  queue
q=queue.Queue()
print(q.empty())  #队列是否为空
print(q.full())  #队列是否已满
#q=queue.Queue(10)   #队列最大长度10
q.put(11)
q.put(22)
print(q.qsize()) #队列现在有几个元素
# q.put(33,block=False)    #不阻塞,直接抛出异常
# q.put(33,timeout=2)  #存数据阻塞 ,超时时间 2秒后抛出异常
print(q.get())
print(q.get())
print(q.get(timeout=2))           #取数据  阻塞,两秒后抛出异常

#以下为执行结果
True
False
2
11
22
# task_done()    join() 
import  queue
q = queue.Queue(5)
q.put(123)
q.put(123)
q.get()
q.task_done() #表示某个任务完成.
q.get()
q.task_done()
q.join()  #如果队列中有没有处理的元素,等待 阻塞,任务执行完成,取消阻塞。

  3、消费者模型
import queue
import threading
import time
q = queue.Queue()

def productor(arg):                                   #put 像队列添加购买请求
    """
    买票
    :param arg:
    :return:
    """
    q.put(str(arg) + ‘- 包子‘)

for i in range(300):
    t = threading.Thread(target=productor,args=(i,))
    t.start()

def consumer(arg):
    """
    服务器后台
    :param arg:
    :return:
    """
    while True:
        print(arg, q.get())                                         #get 处理请求
        time.sleep(2)

for j in range(3):
    t = threading.Thread(target=consumer,args=(j,))
    t.start()
#以下为执行结果:
0 0- 包子
1 1- 包子
2 2- 包子
1 3- 包子
2 4- 包子
0 5- 包子
2 6- 包子
...
  4、线程锁    1)线程锁 ,每次放行一个
import threading
import time
NUM=10
def func(l):
    global NUM
    #上锁
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    #开锁
    l.release()
lock = threading.Lock()    #锁每次放一个
#lock= threading.RLock()   #多层锁

for i in range(10):
    t=threading.Thread(target=func,args=(lock,))
    t.start()

#以下为执行结果:
9
8
7
6
5
4...

    

    2)信号量锁,可以设置每次放行的个数(5个)
import threading
import time
NUM=10
def func(i,l):
    global NUM
    #上锁
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM,i)
    #开锁
    l.release()

lock=threading.BoundedSemaphore(5)   #每次放多个

for i in range(30):
    t=threading.Thread(target=func,args=(i,lock,))
    t.start()

#以下为执行结果:
5 1
5 0
3 2
2 4
1 3
0 6
...

    3)事件event锁, 放行所有
import threading
def func(i,e):
    print(i)
    e.wait()  #检查是什么灯,如果红灯,停,绿灯,行
    print(i+100)
event= threading.Event()                  #锁住,要放一起放
for i in range(10):
    t=threading.Thread(target=func,args=(i,event,))
    t.start()
event.clear() #设置成红灯
inp=input(‘>>>‘)
if inp==‘1‘:
    event.set() #设置成绿灯

#以下为运行结果:
0
1
2
3
4
5
6
7
8
9
>>>1
100
101
102
104
105
107
108
109
103
106

    

    4)条件Condition锁        (1).wait()  没一次放行,可以自定义个数
import  threading
def func(i,con):
    print (i)
    con.acquire()
    con.wait()
    print (i+100)
    con.release()
c=threading.Condition()
for i in range(10):
    t= threading.Thread(target=func,args=(i,c))
    t.start()
while True:
    inp=input(‘>>>‘)
    if inp==‘q‘:
        break
    c.acquire()
    c.notify(int(inp))   #锁传几个处理几个
    c.release()

#以下为执行结果
0
1
2
3
4
5
6
7
8
9
>>>1
>>>100
2
>>>101
102

      (2) con.wait_for(condition) 传递参数,条件成立放行
import  threading

def condition():
    ret=False
    r=input(‘>>>‘)
    if  r==‘true‘:
        ret=True
    return  ret
def func(i,con):
    print (i)
    con.acquire()
    con.wait_for(condition)
    print (i+100)
    con.release()
c=threading.Condition()

for i in range(10):
    t= threading.Thread(target=func,args=(i,c,))
    t.start()

#以下为执行结果

>>>1
2
3
4
5
6
7
8
9

>>>true
101
   5、定时器 timer(监控、客户端的时候可能用到)  一秒后执行代码
from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

  6、自定义线程池    简易的线程池
import  queue
import  threading
import time
class ThreadPool:
    def __init__(self,maxsize=5):
        self.maxsize= maxsize
        self._q=queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)
    def get_thread(self):
        return  self._q.get()

    def add_thread(self):
        self._q.put(threading.Thread)

pool =ThreadPool(5)
def task(arg,p):
    print(arg)
    time.sleep(1)
    p.add_thread()

for i in range(100):
    t=pool.get_thread()
    obj= t(target=task,args=(i,pool,))
    obj.start()
   完美的线程池
二、进程

  1.基本用法    
from multiprocessing import Process

def foo(i):
    print(‘say hi‘,i)

if __name__ == "__main__":      #windows下才需要,linux不需要main

    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

#以下为执行结果:
say hi 0
say hi 3
say hi 1
say hi 2
say hi 5
say hi 4
say hi 6
say hi 7
say hi 8
say hi 9

  

  2.进程共享

     (1)默认无法数据共享
from multiprocessing import Process
from multiprocessing import  queues
#import multiprocessing
def foo(i,arg):
     arg.append(i)
     print(‘say hi‘,i,arg)

if __name__ == ‘__main__‘:
    li=[]
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
#        p.daemon= True
        p.start()

#以下为执行结果:
say hi 0 [0]
say hi 1 [1]
say hi 2 [2]
say hi 3 [3]
say hi 6 [6]
say hi 4 [4]
say hi 5 [5]
say hi 9 [9]
say hi 8 [8]
say hi 7 [7]
    (2)queues实现共享   数字递增
from multiprocessing import Process
from multiprocessing import  queues
import multiprocessing
def foo(i,arg):
     arg.put(i)
     print(‘say hi‘,i,arg.qsize())

if __name__ == ‘__main__‘:
#    li=[]
    li= queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
#        p.daemon= True
        p.start()

#以下为执行结果
say hi 0
say hi 2
say hi 3
say hi 4
say hi 1
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9
  (3)Array    共享空间设置限制,超过限制会报错
from multiprocessing import Process
from multiprocessing import  queues
import multiprocessing
from multiprocessing import  Array

def foo(i, arg):
    arg[i]=i+100
    for item in arg:
        print(item)
    print("========")

if __name__ == ‘__main__‘:
    li=Array(‘i‘,10)
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        #        p.daemon= True
        p.start()

#以下为执行结果:
0
0
102
0
0
0
0
0
0
0
========
0
0
102
103
0
0
0
0
0
0
========
100
0
102
103
0
0
0
0
0
0
========
100
0
102
103
104
0
0
0
0
0
========
100
0
102
103
104
0
0
0
108
0
========
100
0
102
103
104
105
0
0
108
0
========
100
0
102
103
104
105
106
0
108
0
========
100
101
102
103
104
105
106
0
108
0
========
100
101
102
103
104
105
106
107
108
0
========
100
101
102
103
104
105
106
107
108
109
========      
    
  (3) Manger 实现数据共享
from multiprocessing import Process
from multiprocessing import  queues
import multiprocessing
from multiprocessing import  Manager

def foo(i, arg):
    arg[i]=i+100
    print(arg.values())

if __name__ == ‘__main__‘:
    obj=Manager()
    li=obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        #        p.daemon= True
        p.start()
 #       p.join()
    import  time
    time.sleep(2)
# p.join()

#以下为执行结果

[100]
[100, 101]
[100, 101, 104]
[100, 101, 102, 104]
[100, 101, 102, 104, 105]
[100, 101, 102, 103, 104, 105]
[100, 101, 102, 103, 104, 105, 107]
[100, 101, 102, 103, 104, 105, 106, 107]
[100, 101, 102, 103, 104, 105, 106, 107, 108]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
    3、进程池  
from multiprocessing import Pool
import  time
def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == ‘__main__‘:

    pool= Pool(5)
    for i in range(30):
#        pool.apply(func=f1,args=(i,))   #串行
        pool.apply_async(func=f1,args=(i,))
#    pool.close() #所有任务执行完毕
    time.sleep(2)
    pool.terminate()  #立即终止
    pool.join()
    print(‘end‘)
#以下为执行结果

0
1
2
3
4
end

  

  3、锁,与线程锁使用方法一样
from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import multiprocessing
import time

def foo(i,lis,lc):
    lc.acquire()
    lis[0] = lis[0] - 1
    time.sleep(1)
    print(‘say hi‘,lis[0])
    lc.release()

if __name__ == "__main__":
    # li = []
    li = Array(‘i‘, 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo,args=(i,li,lock))
        p.start()
#以下为执行结果
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0
三、携程
from greenlet import greenlet
import time

def test1():

    print(12)
    gr2.switch()
    time.sleep(1)
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

#以下为执行结果:
12
56
34
78
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print(‘GET: %s‘ % url)
    resp = requests.get(url)
    data = resp.text
    print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([
        gevent.spawn(f, ‘https://www.python.org/‘),
        gevent.spawn(f, ‘https://www.yahoo.com/‘),
        gevent.spawn(f, ‘https://github.com/‘),
])

#以下为执行结果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
47394 bytes received from https://www.python.org/.
25533 bytes received from https://github.com/.
425622 bytes received from https://www.yahoo.com/.
时间: 2024-10-12 23:27:45

队列、线程、进程、协程的相关文章

11 线程进程协程

1 线程 1.1 基本应用 1.1.1 标准线程(常用) import threading def f1(arg): print(arg) t = threading.Thread(target=f1, args=(123,)) t.start() 1.1.2 自定义线程 自定义线程类既threading.Thread流程,自定义run方法 import threading class MyThread(threading.Thread): #自定义类,继承threading.Thread类 d

python全栈开发 * 线程队列 线程池 协程 * 180731

一.线程队列 队列:1.Queue 先进先出 自带锁 数据安全 from queue import Queue from multiprocessing import Queue (IPC队列)2.LifoQueue后进先出 后进先出 自带锁 数据安全 from queue import LifoQueue lq=LifoQueue(5) lq.put(123) lq.put(666) lq.put(888) lq.put(999) lq.put("love") print(lq.pu

线程队列 线程池 协程

1 . 线程队列 from multiprocessing Queue , JoinableQueue  #进程IPC队列 from queue import Queue  #线程队列  先进先出 from queue import LifoQueue  #后进先出的 方法都是一样的 : put , get , put_nowait , get_nowait , full , empty , qsize 队列 Queue : 先进先出 , 自带锁 , 数据安全 栈 LifoQueue : 后进先

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

Python的线程&进程&协程[0] -> 基本概念

基本概念 / Basic Concept 0 简介与动机 / Why Multi-Thread/Multi-Process/Coroutine 在多线程(multithreaded, MT)编程出现之前,计算机程序的执行是由单个步骤序列组成的,该序列在主机的CPU中按照同步顺序执行.即无论任务多少,是否包含子任务,都要按照顺序方式进行. 然而,假定子任务之间相互独立,没有因果关系,若能使这些独立的任务同时运行,则这种并行处理方式可以显著提高整个任务的性能,这便是多线程编程. 而对于Python而

线程 进程 协程

一.什么是线程? 线程是操作系统能够进行运算调度的最小单位(程序执行流的最小单元).它被包含在进程之中,是进程中的实际运作单位.一条线程指的是一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务. 一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成.另外,线程是进程的中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源.一个线程可以创建和撤销另一

第九天 线程 进程 协程 队列

详细链接http://www.cnblogs.com/alex3714/articles/5230609.html 1.线程:包含在进程中,是操作系统运算调度的最小单位,是一串指令的集合,直接与cpu交互 2进程:进程是一个程序各种资源的集合.操作系统通过管理这个集合进而运行程序,进程本身并不执行,进程通过调用线程来调度cpu. 3.不同点: 一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程 创建新线程很简单,但是创建一个子进程需要对父进程进行拷贝 线程共享内存,进程的内存是独

Python的线程&进程&协程[0] -> 线程 -> 多线程的建立与使用

常用的多线程功能实现 目录 生成线程的三种方法 单线程与多线程对比 守护线程的设置 1 生成线程的三种方法 三种方式分别为: 创建一个Thread实例,传给它一个函数 创建一个Thread实例,传给它一个可调用的类实例 派生Thread的子类,并创建子类的实例 # There are three ways to create a thread # The first is create a thread instance, and pass a function # The second one

Python的线程&进程&协程[0] -> 线程 -> 多线程锁的使用

锁与信号量 目录 添加线程锁 锁的本质 互斥锁与可重入锁 死锁的产生 锁的上下文管理 信号量与有界信号量 1 添加线程锁 由于多线程对资源的抢占顺序不同,可能会产生冲突,通过添加线程锁来对共有资源进行控制. 1 import atexit 2 from random import randrange 3 from threading import Thread, Lock, current_thread # or currentThread 4 from time import ctime, s

Python的线程&进程&协程[1] -> 线程 -> 多线程的控制方式

多线程的控制方式 目录 唤醒单个线程等待 唤醒多个线程等待 条件函数等待 事件触发标志 函数延迟启动 设置线程障碍 1 唤醒单个线程等待 Condition类相当于一把高级的锁,可以进行一些复杂的线程同步控制.一般Condition内部都有一把内置的锁对象(默认为RLock),对于Condition的使用主要有以下步骤: 建立两个线程对象,及Condition对象; 线程1首先获取Condition的锁权限,acquire(); 线程1执行需要完成的任务后,调用等待wait(),此时,线程1会阻