java使用默认线程池踩过的坑(三)

                              云智慧(北京)科技有限公司  陈鑫

重新启动线程池

TaskManager

public class TaskManager implements Runnable {

…..

public TaskManager (Setrunners) {

super();

this.runners = runners;

executeTasks(runners);

}

private voidexecuteTasks(Set<FileTask> runners) {
    for (FileTask task : runners){
       pool.execute(task);
       System.out.println(task.getClass().getSimpleName() + " has beenstarted");
    }
}

@Override
public void run() {
    while(!Thread.currentThread().isInterrupted()) {
        try {
           long current = System.currentTimeMillis();
           for (FileTask wrapper : runners) {
               if (wrapper.getLastExecTime() != 0 && current -wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) {   // 開始忘了乘以1000
                   wrapper.interrupt();
                   if (wrapper.getFiles() != null){
                       for (File file : wrapper.getFiles()){
                           file.delete();
                       }
                   }
                   System.out.println("Going to shutdown thethread pool");
                   List<Runnable> shutdownNow = pool.shutdownNow();    // 不等当前pool里的任务运行完。直接关闭线程池
                   for (Runnable run : shutdownNow) {
                       System.out.println(run + " goingto be shutdown");
                   }
                  while (pool.awaitTermination(1, TimeUnit.SECONDS)) {
                       System.out.println("The threadpool has been shutdown " + new Date());
                       executeTasks(runners);//又一次运行
                       Thread.sleep(200);
                   }
               }
            }
        } catch(Exception e1) {
           e1.printStackTrace();
        }
        try {
           Thread.sleep(500);
        } catch(InterruptedException e) {
        }
    }
}
public static void main(String[] args) {
    Set<FileTask> tasks =new HashSet<FileTask>();

    FileTask task = newFileTask();
    task.setInterval(1);
    task.setName("task-1");
    tasks.add(task);

    FileTask task1 = newFileTask();
    task1.setInterval(2);
   task.setName("task-2");
    tasks.add(task1);

    TaskManager  codeManager = new TaskManager (tasks);
    newThread(codeManager).start();
}

}

成功。把整个的ThreadPoolExector里所有的worker所有停止,之后再向其队列里又一次增加要运行的两个task(注意这里并没有清空,仅仅是停止而已)。这样做尽管能够及时处理task,可是一个非常致命的缺点在于。假设不能明白的知道ThreadPoolExecutor要运行的task。就没有办法又一次运行这些任务。

定制线程池

好吧!停止钻研别人的东西!

我们全然能够自己写一个自己的ThreadPoolExecutor。仅仅要把worker暴露出来就能够了。这里是不是回忆起前面的start问题来了,没错。我们即便能够直接针对Thread进行interrupt, 可是不能再次start它了。那么clone一个相同的Thread行不行呢?

Thread

@Override

protectedObject clone() throws CloneNotSupportedException{

throw newCloneNotSupportedException();

}

答案显而易见。线程是不支持clone 的。

我们须要又一次new 一个Thread来又一次运行。事实上我们仅仅须要将原来的Worker里的Runnable换成我们自己的task,然后将訪问权限适当放开就能够了。

还有,就是让我们的CustomThreadPoolExecutor继承Thread,由于它须要定时监控自己的所有的worker里Thread的运行状态。

CustomThreadPoolExecutor

public class CustomThreadPoolExecutor extendsThreadPoolExecutor implements Runnable {

public voidexecute(Testask command) {

….//将运行接口改为接收我们的业务类

}

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

Testask firstTask; //将Runnable改为我们的业务类,方便查看状态

Worker(Testask firstTask) {

…//相同将初始化參数改为我们的业务类

}

}

public staticvoid main(String[] args) {

CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

newSynchronousQueue());

    Testasktask = new Testask();
   task.setInterval(1);
   pool.execute(task);

    Testasktask1 = new Testask();
   task1.setInterval(2);
   pool.execute(task1);

    newThread(pool).start();
}

