java并发编程6.取消与关闭

如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的。

Java没有提供任何机制来安全地终止线程。但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

其中一种协作机制能设置某个“已请求取消”的标志,而任务将定期地查看该标志,如果设置了这个标志,那么任务将提前结束。

自定义取消机制

/**
 * 素数生成器
 */
private class PrimeGenerator implements Runnable{
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;

    public void run(){
        BigInteger p = BigInteger.ONE;
        while(!cancelled){
            p = p.nextProbablePrime();
            synchronized(this){
                primes.add(p);
            }
        }
    }

    public void cancel(){
        cancelled = true;
    }

    public synchronized List<BigInteger> get(){
        return new ArrayList<BigInteger>(primes);
    }
}
    /**
     * cancel方法由finally块调用,从而确保即使在调用sleep时被中断也能取消素数生成器的执行
     */
    List<BigInteger> secondPrimes() throws InterruptedException{
        PrimeGenerator generator = new PrimeGenerator();
        new Thread(generator).start();
        try{
            Thread.sleep(1);
        }finally{
            generator.cancel();
        }
        return generator.get();
    }

一个可取消的任务必须拥有取消策略,在这个策略中定义 其他代码如何请求取消该任务;任务在何时检查是否已经请求了取消;以及在响应取消请求时应该执行哪些操作。

阻塞任务的取消
PrimeGenerator的取消机制最终会使得素数生成器的任务退出,但在退出的过程中需要花费一定的时间。

然而,如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能产生更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。

/**
 * 如果生产者的速度超过了消费者的处理速度,队列将被填满,put方法也会阻塞。
 * 当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,它会调用cancel()方法来设置cancelled标志,
 * 但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(消费者已经已经停止从队列去除素数,put将一直阻塞)。
 */
private class BrokenPrimeProducer extends Thread{
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
        this.queue = queue;
    }

    public void run(){
        try{
            BigInteger p = BigInteger.ONE;
            while(!cancelled){
                queue.put(p = p.nextProbablePrime());
            }
        }catch(InterruptedException e){

        }
    }

    public void cancel(){
        cancelled = true;
    }
}
void consumePrimes() throws InterruptedException{
        BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<BigInteger>();
        BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
        producer.start();
        try{
            while(needMorePrimes()){
                consume(primes.take());
            }
        }finally{
            //如果阻塞,将取消失败
            producer.cancel();
        }
    }

每个线程中都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为ture。

在Thread中包含了中断线程以及查询线程中断状态的方法:

/**
 * interrupt能中断目标线程
 * isInterrupted返回目标线程的中断状态
 * interrupted清除当前线程的中断状态,并返回它之前的值,也是清除中断状态的唯一方法。
 */
public class Thread{
    public void interrupt(){}
    public boolean isInterrupted(){}
    public static boolean interrupted(){}
    //...
}

Thread的阻塞方法,如Thread.sleep和Object.wait等,都会检查线程何时中断,并且在发现中断时提前返回。

它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。

中断并不会真正的中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己,wait,sleep,join等方法将严格处理这种请求。

在使用interrupted时要小心,因为它会清除当前线程的中断状态。如果在调用时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者再次调用interrupt来恢复中断状态(这样在之后还可以检查到中断并处理)。

使用中断实现取消

上面BrokenPrimeProducer说明了一些自定义的取消机制无法与可阻塞的库函数实现良好的交互。

如果任务能够响应中断,那么可以使用中断作为取消机制,并且利用许多类库中提供的中断支持。(中断是实现取消的最合理方式)

/**
 * 使用中断而不是boolean标志来请求取消。
 * 有两个位置可以检测出中断:在阻塞的put方法中,以及在循环开始处。
 * 由于调用了阻塞的put方法,因此可以不进行显示地检测(put中会检测并响应中断,抛出InterruptedException),
 * 但执行检测会提高对中断的响应性(如果可阻塞的方法的调用频率并不高,则不足以获得足够的响应性)
 */
