线程锁、线程池

一、线程(IO密集型工作多线程有用)


  • 线程:
    • 概述:
      • 若一个文件从上到下顺序执行,则为串行执行,整个py文件实际上是一个主线程
      • 若多线程,则可以并行执行,同一个时刻可以运行多个代码段
      • 给每个client请求分配一个线程,则这些线程可以同时工作
    • 多线程、多进程:
      • 1、一个应用程序,可以有多进程和多线程;默认是单进程、单线程
      • 2、单进程、多线程 :
            • 多线程:IO操作(输入输出流,文件操作)有用,因为几乎不用cpu来调度,一般用多线程来提高并发
            • 计算型操作,需要用到cpu调度执行,一般用多进程提高并发
      • 3、GIL 全局解释器锁,即进程中同一时刻只能被CPU 调度一个线程

  • 创建方式
    • 创建方式一:常规方式,比较简单常用
import threading

def func1(arg):

print(arg)

# t = threading.Thread( target=线程要执行的函数,  args=函数参数-数组, kwargs=函数参数-字典)

t = threading.Thread(target=f1, args=(123,) )

t.start()

# 准备就绪,准备让cpu进行调度,

# 只要cpu一旦调度了该线程,就会执行threading 模块中的run方法,run方法只做一件事,执行target指向的函数

# 即,target实际上是run在方法内部执行的

    • 创建方式二: 重写init和run
class MyThread(threading.Thread):

def __init__(self, func, args):

self.func = func

self.args = args

super(MyThread, self).__init__()

def run(self):

self.func(self.args)

def f2():

pass

obj = MyThread(f2, 123)

obj.start()

  • 常用方法:
    • t.start()
        • 准备好该线程,等待CPU进行调度,
        • 虽然线程已经就绪,但是什么时候执行 谁也不知道,只有被cpu进行调度的时候才会执行
    • t.join(3)
        • 逐个执行线程,执行完毕后再往下执行;可设置执行“超时时间”,该方法使得多线程没有意义
        • 主线程代码执行到这里,会等待子线程去执行,等待超时时间为3秒,子线程执行完毕,主线程才会继续往下执行
    • t.setDaemon(True)
        • 设置为后台进程(默认为False),主线程执行过程中,后台线程也在执行,主线程执行完毕,后台线程立即终止

二、线程锁

  • 概述:
    • 线程锁:
      • 因为线程是随机调度,即可能并行去更“改同一个数据”,若多个线程同时对资源进行修改,则会发生错误
      • 互斥锁,同一时刻,只允许一个线程来执行的操作
    • 未使用“锁”:
      • 10个人进行购买商品,每个人购买后商品减1,每个人看到的结果应该为9876543210
      • 实际执行,所有人同时购买商品,同时修改NUM,同时输出NUM ,则都输出0
import threading

import time

NUM = 0

def function1():

global NUM

NUM += 1

time.sleep(0.1) # 需要让所有线程都夯在这里,然后同时输出

print(NUM)

for x in range(10):

t = threading.Thread(target=function1, )

t.start()

  • 各种锁讲解
    • “Rlock 锁”:
      • 将数据修改的位置作为原子操作,加锁
import threading

import time

NUM = 0

lock = threading.RLock()

def function1():

lock.acquire()

global NUM

NUM += 1

time.sleep(2)

print(NUM)

lock.release()

for x in range(10):

t = threading.Thread(target=function1,)

t.start()

    • 注意:lock = threading.Rlock() # 支持多层锁的嵌套,一般用Rlock,lock不支持
def func(l):

global NUM

l.acquire()

NUM -= 1

l.acquire()

time.sleep(2)

l.release()

print(NUM)

l.release()

    • semaphore”锁
      • 信号量:表示同一时刻允许有多个线程同时工作,比如火车站过安检,每次允许10个人,上批次安检完成后再来十个人
      • 注意:若同一批次运行的线程,都修改同一个数据,依然会有“数据错误”的情况,这里只是演示功能
import threading

import time

NUM = 0

semaphore = threading.BoundedSemaphore(5) # 表示每批放进来5个线程

def function1(i, se):

se.acquire()

global NUM

