初学线程池--1,自己实现一个线程池

自己实现一个简单的线程池

public interface ThreadPool<Job extends Runnable> {
    // 启动
    void execute(Job job);

    // 关闭
    void shutDown();

    // 增加线程
    void addWorkThread(int num);

    // 减少线程
    void reduceWorkThread(int num) throws Exception;

    // 正在执行的线程数
    int getSize();

}

实现类

public class MyThreadPoll<Job extends Runnable> implements ThreadPool<Job> {
    /**
     * 最大线程数
     */
    private static final int MAX_WORK_THREAD = 10;
    /**
     * 最小线程数
     */
    private static final int MIN_WORK_THREAD = 1;
    /**
     * 默认的线程数
     */
    private static final int DEFAULT_WORK_THREAD = 5;

    /**
     * 工作列表(无序)
     */
    private final LinkedList<Job> jobQueue = new LinkedList<>();

    /**
     * 工作者线程
     */
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());

    /**
     * 工作线程数
     */
    private int workerNum = DEFAULT_WORK_THREAD;

    /**
     * 线程编号
     */
    private AtomicLong threadNum = new AtomicLong();

    public MyThreadPoll() {
        initWorks(DEFAULT_WORK_THREAD);
    }

    public MyThreadPoll(int num) {
        if (num > MAX_WORK_THREAD) {
            workerNum = MAX_WORK_THREAD;
        } else if (workerNum < MIN_WORK_THREAD) {
            workerNum = MIN_WORK_THREAD;
        } else {
            workerNum = num;
        }
        initWorks(workerNum);
    }

    /**
     * 初始化工作线程
     *
     * @param num
     */
    private void initWorks(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread =
                    new Thread(worker, "THPOLL-WORKER-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    @Override
    public void execute(Job job) {

        if (job != null) {
            synchronized (jobQueue) {
                // 加入工作线程队列
                jobQueue.add(job);

                // 尝试唤醒线程
                jobQueue.notify();
            }

        }
    }

    @Override
    public void shutDown() {
        // 线程关闭循环
        for (Worker worker : workers) {
            worker.shutDown();
        }
        // 全部唤醒
        synchronized (jobQueue) {
            jobQueue.notifyAll();
        }
    }

    @Override
    public void addWorkThread(int num) {
        synchronized (jobQueue) {
            if (num + this.workerNum > MAX_WORK_THREAD) {
                num = MAX_WORK_THREAD - this.workerNum;
            }
            initWorks(num);
            this.workerNum += num;
        }
    }

    @Override
    public void reduceWorkThread(int num) throws Exception {
        synchronized (jobQueue) {
            if (num >= this.workerNum) {
                throw new Exception();
            }
            int count = num;
            int succCount = 0;
            while (count > 0) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutDown();
                    count--;
                    succCount++;
                }
            }
            this.workerNum -= succCount;
        }

    }

    @Override
    public int getSize() {
        return jobQueue.size();
    }

    private class Worker implements Runnable {

        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobQueue) {
                    while (jobQueue.isEmpty() && running) {
                        try {
                            jobQueue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if (!jobQueue.isEmpty()) {
                        job = jobQueue.removeFirst();
                    }
                }
                // 如果此时线程池已经被关闭,则忽略所有任务
                // 现实情况可能有其他操作
                if (job != null && running) {
                    try {
                        job.run();
                        System.out.println("JOB=" + Thread.currentThread().getName());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void shutDown() {
            running = false;
        }
    }
}
时间: 2024-08-01 16:10:07

初学线程池--1,自己实现一个线程池的相关文章

一个线程加一运算,一个线程做减一运算,多个线程同时交替运行--synchronized

使用synchronized package com.pb.thread.demo5; /**使用synchronized * 一个线程加一运算,一个线程做减法运算,多个线程同时交替运行 * * @author Denny * */ public class Count { private int num = 0; private boolean flag = false; // 标识 //加法 public synchronized void add() { while (flag) { tr

易语言怎样写双线程?一个线程循环找图。一个线程循环按键F2。

易语言怎样写双线程? 一个线程循环找图.一个线程循环按键F2. // .程序集变量 参数, 整数型 .程序集变量 线程句柄1, 整数型 .程序集变量 线程句柄2, 整数型 启动线程 (&子程序1, 参数,线程句柄1) 启动线程 (&子程序2, ,线程句柄2) // .子程序 子程序1 .参数 参数1, 整数型 信息框 (参数1, 0, ) 信息框 (“这是线程1的例子”, 0, ) // .子程序 子程序2 信息框 (“这是线程2的例子”, 0, ) // // 注意: 凡调用到COM接口

写两个线程,一个线程打印1-52,另一个线程打印A-Z,打印顺序为12A34B56C......5152Z

题目: 写两个线程,一个线程打印1-52,另一个线程打印A-Z,打印顺序为12A34B56C......5152Z.要求用线程间的通信. /** * 写两个线程,第一个线程打印1-52,第二个线程打印A-Z,打印结果为12A34B...5152Z */public class ThreadPrinter { // true打印数字,false打印字母 private boolean flag = true; // 打印字母 public synchronized void printNumber

理解线程池,自己实现一个线程池

线程池本质是一个生产者-消费者模式,一边维护一些线程执行任务,一边由主线程添加一些任务.现在我们抛弃源码中一些繁杂的状态判断,自己写一个线程池. public class poolT { //可能频繁增删任务,链表队列效率较高 private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); private final HashSet<Work> workers

写2个线程,其中一个线程打印1~52,另一个线程打印A~z,打印顺序应该是12A34B45C……5152Z

我写的 class LN { private int flag = 0; public static char ch = 'A'; public static int n = 1; public synchronized void printLetter() { try { if(flag == 0 || flag == 1) { wait(); } else { //System.out.println("flag = " + flag); System.out.print(ch);

编写一个多线程函数实现对数组排序,要求: 1.至少用两个线程 2.数组的元素值可以事先定义好,或者可以从键盘输入(增加一个线程)。 3.用一个线程对数组排序,用另一个线程输出排序结果。 4.保证先排好序,再输出。

#include"stdio.h" #include"pthread.h" #include"semaphore.h" static int datbuf[10] = {0}; static int n; sem_t sem1,sem2,sem3; void *do_input(void *pvoid) { int i; sem_wait(&sem1); printf("please input data\n"); f

线程池? 如何设计一个动态大小的线程池,有哪些方法?

[线程池?  如何设计一个动态大小的线程池,有哪些方法?] 线程池:顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中, 需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中, 从而减少创建和销毁线程对象的开销. 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.此时,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池相似,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

死磕 java线程系列之自己动手写一个线程池

欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池. 属性分析 线程池,顾名思义它首先是一个"池",这个池里面放的是线程,线程是用来执行任务的. 首先,线程池中的线程应该是有类别的,有的是核心线程,有

Java线程池主线程等待子线程执行完成

今天讨论一个入门级的话题, 不然没东西更新对不起空间和域名~~ 工作总往往会遇到异步去执行某段逻辑, 然后先处理其他事情, 处理完后再把那段逻辑的处理结果进行汇总的产景, 这时候就需要使用线程了. 一个线程启动之后, 是异步的去执行需要执行的内容的, 不会影响主线程的流程,  往往需要让主线程指定后, 等待子线程的完成. 这里有几种方式. 站在 主线程的角度, 我们可以分为主动式和被动式. 主动式指主线主动去检测某个标志位, 判断子线程是否已经完成. 被动式指主线程被动的等待子线程的结束, 很明