python 之路11 进程池,线程池

1.线程
基本使用
创建线程
import threading

# def f1(arg):
# print(arg)
#
# t = threading.Thread(target=f1,args=(123,))
# t.start()

# class Mythread(threading.Thread):
# def __init__(self,func,arg):
# self.func = func
# self.arg = arg
# super(Mythread,self).__init__()
#
# def run(self):
# self.func(self.arg)
#
# def f2(arg):
# print(arg)
#
# obj = Mythread(f2,345,)
# obj.start()


event #把所有线程锁住
import threading

def func(i,e):
print(i) #输出当前循环次数
e.wait() #wait 检测当前为红灯还是绿灯,默认为红灯
print(i+100) #如果wait变为绿灯吧i+100 然后输出

event = threading.Event() #进程锁,可以比喻成红绿灯,红灯时不让所有人过,如果变为绿灯,放行所有人
for i in range(10): #进程循环10次
t = threading.Thread(target=func,args=(i,event)) #把次数和进程所传送到函数
t.start()

event.clear() #设置为红灯
inp = input(‘>>>‘)
if inp == ‘1‘:
event.set() #如果输入1,把红灯变为绿灯

Lock,RLock #为只能锁一个线程
import threading
import time

NUM = 10

def func(l):
global NUM
l.acquire()
NUM-=1
time.sleep(2)

print(NUM)
l.release()

lock = threading.RLock() #为了防止多个进程多一个参数进行修改

for i in range(10):
t = threading.Thread(target=func,args=(lock,))
t.start()

BoundedSemaphore #为多个线程加锁
import threading
import time

NUM = 10

def func(l):
global NUM
l.acquire()
NUM-=1
time.sleep(2)

print(NUM)
l.release()

lock = threading.BoundedSemaphore(5) #为多个线程加锁,参数为为多少个线程加锁,也就是一次放行多少个

for i in range(30):
t = threading.Thread(target=func,args=(lock,))
t.start()

condition 当满足条件时执行
import threading

def func(i,con):
print(i)
con.acquire()
con.wait() #默认条件不成立

print(i+100)
con.release()

con = threading.Condition() #当满足某个条件时执行

for i in range(10):
t = threading.Thread(target=func,args=(i,con))
t.start()

while True:
inp = input(‘>>>‘)
if inp == ‘q‘:
break

con.acquire() #notify 配合acquire 和release使用
con.notify(int(inp))
con.release()

timer
from threading import Timer

def hello():
print(‘hello guys‘)

t = Timer(2,hello) #过多少秒执行某个函数
t.start()

def contion(): #条件函数
ret = False
inp = input(‘>>>‘)
if inp == ‘true‘:
ret = True
else:
ret = False
return ret

def func(i,con):
print(i)
con.acquire()
con.wait_for(contion) #当满足某个条件时执行

print(i+100)
con.release()

con = threading.Condition()

for i in range(10):
t = threading.Thread(target=func,args=(i,con))
t.start()

生产者消费者模型(队列)
# 队列特点:先进先出

# put 放数据,block是否阻塞,timeout阻塞时的超时时间
# get 取数据,默认阻塞,阻塞时的超时时间
#qsize()真实个数
#maxsize 最大支持个数
#full()查看是否为满
#etmpt()查看是否为空
#join,task_done ,阻塞进程,当队列任务执行完毕后,不再阻塞

#先进先出队列
# q = queue.Queue(2)
#
# q.put(11)
# q.task_done()
# q.put(22)
# q.task_done()
#
# print(q.get())
# print(q.get(block=False))
#
# q.join()

#后进先出队列
# q = queue.LifoQueue()
# q.put(123)
# q.put(456)
# print(q.get())

#优先级队列
# q = queue.PriorityQueue()
#
# q.put((1,1))
# q.put((3,3))
# q.put((2,2))
# print(q.get())
# print(q.get())