NUM += 1

time.sleep(2)

print(‘我是线程:{}, NUM此时为:{}‘.format(i, NUM))

se.release()

for x in range(10):

t = threading.Thread(target=function1, args=(x, semaphore))

t.start()

    • “event”锁:
      • 事件:要锁全部锁,要放全部放,实则内部维护了一个变量,值为布尔值
      • 该锁提供了3个方法:
        1. set() # 将Flag 值设为True
        2. clear() # 将Flag 值设为 False
        3. wait() # 若Flag为False,则阻塞住;若为True则放行
import threading

event = threading.Event()

def function(id, en):

print(‘兵{}:大王,怎么办?‘.format(id))

en.wait()

print(‘兵{}:杀呀~~~‘.format(id))

for x in range(10):

t = threading.Thread(target=function, args=(x, event))

t.start()

event.clear()

print(‘1、杀出一条血路 2、投降‘)

you_choice = input(‘>>‘).strip()

if you_choice == ‘1‘:

event.set()

    • "condition" 锁
      • 使得线程等待,只有当满足条件时,才放出N个线程去执行任务
      • 注意,wait和notify方法前后,必须被acquire和release方法包起来
      • 姿势1:自行notify 通知运行N个
import threading

condition = threading.Condition()

def function(id, con):

con.acquire()

con.wait()  # 子线程全部卡在这里,等待通知

print(‘兵{}:杀呀~~~‘.format(id))

con.release()

for x in range(100):

t = threading.Thread(target=function, args=(x, condition))

t.start()

print(‘你要挑战几个?‘)

you_choice = input(‘>>‘).strip()

condition.acquire()

condition.notify(int(you_choice))   # 传进去几个,上面运行几个线程

condition.release()

      • 姿势2:条件成立,则放出一个去运行
import threading

def condition_func(): # 该函数返回值必须为布尔值

ret = False

inp = input(‘>>‘).strip()

if inp == ‘1‘:

ret = True

return ret

def function(id, con):

con.acquire()

con.wait_for(condition_func) # 一旦该函数体返回值为True,则放出一个线程

print(‘thread{}:im gone~~~‘.format(id))

con.release()

condition = threading.Condition()

for x in range(5):

t = threading.Thread(target=function, args=(x, condition))

t.start()

三、Timeer 定时器:

  • 定时器,指定n秒后执行某操作

from threading import Timer

import time

def function():

print(‘开火~‘)

t = Timer(3, function)  # 子线程,三秒后执行该函数

t.start()

for i in range(1, 4):

print(‘time:{}‘.format(i)) # 主线程,数数,每次间隔1秒

time.sleep(1)

四、线程池:

  • 线程池概述:
    • “线程池”与“上下文切换”:
      • CPU调度线程,当时间片用完,会进行切换,每次切换时线程现场的“保存与载入”都是时间开销
      • 当线程数目达到一个峰值后,再多创建线程,执行效率会下降
      • 因此,应该控制线程创建的最大数目,池中线程取一个少一个,无线程时,后续请求等待;执行完毕归还线程
    • 要创建一个线程池,那么该线程池应满足如下条件:
      • 线程池为一个容器
      • 池中线程用一个少一个
      • 池中无线程,则进行等待
      • 线程执行完任务,则进行归还
  • 创建姿势1: 利用队列来存放“线程”
import time

import queue

import threading

class MyPool:

def __init__(self, maxsize=5):

self.maxsize = maxsize

self._q = queue.Queue(self.maxsize)  # 创建一个指定大小的队列

for _ in range(self.maxsize):

self._q.put(threading.Thread)  # 在队列中全为线程"类"

def get_thread(self):

return self._q.get()  # 取出一个线程"类"

def add_thread(self):

self._q.put(threading.Thread)  # 添加一个线程“类”

# 定义任务,参数为线程池(执行该任务的线程来源),执行完毕后向该池中归还(添加)一个线程

def task(n, p):

print(‘{}: 执行任务,完毕‘.format(n))

time.sleep(2)

p.add_thread()

# 创建一个大小为5的线程池

pool = MyPool(5)

for i in range(100):

