python之路 -- 并发编程之线程

进程 是 最小的内存分配单位

线程 是 操作系统调度的最小单位

线程直接被CPU执行,进程内至少含有一个线程,也可以开启多个线程

开启一个线程所需要的时间要远远小于开启一个进程

GIL锁(即全局解释器锁) 锁的是线程

在Cpython解释器下的python程序 在同一时刻 多个线程中只能有一个线程被CPU执行

1.创建线程的两中方式:

import time
from threading import Thread
def func(args):
    time.sleep(1)
    print(args)

t = Thread(target=func,args=(10,))
# 传入的参数也必须以元组的形式传
t.start()

# 创建线程的另一种方式(使用面向对象创建线程)
import time
from threading import Thread
class MyTread(Thread):
    def __init__(self,arg):
        super().__init__()    # 调用父类的__init__
        self.arg = arg
    def run(self):        # 方法名必须是run
        time.sleep(1)
        print(self.arg)

t = MyTread(10)
t.start()

2.线程与进程效率的比较

import time
from threading import Thread
from multiprocessing import Process
def func(n):
    n + 1

if __name__ == ‘__main__‘:
    start1 = time.time()
    t_lst1 = []
    for i in range(100):
        t = Thread(target=func,args=(i,))
        t.start()
        t_lst1.append(t)
    for t in t_lst1:t.join()
    t1 = time.time() - start1

    start2 = time.time()
    t_lst2 = []
    for i in range(100):
        t = Process(target=func, args=(i,))
        t.start()
        t_lst2.append(t)
    for t in t_lst2: t.join()
    t2 = time.time() - start2
    print(t1,t2)

输出的结果为:
0.02698826789855957 10.160863876342773

# 可见开启一个线程所需要的时间要远远小于开启一个进程

多线程用于IO密集型,如socket,爬虫,处理web请求,读写数据库;

多进程用于计算密集型,如金融分析。

3.多进程与多线程数据共享的比较:

多个线程内部有自己的数据栈,线程内部数据不共享

全局变量在多个线程之间是共享的(线程之间资源共享)

多进程修改全局变量
import os,time
from multiprocessing import Process
from threading import Thread

def func():
    global g
    g -= 10
    print(g,os.getpid())

g = 100   # 全局变量g
t_lst = []
if __name__ == "__main__":
    for i in range(5):
        t = Process(target=func)
        t.start()
        t_lst.append(t)
    for t in t_lst: t.join()
    print(g)
"""输出结果为:
90 143648
90 160300
90 143316
90 160804
90 159348
100
"""

多线程修改全局变量
import os,time
from multiprocessing import Process
from threading import Thread

for i in range(5):
    t = Thread(target=func)
    t.start()
    t_lst.append(t)
for t in t_lst : t.join()
print(g)
"""输出的结果为;
90 161716
80 161716
70 161716
60 161716
50 161716
50
"""

4.线程模块中的其他方法:

1、Thread实例对象的方法(t为实例化的一个线程对象)
  t.isAlive():返回线程是否存活。
  t.getName():返回线程名。
  t.setName():设置线程名。

2、threading模块提供的一些方法:
  threading.active_count():查看程序中存在多少个活着的线程,与len(threading.enumerate())有相同的结果。
  threading.enumerate(): 查看程序中的所有线程的线程名和线程号(列表的形式列出每个线程)。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  threading.current_thread():查看当前线程的线程名和线程号

3.实例:

例子:
import time
import threading

def func(n):
    time.sleep(0.5)
    print(n,t.getName(),threading.get_ident(),t.isAlive())
# 打印线程名,线程号,判断线程是否在活动

# 当一个线程执行完成func之后就结束了,t.isAlive()查看为False

for i in  range(5):
    t = threading.Thread(target=func,args=(i,))
    t.start()
    t.join()

print(threading.active_count())    # 查看程序中存在多少个活着的线程
print(threading.current_thread())   # 查看当前线程的线程名和线程号
print(threading.enumerate())        # 查看程序中的所有线程的线程名和线程号(列表的形式列出每个线程)

