python线程池(转)

ThreadPool:

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threadpool
import time

def sayhello (a):
    print("hello: "+a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start=time.time()
    task_pool=threadpool.ThreadPool(5)
    requests=threadpool.makeRequests(sayhello,seed)
    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end=time.time()
    time_m = end-start
    print("time: "+str(time_m))
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))

if __name__ == ‘__main__‘:
    main()

  

Futures:

#! /usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent.futures import ThreadPoolExecutor
import time

def sayhello(a):
    print("hello: "+a)
    time.sleep(1)

def main():
    seed=["a","b","c","d","e","f"]
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))
    start2=time.time()
    with ThreadPoolExecutor(6) as executor:
        for each in seed:
            executor.submit(sayhello,each)
    end2=time.time()
    print("time2: "+str(end2-start2))
    start3=time.time()
    with ThreadPoolExecutor(6) as executor1:
        executor1.map(sayhello,seed)
    end3=time.time()
    print("time3: "+str(end3-start3))

if __name__ == ‘__main__‘:
    main()

重构 (在线程池运行时向里面添加新任务)

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import Queue
import hashlib
import logging
from utils.progress import PrintProgress
from utils.save import SaveToSqlite

class ThreadPool(object):
    def __init__(self, thread_num, args):

        self.args = args
        self.work_queue = Queue.Queue()
        self.save_queue = Queue.Queue()
        self.threads = []
        self.running = 0
        self.failure = 0
        self.success = 0
        self.tasks = {}
        self.thread_name = threading.current_thread().getName()
        self.__init_thread_pool(thread_num)

    # 线程池初始化
    def __init_thread_pool(self, thread_num):
        # 下载线程
        for i in range(thread_num):
            self.threads.append(WorkThread(self))
        # 打印进度信息线程
        self.threads.append(PrintProgress(self))
        # 保存线程
        self.threads.append(SaveToSqlite(self, self.args.dbfile))

    # 添加下载任务
    def add_task(self, func, url, deep):
        # 记录任务,判断是否已经下载过
        url_hash = hashlib.new(‘md5‘, url.encode("utf8")).hexdigest()
        if not url_hash in self.tasks:
            self.tasks[url_hash] = url
            self.work_queue.put((func, url, deep))
            logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))

    # 获取下载任务
    def get_task(self):
        # 从队列里取元素,如果block=True,则一直阻塞到有可用元素为止。
        task = self.work_queue.get(block=False)

        return task

    def task_done(self):
        # 表示队列中的某个元素已经执行完毕。
        self.work_queue.task_done()

    # 开始任务
    def start_task(self):
        for item in self.threads:
            item.start()

        logging.debug("Work start")

    def increase_success(self):
        self.success += 1

    def increase_failure(self):
        self.failure += 1

    def increase_running(self):
        self.running += 1

    def decrease_running(self):
        self.running -= 1

    def get_running(self):
        return self.running

    # 打印执行信息
    def get_progress_info(self):
        progress_info = {}
        progress_info[‘work_queue_number‘] = self.work_queue.qsize()
        progress_info[‘tasks_number‘] = len(self.tasks)
        progress_info[‘save_queue_number‘] = self.save_queue.qsize()
        progress_info[‘success‘] = self.success
        progress_info[‘failure‘] = self.failure

        return progress_info

    def add_save_task(self, url, html):
        self.save_queue.put((url, html))

    def get_save_task(self):
        save_task = self.save_queue.get(block=False)

        return save_task

    def wait_all_complete(self):
        for item in self.threads:
            if item.isAlive():
                # join函数的意义,只有当前执行join函数的线程结束,程序才能接着执行下去
                item.join()