public class PrimeProducer extends Thread{
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue){
        this.queue = queue;
    }

    public void run(){
        try{
            BigInteger p = BigInteger.ONE;
            while(!Thread.currentThread().isInterrupted()){
                queue.put(p = p.nextProbablePrime());
            }
        }catch(InterruptedException e){

        }
    }

    public void cancel(){
        interrupt();
    }
}

中断策略

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。

需要区分任务和线程对中断的响应,任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。

对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心的保存中断状态,这样拥有线程的代码才能对中断做出响应,即使非所有者代码也可以做出响应。(当你为一户人家打扫房屋时,即使主人不在家,也不应该把在这段时间内收到的邮件扔掉,而应该把邮件收起来,等主人回来一户再交给他们处理,尽管你可以阅读他们)。

这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作为中断响应。因为它们永远不会在某个由自己拥有的线程中执行,因此它们为任务或者库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈上的上层代码可以采取进一步的操作。

任务不应该对执行任务的线程的中断策略做出任何假设,所以如果除了将InterruptedException传递给调用者外还需要执行其他操作,那么应该在捕获InterruptedException之后恢复中断状态(记住这个中断请求):Thread.currentThread().interrupted()。

响应中断

有两种策略可用于处理InterruptedException

1.传递异常。从而使你的方法也成为可中断的阻塞方法。

2.恢复中断状态,从而使调用栈中的上层代码(记住这个中断状态,让后面的代码有机会去处理)能够对其进行处理。

    /**
     * 将InterruptedException传递给调用者
     */
    BlockingQueue<Task> queue;
    //...
    public Task getNextTask() throws InterruptedException{
        return queue.take();
    }

如果不想或无法传递InterruptedException(或许通过Runnable来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用interrupt来恢复中断状态。你不能屏蔽InterruptedException,例如在catch块中捕获到异常却不做任何处理,除非你在代码中实现了线程的中断策略。虽然
PrimeGenerator屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。但大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

(?)对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复而不是捕获InterruptedException时恢复状态。如果过早地设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已被设置时会立即抛出InterruptedException。

public Task getNextTask(BlockingQueue<Taskgt> queue){
        boolean interrupted = false;
        try{
            while(true){
                try{
                    return queue.take();
                }catch(InterruptedException e){
                    interrupted = true;
                }
            }
        }finally{
            if(interrupted){
                Thread.currentThread().interrupt();
            }
        }
    }

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。如果响应性要求较高,那么不应该调用那些执行时间较长且不响应中断的方法。

在取消过程中可能涉及除了终端状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示(当访问这些信息时,要确保使用了同步)。例如,当一个由ThreadPoolExecutor拥有的工作者线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创建一个新线程将线程池恢复到合理的规模。

计时运行

在外部线程中安排中断

/**
     * 在调用线程中安排一个取消任务,在运行指定的时间间隔后中断它。
     * 这解决了从任务中抛出未检查异常的问题,该异常可以做到被timeRun的调用者(也就是运行Runnable r的线程)捕获。
     * 但是由于timedRun可以从任意一个线程中调用,因此它无法知道之歌调用线程的中断策略,如果线程不响应中断,
     * 那么timedRun会在任务结束时才返回,此时可能已经超过了指定的时限。
     * 而且如果任务在超时之前完成,那么中断timedRun所在线程的取消任务将在timedRun返回到调用者之后启动。
     * 虽然不知道那时情况下将运行什么代码,但结果一定是不好的。
     */
    private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);
    public static void timedRun(Runnable r,long timeout,TimeUnit unit){
        final Thread taskThread = Thread.currentThread();
        cancelExec.schedule(new Runnable(){
            public void run(){
                taskThread.interrupt();
            }
        },timeout,unit);
        r.run();
    }

在专门的线程中中断任务