"""执行结果为:
0 Thread-1 33164 True
1 Thread-2 57836 True
2 Thread-3 165280 True
3 Thread-4 171196 True
4 Thread-5 119008 True
1
<_MainThread(MainThread, started 165276)>
[<_MainThread(MainThread, started 165276)>]
"

5.守护线程

守护进程与守护线程的区别:

  1.守护进程随着主进程代码的执行结束而结束
  2.守护线程会在主线程结束之后再等待其他子线程结束之后才结束

from threading import Thread
import time

def func():
    print(‘我是守护线程‘)
    time.sleep(0.5)
    print(‘守护线程结束了‘)

def func2():
    print(‘我是线程2,非守护线程‘)
    time.sleep(2)
    print(‘func2执行完了‘)

t = Thread(target=func)
t.daemon = True        # 仍是加在start()前
t.start()
t2 = Thread(target=func2)
t2.start()

print(‘我是主线程‘)
t2.join()
print(‘主线程结束了。‘)

"""执行结果为:
我是守护线程
我是线程2,非守护线程
我是主线程
守护线程结束了
func2执行完了
主线程结束了。
"""

# 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 再结束。然后回收子进程的资源

实例

6.线程锁

1.不加锁

from threading import Thread
import time
def func():
    global n
    time.sleep(0.5)
    n-=1

n = 10
t_list = []
for i in range(10):
    t = Thread(target=func)
    t.start()
    t_list.append(t)

[t.join() for i in t_list]
print(n)
# 输出结果n为9
# 创建线程非常快,创建的10个线程都拿到了global的n,此时n为10,然后各线程都做-1操作,得到的结都为9

不加锁

2.加锁

from threading import Thread
from threading import Lock

def func(lock):
    global n
    lock.acquire()
    n-=1
    lock.release()

lock = Lock()
n = 10
t_list = []
for i in range(10):
    t = Thread(target=func,args=(lock,))
    t.start()
    t_list.append(t)

[t.join() for i in t_list]
print(n)
# 输出的结果为0
通过加锁后保证了多线程数据安全,但损失了效率

加锁

互斥锁Lock:在同一个线程或者进程之间,当有两个acquire的时候,就会产生阻塞(死锁)

递归锁RLock:在同一个线程或则进程之间,无论acquire多少次都不会产生阻塞(死锁)

3.多人吃面事例

多人吃面事例:
# 有多个人在一个桌子上吃面,只有一份面和一个叉子,当一个人同时拿到了这两样才能完成一次吃面的动作。
from threading import Thread,Lock
import time

def eat1(name):
    noodle_lock.acquire()
    print(‘%s拿到面条了‘%name)
    fork_lock.acquire()
    print(‘%s拿到叉子了‘%name)
    print(‘%s吃面‘%name)
    noodle_lock.release()
    fork_lock.release()

def eat2(name):
    fork_lock.acquire()
    print(‘%s拿到叉子了‘%name)
    time.sleep(1)
    noodle_lock.acquire()
    print(‘%s拿到面条啦‘%name)
    print(‘%s吃面‘%name)
    noodle_lock.release()
    fork_lock.release()

# 实例化2个锁,给面条和叉子加上锁
noodle_lock = Lock()
fork_lock = Lock()

Thread(target=eat1,args=(‘peo1‘,)).start()
Thread(target=eat2,args=(‘peo2‘,)).start()
Thread(target=eat1,args=(‘peo3‘,)).start()
Thread(target=eat2,args=(‘peo4‘,)).start()
"""执行结果为:
peo1拿到面条了
peo1拿到叉子了
peo1吃面
peo2拿到叉子了
peo3拿到面条了
“陷入阻塞”…………
"""

多人吃面事例(使用互斥锁):

# 加递归锁
from threading import Thread,RLock
import time

def eat1(name):
    noodle_lock.acquire()       # 一把钥匙
    print(‘%s拿到面条啦‘%name)
    fork_lock.acquire()
    print(‘%s拿到叉子了‘%name)
    print(‘%s吃面‘%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print(‘%s拿到叉子了‘%name)
    time.sleep(1)
    noodle_lock.acquire()
    print(‘%s拿到面条啦‘%name)
    print(‘%s吃面‘%name)
    noodle_lock.release()
    fork_lock.release()

fork_lock = noodle_lock  = RLock()

Thread(target=eat1,args=(‘peo1‘,)).start()
Thread(target=eat2,args=(‘peo2‘,)).start()
Thread(target=eat1,args=(‘peo3‘,)).start()
Thread(target=eat2,args=(‘peo4‘,)).start()

# 通过加递归锁实现了数据安全
# 递归锁一般用于有多个数据需要加锁的时候,不会出现数据安全问题

多人吃面事例(使用递归锁)

对于有多个数据需要加锁的时候,用互斥锁仍然会出现数据不安全问题

当多线程是对一个数据处理的时候通过给这个数据加上互斥锁实现了数据安全。

但是当有多个数据被多线程调用处理的时候,加互斥锁仍然会出现数据安全问题。这时候需要用到递归锁,给多个数据加锁。

7.线程信号量

# 线程信号量
# 相当于加一个锁,但此锁可以根据自己需要设置有多个钥匙,此时可以允许有多个线程同时去做操作
from threading import Thread,Semaphore
import time

def func(sem,n):
    sem.acquire()
    time.sleep(0.5)
    print(n,end=‘ ‘)
    sem.release()

sem = Semaphore(4)
for i in range(10):
    t = Thread(target=func,args=(sem,i))
    t.start()

"""输出的结果为:
2 3 1 0 5 4 7 6 8 9     # (4个一组基本同时输出)
"""

8.线程事件

# 事件被创建的时候
# False状态
    # wait() 阻塞
# True状态
    # wait() 非阻塞
# clear 设置状态为False
# set  设置状态为True
#  起两个线程
#  第一个线程 : 连接数据库
        # 等待一个信号 告诉我我们之间的网络是通的
        # 连接数据库
#  第二个线程 : 检测与数据库之间的网络是否连通
        # time.sleep(0,2) 2
        # 将事件的状态设置为True
from threading import Thread,Event
import time,random

def connect_db(e):
    count = 0
    while count < 3:
        e.wait(0.5) # 状态为False的时候,只等待0.5秒就结束
        if e.is_set() == True:
            print(‘连接数据库‘)
            break
        else:
            count += 1
            print(‘第%s次连接失败‘ %count)
    else:
        raise TimeoutError(‘数据库连接超时‘)

def check_web(e):
    time.sleep(random.randint(0, 3))
    e.set()

e = Event()
t1 = Thread(target=connect_db, args=(e,))
t2 = Thread(target=check_web, args=(e,))
t1.start()
t2.start()

9.线程条件

# notify(int数据类型)  造钥匙

from threading import Thread
from threading import Condition
def func(con,i):
    con.acquire()
    con.wait() # 等钥匙
    print(‘在第%s个循环里‘%i)
    con.release()
con = Condition()
for i in range(10):
    Thread(target=func,args = (con,i)).start()
while True:
    num = int(input(‘>>>‘))
    con.acquire()
    con.notify(num)  # 造钥匙
    con.release()

输出结果:

10.线程定时器

定时器,即指定n秒后执行某操作

import time
from threading import Timer
def func():
    print(‘时间同步‘)   #1-3

while True:
    time.sleep(1)
    t = Timer(5,func).start()   # 非阻塞的
    # 定时等待5秒之后就调用func
    # time.sleep(5)

# 自动更新验证码
from threading import Thread,Timer
import random

class Check_number:
    def __init__(self):
        self.cash_code() # 程序一开始便实例化一个验证码
    def make_code(self,n=4):
        res = ‘‘
        for i in range(n):
            s1 = str(random.randint(0,9)) # 0到9间的任意自然数
            s2 = chr(random.randint(65,90)) # 24个小写字母
            res += random.choice([s1,s2]) # 字符和数字的任意组合
        return res
    def cash_code(self,interval=3):
        self.code = self.make_code()  # 实例化一个验证码
        print(self.code) # 打印验证码
        self.t = Timer(interval,self.make_code) # 定时器,等待指定时间再运行
        self.t.start()

    def check(self):
        while True:
            mes = input(‘输入验证码>>>:‘).strip()
            if self.code == mes.upper():
                print(‘输入正确!‘)
                self.t.cancel() # 关闭定时器
                break

obj = Check_number()
obj.check()

自动更新验证码

11.线程队列

import queue
q1 = queue.Queue()  # 队列 先进先出
q2 = queue.LifoQueue()  # 栈 先进后出
q3 = queue.PriorityQueue()  # 优先级队列

q.put()     # 往队列中放入值,当队列满的时候陷入等待状态,直到队列可以放入值的时候放值
q.get()     # 从队列取值,当队列为空的时候陷入等待状态,直到队列中有值的时候取值
q.put_nowait()  # 往队列中放入值,当队列满的时候,直接报错
q.get_nowait()  # 从队列取值,当队列为空的时候,直接报错
优先级队列事例:
优先级队列事例:
q = queue.PriorityQueue()  # 优先级队列
q.put((20,‘a‘))
q.put((10,‘b‘))
q.put((-5,‘d‘))
q.put((1,‘e‘))
q.put((1,‘f‘))
print(q.get())
print(q.get())
print(q.get())
print(q.get())

"""执行结果为:
(-5, ‘d‘)
(1, ‘e‘)
(1, ‘f‘)
(10, ‘b‘)
"""

# 此队列按照值的优先级的顺序来取值,优先级数越小越先取值,
# 当优先级一样的时候,就比较值的ASCII值的大小,小的先取

12.线程池

import time
from concurrent.futures import ThreadPoolExecutor
def func(n):
    time.sleep(1)
    print(n,end=" ")
    return 2*n

def call_back(m):   # m接收的为func中return的值
    print(‘结果是 %s‘%m.result())  # 从对象中获取值的方法.result(),进程池中是.get()

tpool = ThreadPoolExecutor(max_workers=5)   #  默认 不要超过cpu个数*5

for i in  range(10):
    tpool.submit(func,i).add_done_callback(call_back)

"""执行结果为:
4 结果是 8
2 结果是 4
3 结果是 6
1 结果是 2
0 结果是 0
9 结果是 18
8 结果是 16
6 结果是 12
5 结果是 10
7 结果是 14
"""

# tpool.map(func,range(10))  # 拿不到返回值
t_lst = []
for i in  range(10):
    t = tpool.submit(func,i)
    t_lst.append(t)
tpool.shutdown()  # shutdown相当于close+join
print(‘主线程‘)
for t in t_lst:print(‘*‘,t.result(),end=‘ ‘)

"""执行结果为:
4 2 3 1 0 9 8 7 6 5 主线程
* 0 * 2 * 4 * 6 * 8 * 10 * 12 * 14 * 16 * 18
"""

13.多线程实现socket聊天

  server 端

import socket
from threading import Thread

def chat(sk):
    conn, addr = sk.accept()
    conn.send("hello,我是服务端".encode(‘utf-8‘))
    msg = conn.recv(1024).decode(‘utf-8‘)
    print(msg)
    conn.close()

sk = socket.socket()
sk.bind((‘127.0.0.1‘,8080))
sk.listen()
while True:
    t = Thread(target=chat,args=(sk,))
    t.start()
sk.close

server端

  client 端

# client端
import socket
sk = socket.socket()
sk.connect((‘127.0.0.1‘,8080))

msg = sk.recv(1024).decode(‘utf-8‘)
print(msg)
info = input(‘>>>‘).encode(‘utf-8‘)
sk.send(info)
sk.close()

client端

原文地址:https://www.cnblogs.com/aberwang/p/9460121.html

时间: 2024-10-05 03:02:56

python之路 -- 并发编程之线程的相关文章

百万年薪python之路 -- 并发编程之 多线程 二

1. 死锁现象与递归锁 进程也有死锁与递归锁,进程的死锁和递归锁与线程的死锁递归锁同理. 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因为争夺资源而造成的一种互相等待的现象,在无外力的作用下,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在相互等待的进程称为死锁进程 # 多个线程多个锁可能会产生死锁 from threading import Thread from threading import Lock import time lock_A = Lock

百万年薪python之路 -- 并发编程之 多线程 一

多线程 1.进程: 生产者消费者模型 一种编程思想,模型,设计模式,理论等等,都是交给你一种编程的方法,以后遇到类似的情况,套用即可 生产者与消费者模型的三要素: 生产者:产生数据的 消费者:接收数据做进一步处理的 容器: 缓存区(队列) 起到缓冲的作用,平衡生产力与消费者,解耦 2.线程的理论知识 什么是线程 一条流水线的工作流程 进程: 在内存中开启一个进程空间,然后将主进程的所有的资源复制一份,然后调用cpu去执行这些代码. 进程是资源调度的基本单位,而线程是cpu的最小执行单位 进程的开

百万年薪python之路 -- 并发编程之 多进程二

1. 僵尸进程和孤儿进程 基于unix的环境(linux,macOS) 主进程需要等待子进程结束之后,主进程才结束 主进程时刻检测子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收. 为什么主进程不在子进程结束后马上对其回收呢? 主进程与子进程是异步关系,主进程无法马上捕获子进程什么时候结束 如果子进程结束之后马上在内存中释放资源,主进程就没有办法检测子进程的状态. Unix针对于上面的问题提供了一个机制 所有的子进程结束之后,立马会释放掉文件的操作链接和内存的大部分数据,但是会

Python中的并发编程

简介 我们将一个正在运行的程序称为进程.每个进程都有它自己的系统状态,包含内存状态.打开文件列表.追踪指令执行情况的程序指针以及一个保存局部变量的调用栈.通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程.在任何给定的时刻,一个程序只做一件事情. 一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen()).然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及主线

Java并发编程:线程的同步

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

Java并发编程:线程的创建

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

68:Scala并发编程原生线程Actor、Cass Class下的消息传递和偏函数实战解析及其在Spark中的应用源码解析

今天给大家带来的是王家林老师的scala编程讲座的第68讲:Scala并发编程原生线程Actor.Cass Class下的消息传递和偏函数实战解析 昨天讲了Actor的匿名Actor及消息传递,那么我们今天来看一下原生线程Actor及CassClass下的消息传递,让我们从代码出发: case class Person(name:String,age:Int)//定义cass Class class HelloActor extends Actor{//预定义一个Actor  def act()

19、Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对