一个简单的python线程池框架

  初学python,实现了一个简单的线程池框架,线程池中除Wokers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信.

代码如下:

  

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3
  4 __author__ = "pandaychen"
  5
  6 import Queue
  7 import sys
  8 import os
  9 import threading
 10 import time
 11 import signal
 12
 13 def handler():
 14     print "press CTRL+C to end...."
 15     sys.exit(1)
 16
 17
 18 def call_function(para):
 19     time.sleep(5)
 20     return para
 21
 22
 23 def LoggingFun(t_filename,t_logcontent):
 24     logpath = ‘./log/‘
 25     curdate = time.strftime("%Y%m%d")
 26     newpath = ‘./log/‘+t_filename+‘_‘+curdate
 27
 28     if os.path.exists(logpath):
 29         pass
 30     else:
 31         os.mkdir(logpath)
 32
 33     try:
 34         filehd = open(newpath,‘a+‘)
 35         newcontent = ‘[‘+str(time.strftime("%Y-%m-%d %H:%M:%S"))+‘]‘+t_logcontent+‘\n‘
 36         filehd.writelines(newcontent)
 37         filehd.close()
 38     except Exception,e:
 39         pass
 40
 41 class LogThread(threading.Thread):
 42     def __init__(self,logQueue,**kwds):
 43         threading.Thread.__init__(self,**kwds)
 44         self.logQueue = logQueue
 45         self.setDaemon(True)
 46
 47     def run(self):
 48         while 1:
 49             #log = self.logQueue.get(False)
 50             log = self.logQueue.get()
 51             if log:
 52                 LoggingFun("test",log)
 53                 pass
 54             else:
 55                 LoggingFun("test","log thread sleep 1s")
 56                 time.sleep(1)
 57
 58 #封装为一个线程类
 59 class Worker(threading.Thread):    # 处理工作请求
 60     def __init__(self, workQueue, resultQueue,logQueue, threadid,**kwds):
 61         threading.Thread.__init__(self, **kwds)
 62         self.setDaemon(True)
 63         self.workQueue = workQueue
 64         self.resultQueue = resultQueue
 65         self.logQueue = logQueue
 66         self.threadid = threadid
 67
 68     def run(self):
 69         while 1:
 70             try:
 71                 callable, args, kwds = self.workQueue.get(False)    # get a task
 72                 res = callable(*args, **kwds)
 73                 strres = "thread:"+ str(self.threadid) + " done,"+"args:"+str(res)
 74
 75                 self.logQueue.put(strres)
 76                 self.resultQueue.put(res)    # put result
 77             except Queue.Empty:
 78                 break
 79
 80 class WorkManagerPool:    # 线程池管理,创建
 81     def __init__(self, num_of_workers=10):
 82         self.workQueue = Queue.Queue()    # 请求队列
 83         self.resultQueue = Queue.Queue()    # 输出结果的队列
 84         self.logQueue = Queue.Queue()
 85         self.workers = []
 86         self._recruitThreads(num_of_workers)
 87
 88     def _recruitThreads(self, num_of_workers):
 89         for i in range(num_of_workers):
 90             worker = Worker(self.workQueue, self.resultQueue,self.logQueue,i)    # 创建工作线程
 91             worker.setDaemon(True)
 92             self.workers.append(worker)    # 加入到线程队列
 93
 94         logthread = LogThread(self.logQueue)
 95         self.workers.append(logthread)
 96
 97
 98     def start(self):
 99         for w in self.workers:
100             w.start()
101
102     def wait_for_complete(self):
103         while len(self.workers):
104             worker = self.workers.pop()    # 从池中取出一个线程处理请求
105             worker.join()
106             if worker.isAlive() and not self.workQueue.empty():
107                 self.workers.append(worker)    # 重新加入线程池中
108                 print ‘All jobs were complete.‘
109
110
111     def add_job(self, callable, *args, **kwds):
112         self.workQueue.put((callable, args, kwds))    # 向工作队列中加入请求
113
114     def get_result(self, *args, **kwds):
115         return self.resultQueue.get(*args, **kwds)
116
117
118
119 def main():
120     signal.signal(signal.SIGINT, handler)
121     signal.signal(signal.SIGTERM, handler)
122
123     try:
124         num_of_threads = int(sys.argv[1])
125     except:
126         num_of_threads = 10
127         start = time.time()
128         workermanagepool = WorkManagerPool(num_of_threads)
129         #print num_of_threads
130         urls = [‘http://bbs.qcloud.com‘] * 1000
131         for i in urls:
132             workermanagepool.add_job(call_function, i)
133
134         workermanagepool.start()
135         workermanagepool.wait_for_complete()
136         print time.time() - start
137
138 if __name__ == ‘__main__‘:
139     main()
时间: 2024-08-24 11:56:00

一个简单的python线程池框架的相关文章

LINUX c++线程池框架

版权声明:原文地址及作者不详,如有侵权,请联系: 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单. 为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,

【JUC】JUC线程池框架综述

一.前言 在分析完了JUC的锁和集合框架后,下面进入JUC线程池框架的分析,下面给出JUC线程池的总体框架,之后再逐一进行分析. 二.JUC线程池框架图 说明:从上图可知,JUC线程池框架中的其他接口或类都直接或间接的继承了Executor接口,虽然Executors与其他类或者接口没有明显的关系,但是Executors是线程池的工具类,利用它可以生成各种线程池. 三.具体说明 3.1 Executors Executors是一个工具类,用其可以创建ExecutorService.Schedul

Executor线程池框架

Executor线程池框架 new Thread()的缺点 每次new Thread()耗费性能 调用new Thread()创建的线程缺乏管理,被称为野线程,而且可以无限制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪. 不利于扩展,比如如定时执行.定期执行.线程中断 采用线程池的优点 重用存在的线程,减少对象创建.消亡的开销,性能佳 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞 提供定时执行.定期执行.单线程.并发数控制等功能 Executor的介绍

基础线程机制--Executor线程池框架

基础线程机制 Executor线程池框架 1.引入Executor的原因 (1)new Thread()的缺点 ???每次new Thread()耗费性能 ???调用new Thread()创建的线程缺乏管理,被称为野线程,而且可以无限制的创建,之间相互竞争,导致过多的系统资源被占用导致系统瘫痪,不利于定时执行,定期执行,线程中断. (2)采用线程池的优点 ???可以重用创建的线程,减少对象的创建,消亡的开销,性能更佳. ???可以有效的控制最大并发线程数,提高系统资源的利用率,避免过多的资源竞

多线程--Executor线程池框架

Executor的介绍 在Java 5之后,并发编程引入了一堆新的启动.调度和管理线程的API.其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动.执行和关闭,可以简化并发编程的操作.因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始

线程池? 如何设计一个动态大小的线程池,有哪些方法?

[线程池?  如何设计一个动态大小的线程池,有哪些方法?] 线程池:顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中, 需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中, 从而减少创建和销毁线程对象的开销. 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.此时,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池相似,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable

作业1开发一个简单的python计算器

开发一个简单的python计算器 实现加减乘除及拓号优先级解析 用户输入 1 - 2 * ( (60-30 +(-40/5) * (9-2*5/3 + 7 /3*99/4*2998 +10 * 568/14 )) - (-4*3)/ (16-3*2) )等类似公式后,必须自己解析里面的(),+,-,*,/符号和公式(不能调用eval等类似功能偷懒实现),运算后得出结果,结果必须与真实的计算器所得出的结果一致 hint: re.search(r'\([^()]+\)',s).group() '(-

一个简单好用的日志框架NLog

之前我介绍过如何使用log4net来记录日志,但最近喜欢上了另一个简单好用的日志框架NLog. 关于NLog和log4net的比较这里就不多讨论了,感兴趣的朋友可以参看.NET日志工具介绍和log4net vs. Nlog这两篇文章.本文主要介绍一下如何在项目中使用NLog. 在Nuget中安装NLog NLog可以直接使用Nuget安装: PM > Install-Package Nlog 使用NLog NLog的使用方式基本上和其它的Log库差不多,分为Trace.Debug.Info.Er

我对python线程池的理解

#!/usr/bin/env pythonfrom Queue import Queuefrom threading import Threadimport randomimport time def person(i,q):    while True:  #这个人一直处与可以接活干的状态        q.get()        print "Thread",i,"do_job"        time.sleep(random.randint(1,5))#每