@Override
public voidrun() {
    while(!Thread.currentThread().isInterrupted()) {
        try {
           long current = System.currentTimeMillis();
           Set<Testask> toReExecute = new HashSet<Testask>();
           System.out.println("\t number is " + number);
            for(Worker wrapper : workers) {
                Testask tt = wrapper.firstTask;
               if (tt != null) {
                   if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) {

wrapper.interruptIfStarted();

remove(tt);

if (tt.getFiles() != null) {

for (File file: tt.getFiles()) {

file.delete();

}

}

System.out.println(“THread is timeout : ” + tt + ” “+ new Date());

toReExecute.add(tt);

}

}

}

if(toReExecute.size() > 0) {

mainLock.lock();

try {

for (Testask tt :toReExecute) {

execute(tt); // execute this task again

}

} finally {

mainLock.unlock();

}

}

} catch(Exception e1) {

System.out.println(“Error happens when we trying to interrupt andrestart a code task “);

}

try {

Thread.sleep(500);

} catch(InterruptedException e) {

}

}

}

}

Testask

class Testask implements Runnable {

…..

@Override
public voidrun() {
    while(!Thread.currentThread().isInterrupted()) {
       lastExecTime = System.currentTimeMillis();
       System.out.println(Thread.currentThread().getName() + " is running-> " + new Date());
        try {
           CustomThreadPoolExecutor.number++;
           Thread.sleep(getInterval() * 6 * 1000);
           System.out.println(Thread.currentThread().getName() + " aftersleep");
        } catch(InterruptedException e) {

Thread.currentThread().interrupt();

System.out.println(“InterruptedException happens”);

}

}

System.out.println(“Going to die”);

}

}

终于方案

综上,最稳妥的就是使用JDK自带的ThreadPoolExecutor,假设须要对池里的task进行随意时间的控制,能够考虑全面更新,全方面,360度无死角的定制自己的线程池当然是最好的方案。可是一定要注意对于共享对象的处理,适当的处理好并发訪问共享对象的方法。

鉴于我们的场景。由于时间紧,并且须要了解的task并不多。临时选用所有又一次更新的策略。

上线后。抽时间把自己定制的ThreadPoolExecutor搞定,然后更新上去!

时间: 2024-10-11 07:22:00

java使用默认线程池踩过的坑(三)的相关文章

java使用默认线程池踩过的坑(一)

云智慧(北京)科技有限公司 陈鑫 场景 一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会持续特别长的时间,甚至有一直阻塞的可能.我们需要一个manager来管理这些task,当这个task的上一次执行时间距离现在超过5个调度周期的时候,就直接停掉这个线程,然后再重启它,保证两个目标目录下没有待处理的txt文件堆积. 问题 直接使用java默认的线程池调度task1和task2.由于外部txt的种种不可控原因,导致task2线程阻塞.现象就是task1

java使用默认线程池踩过的坑(二)

云智慧(北京)科技有限公司 陈鑫 是的.一个线程不可以启动两次.那么它是怎么推断的呢? public synchronized void start() { /** * A zero status valuecorresponds to state "NEW". 0相应的是state NEW */ if (threadStatus!= 0) //假设不是NEW state,就直接抛出异常! throw newIllegalThreadStateException(); group.ad

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

Java四种线程池

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 时间:2015-10-20 22:37:40      阅读:8762      评论:0      收藏:0      [点我收藏+] 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异

Java中的线程池

综述 在我们的开发中经常会使用到多线程.例如在Android中,由于主线程的诸多限制,像网络请求等一些耗时的操作我们必须在子线程中运行.我们往往会通过new Thread来开启一个子线程,待子线程操作完成以后通过Handler切换到主线程中运行.这么以来我们无法管理我们所创建的子线程,并且无限制的创建子线程,它们相互之间竞争,很有可能由于占用过多资源而导致死机或者OOM.所以在Java中为我们提供了线程池来管理我们所创建的线程. 线程池的使用 采用线程池的好处 在这里我们首先来说一下采用线程池的

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

Java多线程和线程池(转)

1.为什么要使用线程池 在java中,如果每个请求到达就创建一个新线程,开销是相当大的.在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源.如果在一个jvm里创建太多的线程,可能会使系统由于过度消耗内存或“切换过度”而导致系统资源不足.为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源

Java四种线程池的学习与总结

在Java开发中,有时遇到多线程的开发时,直接使用Thread操作,对程序的性能和维护上都是一个问题,使用Java提供的线程池来操作可以很好的解决问题. 一.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable(){ @Override public void run(){ //TODO Auto-generatedmethod stub } } ).start(); 那你就out太多了,new Thread的弊端如下:

Java并发之——线程池

一. 线程池介绍 1.1 简介 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理.当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源. 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力. 假设一个服务器完成一项