t_class = pool.get_thread()  # 从队列中获取一个线程"类名称"

t_obj = t_class(target=task, args=(i, pool))  # 用取出来的类,实例化个线程,并交给线程任务去执行,每次从队列中取一个线程,若没有线程则等待

t_obj.start()

  • 此时,每次有5个线程去处理工作,不会超过5
  • 问题:
    • 线程重用:任务执行完成后归还线程,用的是“重新创建一个线程并put进队列”的方法,即原来的线程还放于内存中等待被GC回收
    • 空闲线程:若任务数少于池数目,则会多余创建,即应该线程池最初是空的,来一个创建一个,最大为5

姿势2:

  • 队列中存放任务:
    • 往往每个任务都是一个函数,
    • 可以将任务和其相关参数搞成元组(函数名,参数),将这些元组put到队列中,则队列中保存的全是需要执行的任务
  • 创建N个线程(N为线程池大小):
    • 每个线程都“循环”从队列中get任务并执行(线程重用)
    • 若队列中所有任务都取完了,则终止已经创建的线程,终止方法如下:
      1. get方法有超时时间,可以设置超时时间,超过这个时间则线程自动销毁
      2. 在队列末尾插入几个空值,get后判断,若取到的是任务则执行,否则则终止

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import queue

import threading

import contextlib

import time

StopEvent = object() # 创建个静态字段,字段任何值都行,相当于None,即要在队列后插入的空值

class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):

# 创建任务队列,可显示指定任务队列大小,否则为不限制

if max_task_num:

self.q = queue.Queue(max_task_num)

else:

self.q = queue.Queue()

self.max_num = max_num # 最大线程数

self.cancel = False

self.terminal = False

self.generate_list = [] # 当前已经创建的线程

self.free_list = [] # 当前的空闲线程,队列中没有任务,则线程会空闲

def run(self, func, args, callback=None):

"""

线程池执行一个任务

:param func: 任务函数

:param args: 任务函数所需参数

:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)

:return: 如果线程池已经终止,则返回True否则None

"""

if self.cancel:

return

# 没有空闲着的线程,并且已经创建的线程有没有超过线程池最大大小时,创建线程

if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:

self.generate_thread()

w = (func, args, callback,) # 将线程的id,要执行的任务,包装成一个元组

self.q.put(w) # 将包装好的元组放入队列中

def generate_thread(self):

"""

创建一个线程,并执行call方法

"""

t = threading.Thread(target=self.call)

t.start()

def call(self):

"""

循环去获取任务函数并执行任务函数

"""

current_thread = threading.currentThread() # 获取当前线程

self.generate_list.append(current_thread) # 在线程列表中添加一个线程,

event = self.q.get() #从队列中获取任务,任务都是包装的元组(id,任务)

# 若是一个元组,即不等于stopevent

while event != StopEvent:

func, arguments, callback = event # 元组解包

try:

result = func(*arguments)  # 执行任务

success = True

except Exception as e:

success = False

result = None

if callback is not None:

try:

callback(success, result)

except Exception as e:

pass

# 线程执行完任务,则线程处于空闲状态,则标记该任务为空闲进程,下次直接会从空闲进程去取任务

with self.worker_state(self.free_list, current_thread):

if self.terminal:

event = StopEvent

else:

event = self.q.get() # 执行完任务后,该线程又去获取任务,然后再次while

# 若evnent不是一个元组,则从线程列表中移除当前线程

else:

self.generate_list.remove(current_thread)

def close(self):

"""

执行完所有的任务后,所有线程停止,创建了几个线程,则往任务队列中插入几个空值

"""

self.cancel = True

full_size = len(self.generate_list)

while full_size:

self.q.put(StopEvent)

full_size -= 1

def terminate(self):

"""

无论是否还有任务,终止线程

"""

self.terminal = True

while self.generate_list:

self.q.put(StopEvent)

self.q.queue.clear()

@contextlib.contextmanager

def worker_state(self, state_list, worker_thread):

"""

用于记录线程中正在等待的线程数

"""

state_list.append(worker_thread)

try:

yield

finally:

state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):

# 需要执行的任务

pass

def action(i):

print(i)

# 创建300个任务

