python多线程之threading、ThreadPoolExecutor.map

背景:

某个应用场景需要从数据库中取出几十万的数据时,需要对每个数据进行相应的操作。逐个数据处理过慢,于是考虑对数据进行分段线程处理:

  • 方法一:使用threading模块

代码:

 1 # -*- coding: utf-8 -*-
 2 import math
 3 import random
 4 import time
 5 from threading import Thread
 6
 7 _result_list = []
 8
 9
10 def split_df():
11     # 线程列表
12     thread_list = []
13     # 需要处理的数据
14     _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
15     # 每个线程处理的数据大小
16     split_count = 2
17     # 需要的线程个数
18     times = math.ceil(len(_l) / split_count)
19     count = 0
20     for item in range(times):
21         _list = _l[count: count + split_count]
22         # 线程相关处理
23         thread = Thread(target=work, args=(item, _list,))
24         thread_list.append(thread)
25         # 在子线程中运行任务
26         thread.start()
27         count += split_count
28
29     # 线程同步,等待子线程结束任务,主线程再结束
30     for _item in thread_list:
31         _item.join()
32
33
34 def work(df, _list):
35     """
36     每个线程执行的任务,让程序随机sleep几秒
37     :param df:
38     :param _list:
39     :return:
40     """
41     sleep_time = random.randint(1, 5)
42     print(f‘count is {df},sleep {sleep_time},list is {_list}‘)
43     time.sleep(sleep_time)
44     _result_list.append(df)
45
46
47 if __name__ == ‘__main__‘:
48     split_df()
49     print(len(_result_list), _result_list)

测试结果:

  • 方法二:使用ThreadPoolExecutor.map

代码:

 1 # -*- coding: utf-8 -*-
 2 import math
 3 import random
 4 import time
 5 from concurrent.futures import ThreadPoolExecutor
 6
 7
 8 def split_list():
 9     # 线程列表
10     new_list = []
11     count_list = []
12     # 需要处理的数据
13     _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
14     # 每个线程处理的数据大小
15     split_count = 2
16     # 需要的线程个数
17     times = math.ceil(len(_l) / split_count)
18     count = 0
19     for item in range(times):
20         _list = _l[count: count + split_count]
21         new_list.append(_list)
22         count_list.append(count)
23         count += split_count
24     return new_list, count_list
25
26
27 def work(df, _list):
28     """ 线程执行的任务,让程序随机sleep几秒
29     :param df:
30     :param _list:
31     :return:
32     """
33     sleep_time = random.randint(1, 5)
34     print(f‘count is {df},sleep {sleep_time},list is {_list}‘)
35     time.sleep(sleep_time)
36     return sleep_time, df, _list
37
38
39 def use():
40     new_list, count_list = split_list()
41     with ThreadPoolExecutor(max_workers=len(count_list)) as t:
42         results = t.map(work, new_list, count_list)
43
44     # 或执行如下两行代码
45     # pool = ThreadPoolExecutor(max_workers=5)
46     # 使用map的优点是 每次调用回调函数的结果不用手动的放入结果list中
47     # results = pool.map(work, new_list, count_list)
48
49     # map返回一个迭代器,其中的回调函数的参数 最好是可以迭代的数据类型,如list;如果有 多个参数 则 多个参数的 数据长度相同;
50     # 如: pool.map(work,[[1,2],[3,4]],[0,1]]) 中 [1,2]对应0 ;[3,4]对应1 ;其实内部执行的函数为 work([1,2],0) ; work([3,4],1)
51     # map返回的结果 是 有序结果;是根据迭代函数执行顺序返回的结果
52     print(type(results))
53     # 如下2行 会等待线程任务执行结束后 再执行其他代码
54     for ret in results:
55         print(ret)
56     print(‘thread execute end!‘)
57
58
59 if __name__ == ‘__main__‘:
60     use()

测试结果:

参考链接:https://www.cnblogs.com/rgcLOVEyaya/p/RGC_LOVE_YAYA_1103_3days.html

