自定义线程池的几种方案

线程池:

方案简介:

方案一:简单版本的线程池,每次都要创建线程池;

方案二:支持传函数、传参、传回调函数、立即终止所有线程、最大优点:线程的循环利用,节省时间和资源  ★★★★★

方案三:现有模块,直接调用即可,不支持回调函数

方案一:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
 
 
class ThreadPool(object):
 
    def __init__(self, max_num=20):
        self.queue = Queue.Queue(max_num)
        for i in xrange(max_num):
            self.queue.put(threading.Thread)
 
    def get_thread(self):
        return self.queue.get()
 
    def add_thread(self):
        self.queue.put(threading.Thread)
 
"""
pool = ThreadPool(10)
 
def func(arg, p):
    print arg
    import time
    time.sleep(2)
    p.add_thread()
 
 
for i in xrange(30):
    thread = pool.get_thread()
    t = thread(target=func, args=(i, pool))
    t.start()
"""

 方案二:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                status = True
            except Exception as e:
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            if self.terminal: # False
                event = StopEvent
            else:
                with self.worker_state(self.free_list,current_thread):
                    event = self.q.get()

        else:
            self.generate_list.remove(current_thread)

    @contextlib.contextmanager
    def worker_state(self,x,v):
        x.append(v)
        try:
            yield
        finally:
            x.remove(v)

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put(StopEvent)
            num -= 1

    # 终止线程(清空队列)
    def terminate(self):

        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()

import time

def work(i):
    time.sleep(1)
    print(i)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,))

# pool.terminate() #立即终止所有线程

方案三、

from concurrent.futures import ThreadPoolExecutor
import time

def f1(a):
    time.sleep(2)
    print(a)
    return 1

pool=ThreadPoolExecutor(5)
for i in range(30):
    a=pool.submit(f1,i)
    # x=a.result()#获取返回值,如果有,会阻塞
时间: 2024-10-03 13:40:00

自定义线程池的几种方案的相关文章

java多线程(四)-自定义线程池

当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. 1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new Synchro

Java自定义线程池详解

自定义线程池的核心:ThreadPoolExecutor 为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制,其中在java.util.concurrent包下,是JDK并发包的核心,比如我们熟知的Executors.Executors扮演着线程工厂的角色,我们通过它可以创建特定功能的线程池,而这些线程池背后的就是:ThreadPoolExecutor.那么下面我们来具体分析下它. 构造ThreadPoolExecutor public ThreadP

使用Lock(ReentrantLock)结合Condition实现自定义线程池

声明: 1.该篇只是提供一种自定义线程池的实现方式,可能性能.安全等方面需要优化: 2.该篇自定义线程池使用Lock(可重入锁ReentrantLock)结合Condition来实现: 3.该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢. 概述 自定义线程池三要素包括: 1.存储线程的容器(或叫线程池).该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现. 2.执行线程(或叫执行器).具体执行的线程. 3.执行任务.执行线程需要执行的具体任务. 代码 /** * 任务

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

使用wait()和notifyAll()方法自定义线程池

声明: 1.该篇只是提供一种自定义线程池的实现方式,可能性能.安全等方面需要优化: 2.该篇自定义线程池使用的是wait()和notifyAll()方法,也可以使用Lock结合Condition来实现: 3.该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢. 概述 自定义线程池三要素包括: 1.存储线程的容器(或叫线程池).该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现. 2.执行线程(或叫执行器).具体执行的线程. 3.执行任务.执行线程需要执行的具体任务. 代码

池化技术——自定义线程池

目录 池化技术--自定义线程池 1.为什么要使用线程池? 1.1.池化技术的特点: 1.2.线程池的好处: 1.3.如何自定义一个线程池 2.三大方法 2.1.单个线程的线程池方法 2.2.固定的线程池的大小的方法 2.3.可伸缩的线程池的方法 2.4.完整的测试代码为: 3.为什么要自定义线程池?三大方法创建线程池的弊端分析 4.七大参数 5.如何手动的去创建一个线程池 6.四种拒绝策略 6.1.会抛出异常的拒绝策略 6.2.哪来的去哪里拒绝策略 6.3.丢掉任务拒绝策略 6.4.尝试竞争拒绝

自定义线程池,如何最佳创建线程池

java有预置线程池:newSingleThreadExecutor,newFixedThreadPool,newCacheedThreadPool,newScheduledThreadPool,newWorkStealingPool.如果不适合,还可以使用ThreadPoolExecutor创建自定义线程池.主要构造方法: 1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveT

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

集合框架,ArrayList和Vector的区别,让arrayList线程安全的几种方案

boolean add(E e) 将指定的元素添加到此列表的尾部. void add(int index, E element) 将指定的元素插入此列表中的指定位置. boolean addAll(Collection<? extends E> c) 按照指定 collection 的迭代器所返回的元素顺序,将该 collection 中的所有元素添加到此列表的 尾部. boolean addAll(int index, Collection<? extends E> c) 从指定