#双向队列,取值过程中从最右开始取值
# q = queue.deque()
#
# q.append(213)
# q.appendleft(999)
# q.append(456)
#
#
# print(q.pop())
# print(q.pop())

import queue
import threading
import time

q = queue.Queue()

#模拟买票过程
def mai(arg):
q.put(str(‘买‘) + str(arg)) #买票函数

for i in range(6): #循环6次 put 6次到消息队列 消息队列列表有6个请求
t = threading.Thread(target=mai,args=(i,)) #买票进程
t.start()

def shengchan(arg): #处理进程函数
while True:
print(arg,q.get())
time.sleep(2) #每处理完一次暂停两秒

for j in range(3): #循环处理3次,处理3个请求,每处理三个请求暂停两秒
t = threading.Thread(target=shengchan,args=(j,))
t.start()

自定义线程池

2.进程
基本使用
默认不共享

进程池
需要条件:
有一个容器(可以设置最大个数)
取一个少一个
无线程时等待
线程执行完毕,交还线程

#简单的线程池
import threading
import queue
import time

class MyThread: #进程池名字
def __init__(self,maxsize): #进程池允许最大的链接数
self.maxsize = maxsize
self.q = queue.Queue(maxsize) #q 为队列,当不能连接/没有线程的时候 阻塞住,等待线程出现
for i in range(maxsize): #循环最大连接数
self.q.put(threading.Thread) #把线程put到队列

def get_thread(self):
return self.q.get() #get队列,get到的为threading.Thread类名

def add_thread(self):
self.q.put(threading.Thread) #如果当前线程执行完毕,重新put 线程到队列

poll = MyThread(5) #实例化MyThread类

def task(arg,pool): #想要线程执行的任务
print(arg) #输出参数
time.sleep(1)
poll.add_thread() #当线程执行完毕重新put线程到队列

for i in range(30): #循环30次
t = poll.get_thread() #t为threading.Thread
obj = t(target = task,args=(i,poll,)) #让线程执行任务,并传递参数
obj.start() #开始执行

进程共享

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
arg.put(i)
print(‘hi‘,i,arg.qsize())

if __name__ == ‘__main__‘:
li = queues.Queue(20,ctx=multiprocessing) #创建进程对象(最大进程数量,锁)
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()

from multiprocessing import Process
from multiprocessing import Array

def foo(i,arg):
arg[i] = i+100
for item in arg:
print(item)
print(‘---‘)
if __name__ == ‘__main__‘:
li = Array(‘i‘,10) #创建进程对象(类型,数量) Aarray为数组
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()

from multiprocessing import Process
from multiprocessing import Manager

def foo(i,arg):
arg[i] = i+100
print(arg.values())

if __name__ == ‘__main__‘:
obj = Manager()

li = obj.dict() #创建进程对象dic类型
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
p.join() #当有进程没有执行完时阻塞住

进程池
from multiprocessing import Pool
import time
def task(i):
time.sleep(1)
print(i+100)

if __name__ == ‘__main__‘:
poll = Pool(5)
for i in range(30):
poll.apply_async(func=task,args=(i,)) #并发去执行任务
poll.close() #当所有任务执行完关闭
poll.join() #当任务没执行完,阻塞

PS: IO密集型-多线程
计算密集型 - 多进程

3.协程
原理:利用一个线程,分解一个为多个微线程 >>程序级别
greenlet
gevent 封装greenlet

from gevent import monkey;monkey.patch_all()
import gevent
import requests

def f(url):
print(‘GET :%s‘ % url)
resp = requests.get(url)
data = resp.text #内容
print(‘%s bytes from %s‘ % (len(data), url))

