SimpleThreadPool给线程池增加拒绝策略和停止方法

给线程池增加拒绝策略和停止方法

package com.dwz.concurrency.chapter13;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class SimpleThreadPool3 {
    private final int size;
    private final int queueSize;
    private final static int DEFAULT_SIZE = 10;
    private final static int DEFAULT_TASK_QUEUE_SIZE = 2000;
    private static volatile int seq = 0;
    private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
    private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
    private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
    private final DiscardPolicy discardPolicy;
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = ()->{
        throw new DiscardException("Discard this task.");
    };
    private volatile boolean destroy = false;

    public SimpleThreadPool3() {
        this(DEFAULT_SIZE, DEFAULT_TASK_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
    }

    public SimpleThreadPool3(int size, int queueSize, DiscardPolicy discardPolicy) {
        this.size = size;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        init();
    }

    private void init() {
        for (int i = 0; i < this.size; i++) {
            createWorkTask();
        }
    }

    public void submit(Runnable runnable) {
        if(destroy) {
            throw new IllegalStateException("The thread pool already destroy and not allow submit task.");
        }
        synchronized (TASK_QUEUE) {
            if(TASK_QUEUE.size() >= this.queueSize) {
                discardPolicy.discard();
            }
            TASK_QUEUE.addLast(runnable);
            TASK_QUEUE.notifyAll();
        }
    }

    private void createWorkTask() {
        WorkerTask task = new WorkerTask(GROUP, THREAD_PREFIX + (seq++));
        task.start();
        THREAD_QUEUE.add(task);
    }

    public void shutdown() throws InterruptedException {
        while (!TASK_QUEUE.isEmpty()) {
            Thread.sleep(50);
        }
        int initVal = THREAD_QUEUE.size();
        while (initVal > 0) {
            for(WorkerTask task : THREAD_QUEUE) {
                if(task.getTaskState() == TaskState.BLOCKED) {
                    task.interrupt();
                    task.close();
                    initVal--;
                } else {
                    Thread.sleep(10);
                }
            }
        }
        this.destroy = true;
        System.out.println("The thread pool disposed.");
    }

    public int getSize() {
        return size;
    }

    public int getQueueSize() {
        return queueSize;
    }

    public boolean destroy() {
        return destroy;
    }

    private enum TaskState {
        FREE, RUNNING, BLOCKED, DEAD
    }

    public static class DiscardException extends RuntimeException {

        public DiscardException(String message) {
            super(message);
        }
    }

    public interface DiscardPolicy{
        void discard() throws DiscardException;
    }

    private static class WorkerTask extends Thread {
        private volatile TaskState taskState = TaskState.FREE;

        public WorkerTask(ThreadGroup group, String name) {
            super(group, name);
        }

        public TaskState getTaskState() {
            return this.taskState;
        }

        @Override
        public void run() {
            OUTER:
            while (this.taskState != TaskState.DEAD) {
                Runnable runnable = null;
                synchronized (TASK_QUEUE) {
                    while (TASK_QUEUE.isEmpty()) {
                        try {
                            this.taskState = TaskState.BLOCKED;
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            // 线程被打断回到OUTER位置
                            break OUTER;
                        }
                    }
                    runnable = TASK_QUEUE.removeFirst();
                }

                if (runnable != null) {
                    System.out.println("runnable into...");
                    this.taskState = TaskState.RUNNING;
                    runnable.run();
                    this.taskState = TaskState.FREE;
                }
            }
        }

        public void close() {
            this.taskState = TaskState.DEAD;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleThreadPool3 threadPool = new SimpleThreadPool3();
//        IntStream.range(0, 40).forEach(i -> {
//            threadPool.submit(() -> {
//                System.out.println("The runnable " + i + " be serviced by " + Thread.currentThread() + " start.");
//                try {
//                    Thread.sleep(1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                System.out.println("The runnable " + i + " be serviced by " + Thread.currentThread() + " finished.");
//            });
//        });
        for(int i = 0; i < 40; i++) {
            threadPool.submit(() -> {
                System.out.println("The runnable be serviced by " + Thread.currentThread() + " start.");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("The runnable be serviced by " + Thread.currentThread() + " finished.");
            });
        }

        Thread.sleep(10_000);
        threadPool.shutdown();
        threadPool.submit(()->{System.out.println("=======================");});
    }
}

原文地址:https://www.cnblogs.com/zheaven/p/12090464.html

时间: 2024-11-08 17:27:02

SimpleThreadPool给线程池增加拒绝策略和停止方法的相关文章

JDK线程池的拒绝策略

关于新疆服务请求未带入来话原因的问题 经核查,该问题是由于立单接口内部没有成功调用接续的 "更新来电原因接口"导致的,接续测更新来电原因接口编码:NGCCT_UPDATESRFLAG_PUT ,立单接口调用代码如下: final Map<String, Object> paramsMap = outputObject.getBean(); paramsMap.put("provCode", provCode); paramsMap.put("t

Java线程池的拒绝策略

一.简介 jdk1.5 版本新增了JUC并发编程包,大大的简化了传统的多线程开发.前面文章中介绍了线程池的使用,链接地址:https://www.cnblogs.com/eric-fang/p/9004020.html Java线程池,是典型的池化思想的产物,类似的还有数据库的连接池.redis的连接池等.池化思想,就是在初始的时候去申请资源,创建一批可使用的连接,这样在使用的时候,就不必再进行创建连接信息的开销了.举个生活中鲜明的例子,在去著名洋快餐某基或者某劳的时候,配餐人员是字节从一个中间

JAVA线程池的拒绝策略有哪几种?

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略: AbortPolicy:丢弃任务并抛出RejectedExecutionException异常. 这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态.如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现. DiscardPolicy:丢弃任务,但是不抛出异常. 如果线程队

Java - &quot;JUC线程池&quot; 线程状态与拒绝策略源码分析

Java多线程系列--"JUC线程池"04之 线程池原理(三) 本章介绍线程池的生命周期.在"Java多线程系列--"基础篇"01之 基本概念"中,我们介绍过,线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态.线程池也有5种状态:然而,线程池不同于线程,线程池的5种状态是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED. 线程池状态定义代码如下: private final AtomicI

java并发:线程池、饱和策略、定制、扩展

一.序言 当我们需要使用线程的时候,我们可以随时新建一个线程,这样实现起来非常简便,但在某些场景下存在缺陷:如果需要同时执行多个任务(即并发的线程数量很多),频繁地创建线程会降低系统的效率,因为创建和销毁线程均需要一定的时间.线程池可以使线程得到复用,所谓线程复用就是线程在执行完一个任务后并不被销毁,该线程可以继续执行其他的任务. 二.Executors提供的线程池 Executors是线程的工厂类,也可以说是一个线程池工具类,Executors提供的线程都是通过参数设置来实现不同的线程池机制.

线程池队列饱和策略

1.当一个有限队列充满后,线程池的饱和策略开始起作用. 2.ThreadPoolExecutor的饱和策略通过调用setRejectedExecutionHandler来修改.不同的饱和策略如下: 1)AbortPolicy:中止,executor抛出未检查RejectedExecutionException,调用者捕获这个异常,然后自己编写能满足自己需求的处理代码. 2)DiscardRunsPolicy:遗弃最旧的,选择丢弃的任务,是本应接下来就执行的任务. 3)DiscardPolicy:

线程池工作队列饱和策略

线程池工作队列饱和策略 Java线程池会将提交的任务先置于工作队列中,在从工作队列中获取(SynchronousQueue直接由生产者提交给工作线程). 那么工作队列就有两种实现策略:无界队列和有界队列. 无界队列不存在饱和的问题,但是其问题是当请求持续高负载的话,任务会无脑的加入工作队列,那么很可能导致内存等资源溢出或者耗尽. 而有界队列不会带来高负载导致的内存耗尽的问题,但是有引发工作队列已满情况下,新提交的任务如何管理的难题,这就是线程池工作队列饱和策略要解决的问题. 无界队列不存在饱和的

ScheduledExecutorService调度线程池运行几次后停止某一个线程

开发中偶尔会碰到一些轮询需求,比如我碰到的和银行对接,在做完某一个业务后银行没有同步给到结果,这时候就需要查询返回结果,我们的需求是5分钟一次,查询3次,3次过后如果没有结果则T+1等银行的文件,对于这种任务我们的要求是轮询不是很严格,所以我采取调度线程池方式,如果有查询任务,加入线程池,设置好执行次数及执行时间间隔,具体代码如下: 1 import org.junit.Test; 2 import org.slf4j.Logger; 3 import org.slf4j.LoggerFacto

线程池的生命周期和拒绝策略

线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态.线程池也有5种状态:然而,线程池不同于线程,线程池的5种状态是:RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED. 线程池状态定义代码如下: private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3