private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);
    /**
     * 执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。
     * 在启动任务线程后,timedRun将执行一个限时的join方法(同步等待这个线程结束或者超时,)。
     * 在join返回后,它将检查任务是否有异常抛出。如果有的话,则在调用timedRun的线程中再次抛出
     * (Throwable在两个线程中共享,需声明为volatiel类型)。
     * 但由于依赖于限时join,无法知道执行控制是因为线程正常退出还是join超时而返回,join本身不会返回某个状态来表明它是否成功。
     */
    public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
        class RethrowableTask implements Runnable{
            private volatile Throwable t;
            public void run(){
                try{
                    r.run();
                }catch(Throwable t){
                    this.t = t;
                }
            }

            void rethrow(){
                if(t != null){
                    throw launderThrowable(t);
                }
            }
        }

        RethrowableTask task = new RethrowableTask();
        final Thread taskThread = new Thread(task);
        taskThread.start();
        cancelExec.schedule(new Runnable(){
            public void run(){
                taskThread.interrupt();
            }
        }, timeout, unit);
        taskThread.join(unit.toMillis(timeout));
        task.rethrow();
    }

通过Future来实现取消

ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,带有一个boolean类型参数。

如果设为true并且当前任务正在某个线程中运行,那么会请求中断这个线程。

如果设置为false,意味着,如果任务还没启动,就不要启动它。

/**
     * 在Future.get()抛出异常时,如果知道不再需要结果,就可以调用Future。cancel来取消任务
     */
    public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
        Future<?> task = taskExec.submit(r);
        try{

        }catch(TimeoutException e){
            //取消任务
        }catch(ExecutionException e){
            //如果任务中抛出了异常,那么重新抛出异常
            throw launderThrowable(e.getCause());
        }finally{
            //如果任务已经结束,那么执行取消操作也不会带来任何影响
            //如果任务正在运行,那么将被中断
            task.cancel(true);
        }
    }

处理不可中断的阻塞
并非所有的可阻塞方法或阻塞机制都能响应中断,如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外,没有其他的任何作用。

对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求必须指定线程阻塞的原因。

Java.io包中的同步Socket I/O

在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。

Java.io包中的同步I/O 当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路上阻塞的线程都抛出AsynchronousCloseException。

selector的异步I/O 如果一个线程在调用Selector.select方法使阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。

获取某个锁 如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,不会理会中断请求。但在Lock类中提供了lockInterruptibly方法。该方法允许在等待一个锁的同时仍能响应中断。

/**
 * ReaderThread管理了一个套接字连接,采用同步方式从该套接字中读取数据,并将接收到的数据传递给processBuffer。
 * 为了结束某个用户的连接或者关闭服务器,ReaderThread改写了interrupt方法,
 * 使其既能处理标准的中断,也能关闭底层的套接字。
 */
public class ReaderThread extends Thread{

    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException{
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    public void interrupt(){
        try{
            socket.close();
        }catch(IOException e){

        }finally{
            super.interrupt();
        }
    }

    public void run(){
        try{
            byte[] buf = new byte[1000];
            while(true){
                int count = in.read(buf);
                if(count < 0){
                    break;
                }else if(count > 0){
                    processBuffer(buf,count);
                }
            }
        }catch(IOException e){

        }
    }
}

采用newTaskFor来封装非标准的取消

java 6在ThreadPoolExecutor中新增功能。当把一个Callable提交给ExecutorService时,submit方法会返回一个Future,我们可以通过这个Future来取消任务。

newTaskFor是一个工厂方法,它将创建Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口扩展了Future和Runnable(并由FutureTask实现)。

通过newTaskFor将非标准的取消操作封装在一个任务中:

/**
 * CancellableTask接口扩展了Callable,
 * 并增加了一个cancel方法和一个newTask工厂方法来构造RunnableFuture
 */
public interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}
/**
 * CancellingExecutor扩展了ThreadPoolExecutor接口,
 * 并通过改写newTaskFor使得CancellableTask可以自定义自己的Future
 */
public class CancellingExecutor extends ThreadPoolExecutor{

    public CancellingExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    //...

    protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
        if(callable instanceof CancellableTask){
            return ((CancellableTask<T>)callable).newTask();
        }else{
            return super.newTaskFor(callable);
        }
    }
}
/**
 * SocketUsingTask实现了CancellableTask,并定义了Future.cancel。
 * 如果SocketUsingTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断。
 * 因此它提高了任务对取消操作的响应性。
 */
