重写ThreadFactory方法和拒绝策略

最近项目中要用到多线程处理任务,自然就用到了ThreadPoolTaskExecutor这个对象,这个是spring对于Java的concurrent包下的ThreadPoolExecutor类的封装,对于超出等待队列大小的任务默认是使用RejectedExecutionHandler去处理拒绝的任务,而这个Handler的默认策略是AbortPolicy,直接抛出RejectedExecutionException异常,这个不符合我们的业务场景,

业务需求:我希望是对于超出的任务,主线程进行阻塞,直到有可用线程,简单的代码如下

package com.quant.dev.modules.dev.enetity;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @program: dev
 * @description:
 * @author: Mr.EternityZhang
 * @create: 2019-07-08 17:41
 */
@Slf4j
public class TestThread {

    static class ThreadFactoryCustom implements ThreadFactory{
        private final AtomicInteger threadNum=new AtomicInteger(1);
        private final String namePrefix;

        private ThreadFactoryCustom(String namePrefix){
            this.namePrefix=namePrefix+"-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t=new Thread(r,namePrefix+threadNum.getAndIncrement());
            if(t.isDaemon()){
                t.setDaemon(true);
            }
            if(t.getPriority()!=Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    static class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

        private final String threadName;

        private final URL url;

        public AbortPolicyWithReport(String threadName, URL url) {
            this.threadName = threadName;
            this.url = url;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format("Provider端线程池满!" +
                            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)" ,
                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            log.warn(msg);
            if (!e.isShutdown()) {
                try {
                    log.info("start get queue");
                    e.getQueue().put(r);
                    log.info("end get queue");
                } catch (InterruptedException ee) {
                    log.error(ee.toString(), ee);
                    Thread.currentThread().interrupt();
                }
            }
        }

    }

    public static ThreadFactory getThreadFactoryCustom(String name){
        return new ThreadFactoryCustom(name);
    }

    public static void main(String[] args) {
        String poolName="eternity";
        ThreadFactory factory=getThreadFactoryCustom(poolName);
        log.info("核数={}",Runtime.getRuntime().availableProcessors());
        ThreadPoolExecutor executor=
                new ThreadPoolExecutor(100,400,5,
                        TimeUnit.SECONDS,new LinkedBlockingDeque<>(400),factory,new AbortPolicyWithReport(poolName,null));
        Long begin=System.currentTimeMillis();
        CountDownLatch count=new CountDownLatch(2000);
        AtomicInteger integer=new AtomicInteger(1);
        for(int i=0;i<2000;i++){
            executor.execute(()->{
                try {
                    log.info("当前线程为={},数值={}",Thread.currentThread().getName(),integer);
                    integer.getAndIncrement();
                    Thread.sleep(500);
                    count.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            count.await();
            log.info("阻塞数值={}",count.getCount());
            log.info("活跃数量={}",executor.getActiveCount());
            if(executor.getActiveCount()==0){
                executor.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("耗时={}------结果={}",System.currentTimeMillis()-begin,integer);
    }
}

阻塞原理:

之所以能实现阻塞,是基于BlockingQueue的put方法来实现的,当阻塞队列满时,put方法会一直等待

参考

https://www.jianshu.com/p/3cfd943996a1

原文地址:https://www.cnblogs.com/eternityz/p/12238755.html

时间: 2024-12-29 07:22:30

重写ThreadFactory方法和拒绝策略的相关文章

SimpleThreadPool给线程池增加拒绝策略和停止方法

给线程池增加拒绝策略和停止方法 package com.dwz.concurrency.chapter13; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; public class SimpleThreadPool3 { private final int size; private final int queueSize; private final static int DEFA

JDK线程池的拒绝策略

关于新疆服务请求未带入来话原因的问题 经核查,该问题是由于立单接口内部没有成功调用接续的 "更新来电原因接口"导致的,接续测更新来电原因接口编码:NGCCT_UPDATESRFLAG_PUT ,立单接口调用代码如下: final Map<String, Object> paramsMap = outputObject.getBean(); paramsMap.put("provCode", provCode); paramsMap.put("t

Android线程池(二)——ThreadPoolExecutor及其拒绝策略RejectedExecutionHandler使用演示样例

MainActivity例如以下: package cc.vv; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import android.os.Bundle; import android.app.Activity; /** * Demo描写叙述: * 线程池(Threa

Android线程池(二)——ThreadPoolExecutor及其拒绝策略RejectedExecutionHandler使用示例

MainActivity如下: package cc.vv; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import android.os.Bundle; import android.app.Activity; /** * Demo描述: * 线程池(ThreadPoo

@EnableAsync @Asnc 以及4种拒绝策略

根据不同的场景,可以选择不同的拒绝策略,如果任务非常重要,线程池队列满了,可以交由调用者线程同步处理. 如果是一些不太重要日志,可以直接丢弃掉. 如果一些可以丢弃,但是又需要知道被丢弃了,可以使用ThreadPoolExecutor.AbortPolicy(),在异常处理中记录日志 /** * laizhenwei 2018-1-1 12:46:02 */ @Configuration @EnableAsync public class ExecutorConfig implements Asy

线程池的取值与拒绝策略

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 先估算一个并发数,即为corePoolSize,*1.2为maximumPoolSizeworkQue

Java线程池的拒绝策略

一.简介 jdk1.5 版本新增了JUC并发编程包,大大的简化了传统的多线程开发.前面文章中介绍了线程池的使用,链接地址:https://www.cnblogs.com/eric-fang/p/9004020.html Java线程池,是典型的池化思想的产物,类似的还有数据库的连接池.redis的连接池等.池化思想,就是在初始的时候去申请资源,创建一批可使用的连接,这样在使用的时候,就不必再进行创建连接信息的开销了.举个生活中鲜明的例子,在去著名洋快餐某基或者某劳的时候,配餐人员是字节从一个中间

线程池的4种拒绝策略

1.ThreadPoolExecutor类实现了ExecutorService接口和Executor接口,可以设置线程池corePoolSize,最大线程池大小,AliveTime,拒绝策略等.常用构造方法: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue workQueue, RejectedExecutionHandler han

JAVA线程池的拒绝策略有哪几种?

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略: AbortPolicy:丢弃任务并抛出RejectedExecutionException异常. 这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态.如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现. DiscardPolicy:丢弃任务,但是不抛出异常. 如果线程队