原文地址:https://www.cnblogs.com/sunshine-blog/p/12027606.html

时间: 2024-10-29 19:11:44

python多线程之threading、ThreadPoolExecutor.map的相关文章

Python多线程之threading Event

Python threading模块提供了Event对象用于线程间通信,它提供了设置.清除.等待等方法用于实现线程间的通信.event是最简单的进程间通信方式之一,一个线程产生一个信号,另一个线程则等待该信号.Python 通过threading.Event()产生一个event对象,event对象维护一个内部标志(标志初始值为False),通过set()将其置为True,wait(timeout)则用于阻塞线程直至Flag被set(或者超时,可选的),isSet()用于查询标志位是否为True

Python多线程之threading模块

使用threading.Thread类,有三种创建线程的方法: 创建一个Thread类,传给它一个函数: 创建一个Thread类,传给它一个可调用的类对象: 从Thread派生出一个类,创建一个这个子类的实例. # 方法1和方法2的创建方法类似 import threading def func(k):     print('thread %s replies %s'%(threading.currentThread().getName(), k**2)) if __name__ == '__m

Python 多线程之threading condition

使用Condition对象可以在某些事件触发或者达到特定的条件后才处理数据,Condition除了具有Lock对象的acquire方法和release方法外,还有wait方法.notify方法.notifyAll方法等用于条件处理. threading.Condition([lock]):创建一个condition,支持从外界引用一个Lock对象(适用于多个condtion共用一个Lock的情况),默认是创建一个新的Lock对象. acquire()/release():获得/释放 Lock w

python 线程之 threading(三)

python 线程之 threading(一)http://www.cnblogs.com/someoneHan/p/6204640.html python 线程之 threading(二)http://www.cnblogs.com/someoneHan/p/6209240.html 使用threading.Thread.is_alive()这个方法可以判断线程是否是存活状态.但是在现有的基础上不能够直到线程什么时候开始,什么时候结束,什么时候被打断. 如果有一个或者多个线程需要在另外的一个线

python 线程之 threading(四)

python 线程之 threading(三) http://www.cnblogs.com/someoneHan/p/6213100.html中对Event做了简单的介绍. 但是如果线程打算一遍一遍的重复通知某个事件.应该使用Condition 1. 使用Condition首先应该获取Condition即使Condition进入锁的状态 2. 在线程执行过程中需要等待其他线程通知,然后才开始向下运行的地方使用Condition.wait()方法,线程进入阻塞状态. 3. 使用Condition

python多线程之Condition(条件变量)

#!/usr/bin/env python # -*- coding: utf-8 -*- from threading import Thread, Condition import time items = [] condition = Condition() class Consumer(Thread): def __init__(self): Thread.__init__(self) def consume(self): global condition global items co

Python线程之threading

线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除.进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程,线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器.一组寄存器和栈). Threading模块提供线程相关的操作,Threading模块包含Thread,Lock,RLock,Event,Queue等组件

python 线程之threading(五)

在学习了Event和Condition两个线程同步工具之后还有一个我认为比较鸡肋的工具 semaphores 1. 使用semaphores的使用效果和Condition的notify方法的效果基本相同.每次只能通知一个阻塞线程继续运行 2. 信号量同步基于内部计数器,每调用一次acquire(),计数器减1:每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞 1 import threading 2 import time 3 4 def countdown

python多线程之t.setDaemon(True) 和 t.join()

0.目录 1.参考2.结论    (1)通过 t.setDaemon(True) 将子线程设置为守护进程(默认False),主线程结束后,守护子线程随之中止.    (2) t.join() 用于阻塞主线程,可以想象成将某个子线程的执行过程插入(join)到主线程的时间线上,主线程的后续代码延后执行.注意和 t.start() 分开写在两个for循环中.    (3)第一个for循环同时启动了所有子线程,随后在第二个for循环中执行t.join() ,主线程实际被阻塞的总时长==其中执行时间最长