public abstract class SocketUsingTask<T> implements CancellableTask<T>{

    private Socket socket;

    protected synchronized void setSocket(Socket s){
        socket = s;
    }

    public synchronized void cancel(){
        try{
            if(socket != null){
                socket.close();
            }
        }catch(IOException e){

        }
    }

    public RunnableFuture<T> newTask(){
        return new FutureTask<T>(this){
            public boolean cancel(boolean mayInterruptIfRunning){
                try{
                    SocketUsingTask.this.cancel();
                }finally{
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

停止基于线程的服务
应用程序通常会拥有多个线程服务,如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。

与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在ExecutorService中提供了shutdown和shutdownNow等方法。

示例:日志服务

/**
 * LogWriter给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。
 * 产生日志消息的线程并不会将消息直接写入输出流,
 * 而是由LogWriter通过BlockingQueue将消息提交给日志线程,并由日志线程写入。
 */
public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;

    public LogWriter(PrintWriter writer){
        this.queue = new LinkedBlockingQueue<String>();
        this.logger = new LoggerThread(writer);
    }

    public void start(){
        logger.start();
    }

    public void log(String msg) throws InterruptedException{
        queue.put(msg);
    }

    private class LoggerThread extends Thread{
        private final PrintWriter writer;

        public LoggerThread(PrintWriter writer){
            this.writer = writer;
        }
        //...
        public void run(){
            try{
                while(true){
                    writer.println(queue.take());
                }

            }catch(InterruptedException e){

            }finally{    

                writer.close();

            }
        }
    }
}

LogWriter需要实现一种终止日志线程的方法,从而避免JVM无法正常关闭。

要停止日志线程很容易,因为它会反复调用take,而take能响应中断。如果将日志线程修改为当捕获到InterruptedException时退出,只需中断日志线程就能停止服务。

然而,这种直接关闭的做法会丢失那些正在等待写入到日志的信息,而且,其他线程将在调用log时被阻塞,因为日志消息队列是满的,因此这些线程将无法解除阻塞状态。

当取消一个生产者--消费者操作时,需要同时取消生产者和消费者。示例中由于生产者并不是专门的线程,因此要取消将非常困难。

另一种关闭LogWriter的方法是:

设置某个已请求关闭标志,以避免进一步提交日志,并在收到关闭请求后,消费者将队列中的所有消息写入日志,并解除所有在调用log时阻塞的生产者。

/**
 * 为LogWriter提供可靠关闭操作的方法需要解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。
 * 然而,并不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。
 * 可以通过原子方式来检查关闭请求,并且由条件地递增一个计数器来保持提交消息的权利。
 */
public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private PrintWriter writer;
    private boolean isShutdown;
    private int reservations;

    public LogWriter(PrintWriter writer){
        this.queue = new LinkedBlockingQueue<String>();
        this.logger = new LoggerThread();
    }

    public void start(){
        logger.start();
    }

    public void stop(){
        synchronized(this){
            isShutdown = true;
            logger.interrupt();
        }
    }

    public void log(String msg) throws InterruptedException{
        synchronized(this){
            if(isShutdown){
                throw new IllegalStateException("...");
            }
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread{
        public void run(){
            try{
                while(true){
                    try{
                        synchronized(LogWriter.this){
                            if(isShutdown && reservations == 0){
                                break;
                            }
                        }
                        String msg = queue.take();
                        synchronized(LogWriter.this){
                            --reservations;
                        }
                        writer.println(msg);
                    }catch(InterruptedException e){

                    }
                }
            }finally{
                writer.close();
            }
        }
    }
}

将线程的管理委托给一个ExecutorService:

/**
 * 在复杂的程序中,通常将ExecutorService封装在某个服务中,并且该服务能提供自己的生命周期方法。
 * 通过封装ExecutorService,可以将所有权链从应用程序扩展到服务以及线程,
 * 所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。
 */
public class LogService {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    public void start(){

    }

    public void stop() throws InterruptedException {
        try{
            exec.shutdown();
            exec.awaitTermination(timeout, unit);
        }finally{
            writer.close();
        }
    }

    public void log(String msg){
        try{
            exec.execute(new WriteTask(msg));
        }catch(RejectedExecutionException e){

        }
    }
}

毒丸对象
另一种关闭生产者--消费者服务的方式是使用毒丸对象:一个放在队列上的对象,当得到这个对象时,立即停止。

只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,

那么可以通过一个私有的Executor来简化服务的生命周期管理,这个Executor的生命周期由这个方法来控制。

    /**
     * 使用私有Executor,并且该Executor的生命周期受限于方法调用
     * checkMail能在多台主机上并行地检查新邮件,它创建一个私有的Executor,并向每台主机提交一个任务。
     * 然后,当所有邮件检查任务都执行完成后,关闭Executor,等待结束。
     * 采用AtomicBoolean代替volatile boolean是因为内部的Runnable可以访问hasNewMail。
     */
    boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit) throws InterruptedException{
        ExecutorService exec = Executors.newCachedThreadPool();
        final AtomicBoolean hasNewMail = new AtomicBoolean(false);
        try{
            for(final String host : hosts){
                exec.execute(new Runnable(){
                    public void run(){
                        if(checkMail(host)){
                            hasNewMail.set(true);
                        }
                    }
                });
            }
        }finally{
            exec.shutdown();
            exec.awaitTermination(timeout, unit);
        }
        return hasNewMail.get();
    }

ExecutorService.shutdownNow()的局限性

当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,但无法知道哪些任务已经开始但尚未结束。

/**
 * 通过封装ExecutorService并使得execute或submit记录哪些任务是在关闭后取消的。
 * TrackingExecutor可以找出哪些任务已经开始但还没正常完成。
 * 在Executor结束后,getCancelledTasks返回被取消的任务清单。
 * 如果要被记录,这些任务在返回时必须维持线程的中断状态。
 *
 */
public class TrackingExecutor extends AbstractExecutorService{
    private final ExecutorService exec;
    private final Set<Runnable> taskCancellAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec){
        this.exec = exec;
    }

    public List<Runnable> getCancelledTasks(){
        if(!exec.isTerminated()){
            throw new IllegalStateException("");
        }
        return new ArrayList<Runnable>(taskCancellAtShutdown);
    }

    public void execute(final Runnable runnable){
        exec.execute(new Runnable(){
            public void run(){
                try{
                    runnable.run();
                }finally{
                    /**
                     * 存在竞态条件,导致误报:一些被认为取消的任务可能实际上已经完成。
                     * 在任务执行最后一条指令以及线程池将任务记录为结束的两个时刻之间线程池可能被关闭(已经结束,但线程池未记录)。
                     * 如果任务两次执行的结果相同,例如网络爬虫,则可以忽略,否则需要考虑这个风险。
                     */
                    if(isShutdown() && Thread.currentThread().isInterrupted()){
                        taskCancellAtShutdown.add(runnable);
                    }
                }
            }
        });
    }

    //将ExecutorService的其他方法委托给exec
}

对于TrackingExecutor的使用,如网页爬虫程序的工作通常是无穷尽的,因此当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动。

/**
 * CrawlTask提供了一个getPage方法,该方法能找出正在处理的页面。
 * 当爬虫程序关闭时,无论是还没有开始的任务,还是那些被取消的任务,都将记录它们的URL。
 *
 * 对于ExecutorService.awaitTermination用法:
 * awaitTermination调用后会阻塞,等待所有子线程任务执行完毕并且shutdown被调用。
 * 参考文章:http://blog.csdn.net/pengbaoxing/article/details/42006679
 */
public abstract class WebCrawler {
    private volatile TrackingExecutor  exec;

    private final Set<URL> urlsToCrawl = new HashSet<URL>();

    public synchronized void start(){
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for(URL url : urlsToCrawl){
            submitCrawlTask(url);
        }
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException{
        try{
            saveUncrawled(exec.shutdownNow());
            if(exec.awaitTermination(timeout,unit)){
                saveUncrawled(exec.getCancelledTasks());
            }
        }finally{
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled){
        for(Runnable task : uncrawled){
            urlsToCrawl.add(((CrawlTask)task).getPage());
        }
    }

    private void submitCrawlTask(URL url){
        exec.execute(new CrawlTask(url));
    }

    private class CrawlTask implements Runnable{
        private final URL url;

        public CrawlTask(URL url) {
            this.url = url;
        }

        public void run(){
            for(URL link : processPage(url)){
                if(Thread.currentThread().isInterrupted()){
                    return;
                }
                submitCrawlTask(link);
            }
        }

        public URL getPage(){
            return url;
        }
    }
}

处理非正常的线程终止  导致线程提前死亡的最主要原因就是RuntimeException

由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们不会被捕获,也不会在调用栈中逐层传递,而是默认在控制台中输出栈追踪信息,并终止线程。从而造成线程泄露

如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。然后,框架可能会用新的线程来代替这个工作线程,也可能不会,因为线程池正在关闭,或者当前已有足够多的线程能满足需要。ThreadPoolExecutor和Swing都通过这项技术来确保行为糟糕的任务不会影响到后续任务的执行。

如下为典型的线程池工作者线程结构:

    public void run(){
        Throwable thrown = null;
        try{
            while(!isInterrupted()){
                runTask(getTaskFromWorkQueue());
            }
        }catch(Throwable e){
            thrown = e;
        }finally{
            threadExited(this,thrown);
        }
    }

在Thread API中同样提供了UncaughtExceptionHandler,它能检测出某个线程由于捕获的异常而终结的情况。

当一个线程由于捕获异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器。如果没有任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err。

    /**
     * UncaughtExceptionHandler接口
     */
    public interface UncaughtExceptionHandler{
        void UncaughtExcept(Thread t, Throwable e);
    }
    /**
     * 最常见的响应方式是将一个错误信息以及相应的栈追踪信息写入到应用程序日志中
     */
    public class UEHLogger implements Thread.UncaughtExceptionHandler{
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            Logger logger = Logger.getAnonymousLogger();
            logger.log(Level.SEVERE,"Thread terminated with exception :" + t.getName(),e);
        }
    }

要为线程池中的所有线程设置一个UncaughtExceptionHandler,需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory(只有线程的所有者才能够改变线程的UncaughtExceptionHandler)。标准线程池允许当发生未捕获异常时结束线程,但由于使用try--finally代码块来接收通知,因此当线程结束时,将有新的线程来代替它。

如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致极大的混乱。

如果希望在任务由于发生异常而失败时获得通知,并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的Runnable和Callable中,或者改写ThreadPoolExecutor的afterExecute方法。

只有通过execute提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过submit提交的任务,无论是抛出的未检查异常还是已检查异常,都将被任务是任务返回状态的一部分。如果一个有submit提交的任务由于抛出了异常而结束,那么这个异常将被Future.get封装在ExecutionException中重新抛出。

守护线程

线程分为两种:普通线程和守护线程

在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程,例如垃圾回收器以及其他执行辅助工作的线程。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。

普通与守护的差异仅在当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃--即不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

应尽可能少地使用守护线程---很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含I/O操作的任务,那么将是一种危险的行为。

#笔记内容来自  《java并发编程实战》

时间: 2024-11-01 17:39:09

java并发编程6.取消与关闭的相关文章

并发编程:取消与关闭

1.取消标志:任务可能永远不会检查取消标志,如BlockingQueue.put阻塞操作 2.中断: 1)它并不是真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己 2)中断是实现取消的最合理方式 3)除非你知道中断该线程的含义,否则就不应该中断这个线程 4)中断策略:尽快退出执行流程 4.响应中断 1)传递异常 2)恢复中断状态(再次调用interrupt) 注:只有实现了线程中断策略的代码才可以屏蔽中断请求 5.在专门的线程中中断任务:Rethrowabl

