并发编程---线程queue---进程池线程池---异部调用(回调机制)

线程

  • 队列:先进先出
  • 堆栈:后进先出
  • 优先级:数字越小优先级越大,越先输出

import queue

q = queue.Queue(3) # 先进先出-->队列

q.put(‘first‘)
q.put(2)
# q.put(‘third‘)
# q.put(4) #由于没有人取走,就会卡主
q.put(4,block=False)  #等同于q.get_nowait(), Ture 阻塞,Flase不阻塞,报异常满了
# # q.put(4,block=True,timeout=3)

print(q.get())
print(q.get())
print(q.get())
print(q.get(block=True,timeout=3)) # 阻塞等待3秒 没有取走数据就报异常
# print(q.get(block=False)) #等同于q.get_nowait()
# print(q.get_nowait())

q = queue.LifoQueue(3) #后进先出-->堆栈
q.put(‘first‘)
q.put(2)
q.put(‘third‘)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
打印结果:
third
2
first
‘‘‘

q = queue.PriorityQueue(3) #优先级队列

q.put((10,‘one‘))
q.put((40,‘two‘))
q.put((30,‘three‘))

print(q.get())
print(q.get())
print(q.get())
‘‘‘
数字越小优先级越高
打印结果
(10, ‘one‘)
(30, ‘three‘)
(40, ‘two‘)
‘‘‘

线程queue

进程池线程池

  • 池:是用来对进程(线程)的数量加以限制
  • 进程池:计算密集型,用多进程
  • 线程池:IO密集型,用多线程,例如:sockect网络通信就应该用多线程

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random

‘‘‘
sockect网络通信是IO操作,所以用多线程
计算密集型:用多进程
‘‘‘