gevent.joinall([
gevent.spawn(f,‘https://www.baidu.com‘),
gevent.spawn(f,‘https://github.com‘),
])

4.缓存
memcache
k - > "" #memcache只支持key-value(字符串)
1.天生集群
2.基本操作
3.gets,cas 成对出现,当gets只允许cas一次
redis
k - > ""
k - > [] #列表
k - > {}
k - > [] #集合
k - > [(11,1),(22,2)] #可以排序

时间: 2024-10-21 13:59:59

python 之路11 进程池,线程池的相关文章

Python之路,Day9, 进程、线程、协程篇

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

Python之路:进程、线程

本节内容 一.进程与线程区别 1.1 什么是线程 1.2 什么是进程 1.3 进程与线程的区别 二.Python GIL全局解释器锁 三.线程 3.1 threading模块 3.2 Join & Daemon 3.3 线程锁(互斥锁Mutex) 3.4 RLock(递归锁) 3.5 Semaphore(信号量) 3.6 Events 3.7 Queue队列 3.8 生产者消费者模型 四.进程 4.1 多进程multiprocessing 4.2 进程间通讯 4.3 Queue队列 4.4 Pi

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

目录 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 2.死锁现象与递归锁 2.1死锁现象 2.2递归锁 3.信号量 4.GIL全局解释器锁 4.1背景 4.2为什么加锁 5.GIL与Lock锁的区别 6.验证计算密集型IO密集型的效率 6.1 IO密集型 6.2 计算密集型 7.多线程实现socket通信 7.1服务端 7.2客户端 8.进程池,线程池 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 #生产者消

python中socket、进程、线程、协程、池的创建方式

一.TCP-socket 服务端: import socket tcp_sk = socket.socket() tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) tcp_sk.bind(('127.0.0.1',8000)) tcp_sk.listen() conn,addr = tcp_sk.accept() conn.send('你好'.encode('utf-8')) print(conn.recv(1024).deco

第11章 Windows线程池(3)_私有的线程池

11.3 私有的线程池 11.3.1 创建和销毁私有的线程池 (1)进程默认线程池 当调用CreateThreadpoolwork.CreateThreadpoolTimer.CreateThreadpoolWait或CreateThreadpoolIo,并使传入参数PTP_CALLBACK_ENVIRON设为NULL时,那么所有的工作项将被添加到进程默认的线程池.一般这个默认的线程池能满足大多数应用程序的要求.其生命期与进程相同,在进程终止的时候,Windows负责线程池的清理和销毁工作. (

基于无锁队列和c++11的高性能线程池

基于无锁队列和c++11的高性能线程池线程使用c++11库和线程池之间的消息通讯使用一个简单的无锁消息队列适用于linux平台,gcc 4.6以上 标签: <无> 代码片段(6)[全屏查看所有代码] 1. [代码]lckfree.h ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // lckfree.h //

Python3 从零单排28_线程队列&amp;进程池&amp;线程池

1.线程队列 线程队列有三种:先进先出,后进先出,按优先级进出,具体如下: 1 import queue 2 3 # 先进先出 4 q = queue.Queue(3) 5 6 q.put(1) 7 q.put(2) 8 q.put(3) 9 # q.put(4) # 再放阻塞,等待队列消费 10 # q.put(4,block = False) # 不阻塞,强制放数据,如果满的情况下直接报错 等价与 q.put_nowait(4) 11 # q.put(4,block = True) # 阻塞

进程池线程池 协程

socket服务端实现并发 服务端需要满足以下3点: 1 固定的ip和port 2 24小时提供服务 3 能够实现并发 多线程实现并发: 服务端: import socket from threading import Thread import os server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) #半连接池 def communicate(conn): while True: try: dat

第11章 Windows线程池(1)_传统的Windows线程池

第11章 Windows线程池 11.1 传统的Windows线程池及API 11.1.1 传统的线程池对象及对应的API 线程池对象 API 普通任务线程池 QueueUserWorkItem 计时器线程池 CreateTimerQueue(创建线程池) CreateTimerQueueTimer(创建计时器) ChangeTimerQueueTimer DeleteTimerQueueTimer DeteTimerQueueEx 同步对象等待线程池 RegisterWaitForSingle