Java 并发编程之任务取消(九)

Jvm关闭 jvm可正常关闭也可强行关闭,正常关闭有多种触发方式: 当最后一个正常(非守护,下面会讲到什么是守护线程)线程结束时 当调用system.exit时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-c) 通过其他特定平台的方法关闭jvm,调用Runtime.halt或者在操作系统当中杀死JVM进程(例如发送sigkill)来强行关闭jvm. 关闭钩子 在正常关闭中,jvm首先调用所有已注册的关闭钩子,关闭钩子是指通过 Runtime.addShutdow

Java 并发编程之任务取消(八)

处理非正常的线程中止 当单线程的控制台程序由于 发生了一个未捕获的异常而终止时,程序将停止运行,并产生与程序正常输出非常不同的栈追踪信息,这种情况是很容易理解的.然而,如果并发程序中的某个线程发生故障,那么通常不会如此明显.在控制台中可能会输出栈追踪信息,但没有人会观察控制台.此外,当线程发生故障时,应用程序可能看起来仍然 在工作,所以这个失败很可能被忽略.下面要讲的问题就是监测并防止在程序中"遗漏"线程的方法 . 导致线程提前死亡的最主要原因就是RuntimeException. 我

《Java并发编程实战》要点笔记及java.util.concurrent 的结构介绍

