使用wait()和notifyAll()方法自定义线程池

声明:

  1、该篇只是提供一种自定义线程池的实现方式,可能性能、安全等方面需要优化;

  2、该篇自定义线程池使用的是wait()和notifyAll()方法,也可以使用Lock结合Condition来实现;

  3、该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢。

概述

  自定义线程池三要素包括:

  1、存储线程的容器(或叫线程池)。该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现。

  2、执行线程(或叫执行器)。具体执行的线程。

  3、执行任务。执行线程需要执行的具体任务。

代码

  1、任务接口

/**
 * 任务接口
 *
 */
public interface Task {

    /**
     * 执行具体任务
     */
    void executeSpecificTask();

}

  2、执行线程接口

/**
 * 执行线程接口
 *
 */
public interface Executor {

    /**
     * 设置执行任务
     *
     * @param task 执行任务
     */
    void setTask(Task task);

    /**
     * 获取执行任务
     *
     * @return Task
     */
    Task getTask();

    /**
     * 启动该任务
     */
    void startTask();

}

  3、线程池接口  

/**
 * 线程池接口
 *
 */
public interface Pool {

    /**
     * 获取执行线程
     *
     * @return Executor
     */
    Executor getExecutor();

    /**
     * 销毁线程池容器中的所有执行线程
     */
    void destroy();

    /**
     * 获取线程池容器中执行线程数量
     *
     * @return int
     */
    int getPoolSize();

}

  4、自定义线程池具体实现

/**
 * 自定义线程池具体实现
 *
 */
public class ThreadPool implements Pool{

    //是否关闭线程池,true-关闭线程池,false-不关闭线程池
    private boolean isShut;

    //默认执行线程个数
    private final static int DEFAULT_THREAD_SIZE = 6;

    //执行线程个数,默认为DEFAULT_THREAD_SIZE大小
    private static int THREAD_SIZE = DEFAULT_THREAD_SIZE;

    //存放执行线程的容器
    private LinkedList<Executor> list = new LinkedList<Executor>();

    public ThreadPool(){
        this(DEFAULT_THREAD_SIZE);
    }

    public ThreadPool(int threadSize){
        isShut = false;
        THREAD_SIZE = threadSize > 0 ? threadSize : DEFAULT_THREAD_SIZE;
        for(int i = 0;i < THREAD_SIZE;i++){
            Executor executor = new ExecutorImpl();
            ((ExecutorImpl)executor).setName("执行器" + i);

            //将执行器加入容器中
            list.add(executor);

            ((ExecutorImpl)executor).start();//启动执行器
        }
    }

    @Override
    public Executor getExecutor() {
        Executor executor = null;
        synchronized (list) {
            if(list.size() > 0){
                executor = list.removeFirst();
            }else{
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executor = list.removeFirst();
            }
        }
        return executor;
    }

    @Override
    public void destroy() {
        synchronized (list) {
            isShut = true;
            list.notifyAll();
            list.clear();
        }
    }

    public  int getPoolSize(){
        synchronized (list) {
            return list.size();
        }
    }

    private class ExecutorImpl extends Thread implements Executor{

        //执行任务
        private Task task;

        //锁对象
        private final Object lock = new Object();

        public void run(){
            System.out.println(Thread.currentThread().getName() + " 启动>>>>>>>>>>");
            while(!isShut){
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " 等待<<<<<<<<<<");
                    try {
                        //初始化线程池时,等待通知并释放锁
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " 执行任务......");
                //执行具体任务
                getTask().executeSpecificTask();
                System.out.println(Thread.currentThread().getName() + " 执行任务完毕......");
                synchronized (list) {
                    System.out.println(Thread.currentThread().getName() + " 返回容器......");
                    //当前执行线程执行完毕后,返回到容器中
                    list.addFirst(ExecutorImpl.this);
                    //给别的执行线程发送通知
                    list.notifyAll();
                }
            }
        }

        public void startTask(){
            synchronized (lock) {
                lock.notify();
            }
        }

        @Override
        public void setTask(Task task) {
            this.task = task;
        }

        @Override
        public Task getTask() {
            return this.task;
        }

    }

}

  5、执行任务实现 

/**
 * 打印执行任务实现
 *
 */
public class PrintTask implements Task {

    @Override
    public void executeSpecificTask() {
        System.out.println("打印任务");
    }

}

  6、测试

public class Test {

    public static void main(String[] args) {

        Pool pool = new ThreadPool(2);

        for(int i = 0;i < 2;i++){
            Executor executor = pool.getExecutor();
            Task task = new PrintTask();
            executor.setTask(task);
            executor.startTask();
        }

        pool.destroy();

    }

}

  7、测试结果

执行器0 启动>>>>>>>>>>
执行器0 等待<<<<<<<<<<
执行器1 启动>>>>>>>>>>
执行器1 等待<<<<<<<<<<
执行器0 执行任务......
打印任务
执行器0 执行任务完毕......
执行器0 返回容器......
执行器1 执行任务......
打印任务
执行器1 执行任务完毕......
执行器1 返回容器......
时间: 2024-08-04 15:16:18

使用wait()和notifyAll()方法自定义线程池的相关文章

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

java多线程(四)-自定义线程池

当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. 1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new Synchro

Java自定义线程池详解

自定义线程池的核心:ThreadPoolExecutor 为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制,其中在java.util.concurrent包下,是JDK并发包的核心,比如我们熟知的Executors.Executors扮演着线程工厂的角色,我们通过它可以创建特定功能的线程池,而这些线程池背后的就是:ThreadPoolExecutor.那么下面我们来具体分析下它. 构造ThreadPoolExecutor public ThreadP

Android 自定义线程池的实战

前言:在上一篇文章中我们讲到了AsyncTask的基本使用.AsyncTask的封装.AsyncTask 的串行/并行线程队列.自定义线程池.线程池的快速创建方式. 对线程池不了解的同学可以先看 Android AsyncTask 深度理解.简单封装.任务队列分析.自定义线程池 ------------------------------------------------------------------------------------------------------- 1.Exec

Spring使用ThreadPoolTaskExecutor自定义线程池及实现异步调用

多线程一直是工作或面试过程中的高频知识点,今天给大家分享一下使用 ThreadPoolTaskExecutor 来自定义线程池和实现异步调用多线程. 一.ThreadPoolTaskExecutor 本文采用 Executors 的工厂方法进行配置. 1.将线程池用到的参数定义到配置文件中 在项目的 resources 目录下创建 executor.properties 文件,并添加如下配置: # 异步线程配置 # 核心线程数 async.executor.thread.core_pool_si

池化技术——自定义线程池

目录 池化技术--自定义线程池 1.为什么要使用线程池? 1.1.池化技术的特点: 1.2.线程池的好处: 1.3.如何自定义一个线程池 2.三大方法 2.1.单个线程的线程池方法 2.2.固定的线程池的大小的方法 2.3.可伸缩的线程池的方法 2.4.完整的测试代码为: 3.为什么要自定义线程池?三大方法创建线程池的弊端分析 4.七大参数 5.如何手动的去创建一个线程池 6.四种拒绝策略 6.1.会抛出异常的拒绝策略 6.2.哪来的去哪里拒绝策略 6.3.丢掉任务拒绝策略 6.4.尝试竞争拒绝

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

自定义线程池

线程池: 自定义线程池一: #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = Queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_th

JAVA并发,线程工厂及自定义线程池

1 package com.xt.thinks21_2; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.SynchronousQueue; 6 import java.util.concurrent.ThreadFactory; 7 import java.util.concurrent.ThreadPo