def task(name):
    print(‘name:%s pid:%s run‘ %(name,os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == ‘__main__‘:
    # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task,‘yang%s‘ %i) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行

    pool.shutdown(wait=True) # 类似join  代表往池子里面丢任务的入口关掉 计数器-1
    print(‘主‘)
‘‘‘
打印结果:
name:yang0 pid:11120 run
name:yang1 pid:11120 run
name:yang2 pid:11120 run
name:yang3 pid:11120 run
name:yang4 pid:11120 run

name:yang5 pid:11120 run
name:yang6 pid:11120 run
name:yang7 pid:11120 run

name:yang8 pid:11120 run
name:yang9 pid:11120 run
主
‘‘‘

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
import os,time,random

def task():
    print(‘name:%s pid:%s run‘ %(currentThread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == ‘__main__‘:
    # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行

    pool.shutdown(wait=True) # 类似join  代表往池子里面丢任务的入口关掉 计数器-1
    print(‘主‘)
‘‘‘
打印结果:
name:ThreadPoolExecutor-0_0 pid:14052 run
name:ThreadPoolExecutor-0_1 pid:14052 run
name:ThreadPoolExecutor-0_2 pid:14052 run
name:ThreadPoolExecutor-0_3 pid:14052 run
name:ThreadPoolExecutor-0_4 pid:14052 run
name:ThreadPoolExecutor-0_2 pid:14052 run
name:ThreadPoolExecutor-0_1 pid:14052 run
name:ThreadPoolExecutor-0_3 pid:14052 run
name:ThreadPoolExecutor-0_4 pid:14052 run
name:ThreadPoolExecutor-0_0 pid:14052 run
主
‘‘‘

进程池|线程池

同步调用和异步调用

提交任务的两种方式:

  • 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
  • 异步调用:提交完任务后,不在原地等待任务执行完。回调机制:自动触发

#1.同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print(‘%s is laing‘ %name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*‘#‘
    return {‘name‘:name,‘res‘:res}

def weigh(shit):
    name = shit[‘name‘]
    size = len(shit[‘res‘])
    print(‘%s 拉了 <%s>kg‘ %(name,size))

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(10)

    shit1 = pool.submit(la,‘alex‘).result()
    weigh(shit1)

    shit2 = pool.submit(la,‘yang‘).result()
    weigh(shit2)

    shit3 = pool.submit(la,‘hang‘).result()
    weigh(shit3)
‘‘‘
打印结果:
alex is laing
alex 拉了 <8>kg
yang is laing
yang 拉了 <8>kg
hang is laing
hang 拉了 <7>kg
‘‘‘

同步调用

#2.异步调用:提交完任务后,不在原地等待任务执行完
from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print(‘%s is laing‘ %name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*‘#‘
    return {‘name‘:name,‘res‘:res}
    # weigh({‘name‘:name,‘res‘:res})  # 这样写,所有功能 不能体现出解耦合

def weigh(shit):
    shit = shit.result() # 拿到是一个对象,需要进行result()
    name = shit[‘name‘]
    size = len(shit[‘res‘])
    print(‘%s 拉了 <%s>kg‘ %(name,size))

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(10)

    shit1 = pool.submit(la,‘alex‘).add_done_callback(weigh)

    shit2 = pool.submit(la,‘yang‘).add_done_callback(weigh)

    shit3 = pool.submit(la,‘hang‘).add_done_callback(weigh)
‘‘‘
打印结果:
alex is laing
yang is laing
hang is laing
hang 拉了 <10>kg
alex 拉了 <7>kg
yang 拉了 <12>kg
‘‘‘

异步调用

异步调用的应用

from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print(‘GET %s‘%url)
    response = requests.get(url)
    time.sleep(3)
    return {‘url‘:url,‘content‘:response.text}

def parse(res):
    res = res.result()
    print(‘%s parse res is %s‘ %(res[‘url‘],len(res[‘content‘])))

if __name__ == ‘__main__‘:
    urls = [
        ‘http://www.cnblogs.com/linhaifeng‘,
        ‘https://www.python.org‘,
        ‘https://www.openstack.org‘,
    ]

    pool = ThreadPoolExecutor(2)
    for url in urls:
        pool.submit(get,url).add_done_callback(parse)
‘‘‘
打印结果:
GET http://www.cnblogs.com/linhaifeng
GET https://www.python.org
http://www.cnblogs.com/linhaifeng parse res is 16320
GET https://www.openstack.org
https://www.python.org parse res is 49273
https://www.openstack.org parse res is 64040
‘‘‘

应用

原文地址:https://www.cnblogs.com/Mryang123/p/8921962.html

时间: 2024-11-05 16:35:08

并发编程---线程queue---进程池线程池---异部调用(回调机制)的相关文章

《Java并发编程实战》第八章 线程池的使用 读书笔记

一.在任务与执行策略之间的隐性解耦 有些类型的任务需要明确地指定执行策略,包括: . 依赖性任务.依赖关系对执行策略造成约束,需要注意活跃性问题.要求线程池足够大,确保任务都能放入. . 使用线程封闭机制的任务.需要串行执行. . 对响应时间敏感的任务. . 使用ThreadLocal的任务. 1. 线程饥饿死锁 线程池中如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,这种现象称为线程饥饿死锁. 2. 运行时间较长的任务 Java提供了限时版本与无限时版本.例如Thread

C++windows内核编程笔记day13 进程、线程与信号量

Windows进程 进程是一个容器,包含程序执行需要的代码.数据.资源等信息, windows进程的特点: 每个进程都有自己的ID号 每个进程都有自己的地址空间,进程之间无法访问对方的地址空间. 每个进程都有自己的安全属性 每个进程至少包含一个线程. 获取和释放环境信息 GetEnvironmentStrings FreeEnvironmentStrings 获取或设置 本程序的环境变量 GetEnvironmentVariable SetEnvironmentVariable 示例: char

读书笔记-----Java并发编程实战(一)线程安全性

线程安全类:在线程安全类中封装了必要的同步机制,客户端无须进一步采取同步措施 示例:一个无状态的Servlet 1 @ThreadSafe 2 public class StatelessFactorizer implements Servlet{ 3 public void service(ServletRequest req,ServletResponse resp){ 4 BigInteger i = extractFromRequest(req); 5 BigInteger[] fact

《Java并发编程实战》第二章 线程安全性 读书笔记

一.什么是线程安全性 编写线程安全的代码 核心在于要对状态访问操作进行管理. 共享,可变的状态的访问 - 前者表示多个线程访问, 后者声明周期内发生改变. 线程安全性 核心概念是正确性.某个类的行为与其规范完全一致. 多个线程同时操作共享的变量,造成线程安全性问题. * 编写线程安全性代码的三种方法: 不在线程之间共享该状态变量 将状态变量修改为不可变的变量 在访问状态变量时使用同步 Java同步机制工具: synchronized volatile类型变量 显示锁(Explicit Lock

C#网络编程基础之进程和线程详解

在C#的网络编程中,进程和线程是必备的基础知识,同时也是一个重点,所以我们要好好的掌握一下. 一:概念 首先我们要知道什么是"进程",什么是"线程",好,查一下baike. 进程:是一个具有一定独立功能的程序关于某个数据集合的一次活动.它是操作系统动态执行的基本单元, 在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元. 线程:是"进程"中某个单一顺序的控制流. 关于这两个概念,大家稍微有个印象就行了,防止以后被面试官问到. 二:进程

python 使用多进程实现并发编程/使用queue进行进程间数据交换

import time import os import multiprocessing from multiprocessing import Queue, pool """ 一.Python 使用多进程实现并发编程: 因为cpython解释器中有GIL存在的原因(每个进程都会维护一个GIL,jpython解释器没有这个问题),所以在一个进程内, 即使服务器是多核cpu,同一时刻只能有一个线程在执行任务(一个进程内).如果存在较多IO,使用多线程是可以提高处理速度的, 但是

适用于即时系统并发编程的新的java线程模型,记我的第一篇英文翻译

1:介绍:      传统意义上的即时系统是有经验的专家的领域,因为他们能处理多种定制的内核,非标准的并且大多数是低级的语言,供应商提供的定制的I/O接口.这就要求有一种新的java线程模型来解决这种状况,这种模型解决当前及时嵌入系统的四个缺陷:安全,性能,可移植性.程序调试时间.安全是当前编程语言和及时系统在复杂性与定义不清的接口上折中的办法,这些语法不能成为正式的保证系统安全的语法:性能受到威胁是因为工程师必须接受及时系统所提供的无论什么级别的操作,如果那些操作过高或过低都会导致非必要的日常

并发,并行,进程,线程,同步,异步

一个应用程序至少有一个进程,一个进程至少有一个线程. 并发,在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行. 并发当有多个线程在操作时,如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程,它只能把CPU运行时间划分成若干个时间段,再将时间 段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状..这种方式我们称之为并发(Concurrent). 并行:当系统有一

并发编程(八):线程安全策略

通常我们保证线程安全策略的方式有以下几种: a.不可变对象 b.线程封闭 c.同步容器 d.并发容器 不可变对象 可参考string类,可以采用的方式是将类声明为final,将所有成员都声明为私有的,对变量不提供set方法,将所有可变成员声明为final,通过构造器初始化所有成员,进行深度拷贝,在get方法中不直接返回对象本身,而是返回对象的拷贝. 关于final,我们详细说明一下 final-demo @Slf4j public class ImmutableExample1 { privat