买了<java并发编程实战>这本书,看了好几遍都不是很懂,这个还是要在实战中找取其中的要点的,后面看到一篇文章笔记做的很不错分享给大家!! 原文地址:http://blog.csdn.net/cdl2008sky/article/details/26377433 Subsections  1.线程安全(Thread safety) 2.锁(lock) 3.共享对象 4.对象组合 5.基础构建模块 6.任务执行 7.取消和关闭 8.线程池的使用 9.性能与可伸缩性 10.并发程序的测试 11.显

并发编程—— 任务取消 之 停止基于线程的服务

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 并发编程—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程—— Callable和Future 并发编程—— CompletionService : Executor 和 BlockingQueue 并发编程—— 任务取消 并发编程—— 任务取消 之 中断 并发编程—— 任务取消 之 停止基于线程的服务 概述 第1 部分 问题描述 第2 部分

java并发编程10.构建自定义的同步工具

创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造.如果类库中没有提供你需要的功能,可以使用java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列.显示地Condition对象以及AbstractQueuedSynchronizer框架. 在单线程程序中调用方法时,如果基于某个状态的前提条件未得到满足,那么这个条件永远无法成真.而在并发程序中,基于状态的条件可能会由于其他线程的操作而改变. 可阻塞的状态依赖操作 acquire lock on object