for i in range(300):

ret = pool.run(action, (i,), callback)

time.sleep(5)

print(len(pool.generate_list), len(pool.free_list))

print(len(pool.generate_list), len(pool.free_list))

# pool.close()

# pool.terminate()

时间: 2024-10-27 18:17:04

线程锁、线程池的相关文章

iOS 多线程 NSThread NSOperation NSOperationQueue GCD 线程锁 线程阻塞

iPhone中的线程应用并不是无节制的,官方给出的资料显示,iPhone OS下的主线程的堆栈大小是1M,第二个线程开始就是512KB,并且该值不能通过编译器开关或线程API函数来更改,只有主线程有直接修改UI的能力,所以一些数据层面可以开辟线程来操作进行,iOS线程的操作方法有NSThread NSOperation NSOperationQueue GCD: NSThread方法有 //NSThread自动 - (IBAction)didClickNSThreadAutoButtonActi

线程锁 线程并发

<> using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { static int max=10000000; static long _count = 0; static vo

Java——线程锁,死锁,等待唤醒机制

一.线程锁 线程安全问题 p { margin-bottom: 0.25cm; direction: ltr; color: #000000; line-height: 120%; text-align: justify; widows: 0; orphans: 0 } p.western { font-family: "Calibri", sans-serif; font-size: 10pt } p.cjk { font-family: "宋体"; font-s

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

目录 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 2.死锁现象与递归锁 2.1死锁现象 2.2递归锁 3.信号量 4.GIL全局解释器锁 4.1背景 4.2为什么加锁 5.GIL与Lock锁的区别 6.验证计算密集型IO密集型的效率 6.1 IO密集型 6.2 计算密集型 7.多线程实现socket通信 7.1服务端 7.2客户端 8.进程池,线程池 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 #生产者消

线程--守护线程、线程锁、信号量、事件、条件、定时器、队列、池(三)

守护线程 import timefrom threading import Threaddef func1(): while True: print('*'*10) time.sleep(1)def func2(): print('in func2') time.sleep(5) t = Thread(target=func1,)t.daemon = Truet.start()t2 = Thread(target=func2,)t2.start()t2.join() #加join后会等待func

InnoDB 存储引擎的线程与内存池

InnoDB 存储引擎的线程与内存池 InnoDB体系结构如下: 后台线程: 1.后台线程的主要作用是负责刷新内存池中的数据,保证缓冲池中的内存缓存的是最近的数据: 2.另外,将以修改的数据文件刷新到磁盘文件: 3.同时,保证在数据库发生异常的情况下,InnoDB能恢复到正常运行状态. 内存池:InnoDB有多个内存块,这些内存块组成了一个大的内存池.这些内存块包括有:缓冲池(innodb_buffer_pool)和日志缓冲(log_buffer)以及额外内存池(innodb_addtional

线程和线程池

首先线程有守护线程和用户线程两种,区别就是用户线程是否保持程序的运行状态.当程序在运行时,必定有一个或以上的线程是用户线程,而当程序结束时,所有守护线程也都将被关闭.使用Thread.setDaemon(ture)可以把线程标记为守护线程,默认线程状态继承自创建它的线程.线程的两种创建方法不多说了. 线程安全一般指的是共享变量被多个线程访问读写造成的数据不一致或者是数据不完整性.一般有如下几种方法可供参考: 1.synchronized方法,提供只能供一个线程访问的类,方法或语句块,控制变量的修

线程系列08,实现线程锁的各种方式,使用lock,Montor,Mutex,Semaphore以及线程死锁

当涉及到多线程共享数据,需要数据同步的时候,就可以考虑使用线程锁了.本篇体验线程锁的各种用法以及线程死锁.主要包括: ※ 使用lock处理数据同步※ 使用Monitor.Enter和Monitor.Exit处理数据同步※ 使用Mutex处理进程间数据同步※ 使用Semaphore处理数据同步※ 线程死锁 □ 使用lock处理数据同步 假设有一个类,主要用来计算该类2个字段的商,在计算商的方法之内让被除数自减,即被除数有可能为零.使用lock语句块保证每次只有一个线程进入该方法. class Th