# WorkThread 继承自threading.Thread
class WorkThread(threading.Thread):
    # 这里的thread_pool就是上面的ThreadPool类
    def __init__(self, thread_pool):
        threading.Thread.__init__(self)
        self.thread_pool = thread_pool

    #定义线程功能方法,即,当thread_1,...,thread_n,调用start()之后,执行的操作。
    def run(self):
        print (threading.current_thread().getName())
        while True:
            try:
                # get_task()获取从工作队列里获取当前正在下载的线程,格式为func,url,deep
                do, url, deep = self.thread_pool.get_task()
                self.thread_pool.increase_running()

                # 判断deep,是否获取新的链接
                flag_get_new_link = True
                if deep >= self.thread_pool.args.deep:
                    flag_get_new_link = False

                # 此处do为工作队列传过来的func,返回值为一个页面内容和这个页面上所有的新链接
                html, new_link = do(url, self.thread_pool.args, flag_get_new_link)

                if html == ‘‘:
                    self.thread_pool.increase_failure()
                else:
                    self.thread_pool.increase_success()
                    # html添加到待保存队列
                    self.thread_pool.add_save_task(url, html)

                # 添加新任务,即,将新页面上的不重复的链接加入工作队列。
                if new_link:
                    for url in new_link:
                        self.thread_pool.add_task(do, url, deep + 1)

                self.thread_pool.decrease_running()
                # self.thread_pool.task_done()
            except Queue.Empty:
                if self.thread_pool.get_running() <= 0:
                    break
            except Exception, e:
                self.thread_pool.decrease_running()
                # print str(e)
                break

  

  

原文地址:https://www.cnblogs.com/luoye00/p/11888053.html

时间: 2024-10-02 22:47:09

python线程池(转)的相关文章

我对python线程池的理解

#!/usr/bin/env pythonfrom Queue import Queuefrom threading import Threadimport randomimport time def person(i,q):    while True:  #这个人一直处与可以接活干的状态        q.get()        print "Thread",i,"do_job"        time.sleep(random.randint(1,5))#每

python线程池

线程池: 版本一: #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = Queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_thread

Python 线程池

python默认没有提供线程池的功能,所以要想使用线程池,就必要使用第三方的模块或者自定义线程 线程并不是越多越好,线程的上下文切换会影响到服务器的性能 线程池:一个容器,有最大数,取一个少一个,无线程时等待,线程执行完毕,交还线程 __author__ = 'alex' #coding:utf-8 import queue import threading import time class ThreadPool: def __init__(self,maxsize=5): self.maxs

一个简单的python线程池框架

初学python,实现了一个简单的线程池框架,线程池中除Wokers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信. 代码如下: 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 __author__ = "pandaychen" 5 6 import Queue 7 import sys 8 import os 9 import threading 10 import time 11

小白成长之路:初识python(六) --python线程池

#!/usr/bin/env python# -*- coding:utf-8 -*-import threadingimport queueimport time"""对照着武老师的课程自己跟着做了一个线程池,主要的思路就是把要执行的任务放进队列中然后创建若干个线程不断地从队列中获取任务并执行相对比low B 版的线程池有很大改进,姑且叫low A版吧...""" Stop_Flag = object() class ThreadPool(ob

python 线程池使用

传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态. 一个线程的运行时间可以分为3部分:线程的启动时间.线程体的运行时间和线程的销毁时间.在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动.销毁和运行3个过程.这必然会增加系统相应的时间,降低了效率. 使用线程池:由于线程预先被创建并放入线程池中,同时处理完

Python线程池任务

#!/usr/bin/env python # -*- coding:utf-8 -*- from concurrent.futures import ThreadPoolExecutor #线程池,导入模块 import time def task(hostname): ''' 假设这里是要提交的任务 :param hostname: :return: ''' time.sleep(2) print(hostname) #创建线程池线程数自定义,要看主机性能,例如创建20个线程 pool =

[python] ThreadPoolExecutor线程池 python 线程池

初识 Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行. 这就是线程池的思想(当然没这么简单),但是自己编

一个简单缩略版的python 线程池实现

1 #-*-coding:utf-8-*-2 2 3 import threading 4 import queue 5 import itertools 6 import os 7 import time 8 9 10 RUN = 0 11 CLOSE = 1 12 TERMINATE = 2 13 job_counter = itertools.count() 14 15 16 class Pool(object): 17 18 def __init__(self, max_thread_n