《Java并发编程实战》读书笔记

Subsections 线程安全(Thread safety) 锁(lock) 共享对象 对象组合 基础构建模块 任务执行 取消和关闭 线程池的使用 性能与可伸缩性 并发程序的测试 显示锁 原子变量和非阻塞同步机制 一.线程安全(Thread safety) 无论何时,只要多于一个线程访问给定的状态变量.而且其中某个线程会写入该变量,此时必须使用同步来协助线程对该变量的访问. 线程安全是指多个线程在访问一个类时,如果不需要额外的同步,这个类的行为仍然是正确的. 线程安全的实例: (1).一个无状

Java并发编程有多难?这几个核心技术你掌握了吗?

本文主要内容索引 1.Java线程 2.线程模型 3.Java线程池 4.Future(各种Future) 5.Fork/Join框架 6.volatile 7.CAS(原子操作) 8.AQS(并发同步框架) 9.synchronized(同步锁) 10.并发队列(阻塞队列) 本文仅分析java并发编程中的若干核心问题,对于上面没有提到但是又和java并发编程有密切关系的技术将会不断添加进来完善文章,本文将长期更新,不断迭代.本文试图从一个更高的视觉来总结Java语言中的并发编程内容,希望阅读完

Java并发编程:Callable、Future和FutureTask(转)

Java并发编程:Callable.Future和FutureTask 在前面的文章中我们讲述了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 今天我们就来讨论一下Callabl