[Java Concurrency in Practice]第七章 取消与关闭

取消与关闭

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用)。但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清楚如何执行清除工作。

生命周期结束的问题会使任务、服务以及程序的设计和实现等过程变得复杂,而这个在程序设计中非常重要的要素却经常被忽略。一个在行为良好的软件与勉强运行的软件之间最主要的区别就是,行为良好的软件能很完善地处理失败、关闭和取消等过程。

7.1 任务取消

如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的。取消某个操作的原因很多。

  • 用户请求取消。用户点击图形界面程序中的“取消”按钮,或者通过管理接口来发出取消请求,例如JMX(Java Management Extensions)。
  • 有时间限制的操作。例如,某个应用程序需要在有限时间内搜索问题空间,并在这个时间内选择最佳的解决方案。当计时器超时时,需要取消所有正在搜索的任务。
  • 应用程序事件。例如,应用程序对某个问题空间进行分解并搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都将被取消。
  • 错误。网页爬虫程序搜索相关的页面,并将页面或摘要数据保存到硬盘。当一个爬虫任务发生错误时(例如,磁盘空间已满),那么所有搜索任务都会取消,此时可能会记录它们的当前状态,以便稍后重新启动。
  • 关闭。当一个程序或服务关闭时,必须对正在处理和等待处理的工作执行某种操作。在平缓的关闭过程中,当前正在执行的任务将继续执行直到完成,而在立即关闭过程中,当前的任务则可能取消。

其中一种协作机制能设置某个“已请求取消”标志,而任务将定期地查看该标志。如果设置了这个标志,那么任务将提前结束。下面程序就使用了这项技术,其中PrimeGenerator持续地枚举素数,直到它被取消。cancel方法将设置canceled标志,并且主循环在搜索下一个素数之前会首先检查这个标志。(为了使这个过程能可靠地工作,标志canceled必须为volatile类型。)

@ThreadSafe
public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @GuardedBy("this") private final List<BigInteger> primes
            = new ArrayList<BigInteger>();
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
}

程序给出了这个类的使用示例,即让素数生成器运行1秒后取消。素数生成器通常不会刚好在运行1秒钟后停止,因为在请求取消的时刻和run方法中循环执行下一次检查之间可能存在延迟。cancel方法由finally块调用,从而确保即使在调用sleep时被中断也能取消素数生成器的执行。如果cancel没有被调用,那么搜索素数的线程将永远执行下去,不断消耗CPU的时钟周期,并使JVM不能正常退出。

一个可取消的任务必须拥有取消策略,在这个策略中将详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。

考虑现实世界中停止支付(Stop-Payment)支票的示例。银行通常都会规定如何提交一个停止支付的请求,在处理这些请求时需要做出哪些响应性保证,以及当支付中断后需要遵守哪些流程(例如通知该事务中涉及的其他银行,以及对付款人的账户进行费用评估)。这些流程和保证放在一起就构成了支票支付的取消策略。

7.1.1 中断

PrimeGenerator中的取消机制最终会使地搜索素数的任务退出,但并不是立刻发生的,需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会取消。

如下程序说明了这一点,生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put方法也会阻塞。当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用cancel方法来设置canceled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
        }
    }

    public void cancel() {
        cancelled = true;
    }
}

一些特殊的阻塞库的方法支持中断。线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

在Java的API或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起强大的应用。

每个线程都有一个boolean的中断状态。当中断线程时,这个线程的中断状态将被设置为true。在Thread中包含了中断线程以及查询线程中断状态的方法,如下程序清单所示。

public class Thread {
    public void interrupt() { ... }
    public boolean isInterrupted() { ... }
    public static boolean interrupted() { ... }
    ...
}

interrupt方法能中断目标线程,而isInterrupted方法能返回目标线程的中断状态。静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法,它的目标线程是发起interrupted操作的线程,即当前线程:

public static boolean interrupted() {
       return currentThread().isInterrupted(true);
}

阻塞库方法,例如Thread.sleep和Object.wait等,都会检查线程何时中断,并且在发现时提前返回。它们在响应中断时执行的操作包括 : 清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。JVM并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。(如果在阻塞过程中发现线程的中断状态为true,则会先清除当前线程中断状态,再抛出InterruptedException异常,则阻塞方法会因抛出异常而不再阻塞。)

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这种方法,中断操作将变得“有粘性”——如果不触发InterruptedException,那么中断状态将一直保持,直到明确地清除中断状态。

调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)。有些方法,例如wait、sleep和join等,将严格地处理这种请求,当它们受到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计糟糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求作出响应。

在使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态。如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态,如下代码所示:

public class TaskRunnable implements Runnable {
    BlockingQueue<Task> queue;

    public void run() {
        try {
            processTask(queue.take());
        } catch (InterruptedException e) {
            // restore interrupted status
            Thread.currentThread().interrupt();
        }
    }

    void processTask(Task task) {
        // Handle the task
    }

    interface Task {
    }
}

BrokenPrimeProducer说明了一些自定义的取消机制无法与可阻塞的库函数实现良好交互的原因。如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用许多类库中提供的中断支持。

通常,中断是实现取消的最合理方式。

使用中断而不是boolean标志来请求取消可以很好地解决BrokenPrimeProducer中的问题,如下程序所示:

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() {
        interrupt();
    }
}

在每次迭代循环中,有两个位置可以检测出中断:在阻塞的put方法调用中,以及在循环开始处查询中断状态时。由于调用了阻塞的put方法,因此这里并不一定要进行显式的检测,但执行检测却会使PrimeProducer对中断具有更高的响应性,因为它是在启动寻找素数任务之前检查中断的,而不是在任务完成之后。如果可中断的阻塞方法的调用频率并不高,不足以获得足够的响应性,那么显式地检测中断状态能起到一定的帮助作用。

7.1.2 中断策略

正如任务中应该包含取消策略(见7.1 任务取消)一样,线程同样应该包含中断策略。中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多块的速度响应中断。

最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快推出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或者重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。

区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者——中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。

任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。(但你为一户人家打扫房屋时,即使主人不在,也不应该把在这段时间内收到的邮件扔掉,而应该把邮件收起来,等主人回来以后再交给他们处理,尽管你可以阅读他们的杂志。)

这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断操作,并指导某个合适的时刻。因此需要记住中断请求,并在完成当前任务后抛出InterruptedException或表示已收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。

任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将InterruptedException传递给调用者外还需要执行其他操作,那么应该在捕获InterruptedException之后恢复中断状态:

Thread.currentThread().interrupt();

正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略作为假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown)方法。

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

通过延迟中断请求的处理,开发人员能制定更灵活的中断策略,从而使应用程序在响应性与健壮性之间实现合理的平衡。

7.1.3 响应中断

当调用可中断的阻塞函数时,例如Thread.sleep或BlockingQueue.put等,有两种策略可用于处理InterruptedException:

  • 传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

向调用者传递InterruptedException:

BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException {
    return queue.take();
}

如果你不想或无法将InterruptedException传递出去(比如你的任务实现了Runnable的run方法,它是不允许报出非检测异常的),一种标准的方法就是通过再次调用interrupt来恢复中断状态。你不能屏蔽InterruptException,例如在catch块中捕获到异常却不做任何处理,除非在你的代码中实现了线程的中断策略。虽然PrimeProducer屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

不可取消的任务的中断方式:有些任务拒绝被中断,这使得它们是不可取消的。但是,即使是不可取消的任务也应该尝试保留中断状态,以防在不可取消的任务结束之后,调用栈上更高层的代码需要对中断进行处理。清单 6 展示了一个方法,该方法等待一个阻塞队列,直到队列中出现一个可用项目,而不管它是否被中断。为了方便他人,它在结束后在一个 finally 块中恢复中断状态,以免剥夺中断请求的调用者的权利。(它不能在更早的时候恢复中断状态,因为那将导致无限循环—— BlockingQueue.take() 将在入口处立即轮询中断状态,并且,如果发现中断状态集,就会抛出 InterruptedException。)(通常,可中断的方法在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)

清单 6. 在返回前恢复中断状态的不可取消任务

public Task getNextTask(BlockingQueue<Task> queue) {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                interrupted = true;
                // 失败并重试
            }
        }
    } finally {
        if (interrupted)// 保留中断状态
            Thread.currentThread().interrupt();
    }
}

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长并且不响应中断的方法,从而对可调用的库代码进行一些限制。

在取消过程中可能涉及除了中断状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。(当访问这些信息时,要确保使用同步。)

例如,当一个由ThreadPoolExecutor拥有的工作者线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创能一个新线程将线程池恢复到合理地规模。

7.1.4 示例:计时运行

有很多的程序运行是没有结束条件的,它们可能永远地运行下去(比如列举某个数的所有质数)。下面是求所有的质数过程:

//求所有质数(素数)的任务
public class PrimeGenerator implements Runnable {
       //存储所有求得的质数
       private final List<BigInteger> primes = new ArrayList<BigInteger>();
       private volatile boolean cancelled;//取消标示

       public void run() {
              BigInteger p = BigInteger.ONE;//从质数1开始
              while (!cancelled) {//轮询是否已被取消
                     /*
                      * BigInteger.nextProbablePrime返回大于此 BigInteger 的
                      * 可能为素数的第一个整数
                      */
                     p = p.nextProbablePrime();
                     synchronized (this) {
                            primes.add(p);
                     }
              }
       }

       public void cancel() {
              cancelled = true;
       }

       public synchronized List<BigInteger> get() {
              return new ArrayList<BigInteger>(primes);
       }
}

class PrimeGeneratorTest {
       static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
              PrimeGenerator generator = new PrimeGenerator();
              new Thread(generator).start();
              try {
                     TimeUnit.SECONDS.sleep(1);//运行1秒后停止任务
              } finally {
                     generator.cancel();
              }
              return generator.get();
       }

       public static void main(String[] args) throws Exception {
              System.out.println(aSecondOfPrimes());
       }
}

虽然PrimeGenerator运行限制为1秒,但可能需要大于一秒的时间才能停止,但是它最终检测到中断,并发出停止指令,停止线程。这个任务的另一个问题是,如果PrimeGenerator抛出运行时异常,则异常会被忽略,因为一个线中的异常不会传递到他的父线程中,而自己又没有显示地处理异常。

下面程序是上面的aSecondOfPrimes方法的改进,功能是在一定时间内运行一个Runnable任务,并安排了取消任务的计划任务,由这个任务在给定的时间间隔后中断它,这样解决了任务线程抛出未检查异常问题而父线程捕获不到的问题,该异常会被timedRun的调用者捕获:

private static final ScheduledExecutorService cancelExec = Executors.newSingleThreadScheduledExecutor();
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
       final Thread taskThread = Thread.currentThread();
       cancelExec.schedule(new Runnable() {
              public void run() {// 取消求质数任务线程的计划任务
                     taskThread.interrupt();// 发出中断请求
              }
       }, timeout, unit);
       /*
        * 直接调用Runnable的run方法,这样不会启一个线程,这
        * 样就可以捕获到run方法抛出的异常了
        */
       r.run();//这里有可能不会退出,这就看run方法是否响应了中断
}

这是一种非常简单的方法,但却破坏了以下规则:在中断线程之前,应该了解它的中断策略。由于timedRun可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。如果任务在超时之前执行完成了,取消任务可能在任务执行线程调用timedRun方法并返回后才启动,此时我们不知道任务执行线程在执行什么代码,总之没有什么好的结果。

此外,如果任务不响应中断,那么timedRun会在任务结束时才返回,此时可能已经超过了指定的时限(或者还没有超过时限)。如果某个限时运行的服务没有在指定的时间内返回,那么将对调用者带来负面影响。

下面程序解决了aSecondOfPrimes的异常处理问题,并且也解决了前面做法中引发的问题。用来执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍然能够返回到调用它的线程。在启动任务线程之后,timedRun方法会调用新创建的任务线程的限时join方法。在join方法返回后,它会检查任务是否有异常抛出,如果有,会在调用timeRun的线程中再次抛出。用来保存异常信息的域t声明成了volatile,这会安全发布该对象,这样就可以安全的在多线程中共享了。

这个版本解决了前面例子中出现的问题,但因为它依赖于一个的join方法,因此它也受到join不足之处的影响:我们不知道控制权的返回是因为线程自然退出还是join的超时。

private static final ScheduledExecutorService cancelExec = Executors
              .newSingleThreadScheduledExecutor();
public static void timedRun(final Runnable r, long timeout, TimeUnit unit)
              throws Throwable {
       //任务执行线程
       class RethrowableTask implements Runnable {
              // 保存任务执行过程中的异常信息
              private volatile Throwable t;

              public void run() {
                     try {
                            r.run();
                     } catch (Throwable t) {
                            this.t = t;
                     }
              }

              void rethrow() throws Throwable {
                     if (t != null)
                            throw t;
              }
       }

       RethrowableTask task = new RethrowableTask();
       final Thread taskThread = new Thread(task);
       taskThread.start();// 将任务放在一个线程中运行
       cancelExec.schedule(new Runnable() {
              public void run() {// 定时取消任务线程
                     taskThread.interrupt();
              }
       }, timeout, unit);
       //设置一个超时,如果任务在指定的时间内没有完成,也会返回
       taskThread.join(unit.toMillis(timeout));
       //检测任务在运行时是否抛出异常
       task.rethrow();
}

7.1.5 通过Future来实现取消

通常使用现有库的类比自行编写更好,因此将使用Future和任务执行框架构建timedRun。

cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则mayInterruptIfRunning参数决定了是否向执行任务的线程发出interrupt操作。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)(如果mayInterruptIfRunning为true并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些不处理中断的任务中。)

除非你清楚线程的中断策略,否则不要中断线程,那么在什么情况下调用cancel可以将参数指定为true?执行任务的线程是由标准的Executor创建的,它实现了一种中断策略使得任务可以通过中断被取消,所以如果任务在标准的Executor中运行,并通过它们的Future来取消任务,那么可以设置mayInterruptIfRunning。当尝试取消某个任务时,不宜直接指点线程池,因为你并不知道当中断请求到达时正在运行什么任务——只能通过任务的Future来实现取消。这也是在编写任务时要将中断视为一个取消请求的另一个理由:可以通过Future来取消它们。

下面是timedRun的另一个版本,使用Future和任务执行框架来取消任务:

public static void timedRun(Runnable r,
                            long timeout, TimeUnit unit)
                            throws InterruptedException {
    Future<?> task = taskExec.submit(r);//提交任务
    try {
        task.get(timeout, unit);//指定任务超时时间
    } catch (TimeoutException e) {
        // 接下来任务将被取消
        // 在finally调用cancel
    } catch (ExecutionException e) {
        // 如果在任务中抛出了异常,那么重新抛出该异常
        throw launderThrowable(e.getCause());
    } finally {
        //取消那些不在需要结果的任务
        //如果任务已经结束,那么执行取消操作也不会带来任何影响
        task.cancel(true);  // 如果任务正在运行,那么将被中断
    }
}

当Future.get抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

7.1.6 处理不可中断的阻塞

在Java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而,并非所有的可阻塞方法或者阻塞机制都能响应中断:如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。

对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。

  • Java.io包中的同步Socket I/O。在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。
  • Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptedException)并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
  • Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
  • 获取某个锁。如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。(尝试获取一个内部锁的操作(进入一个 synchronized 块)是不能被中断的)

下面ReaderThread展示了一项用来封装非标准取消任务的技术(非标准中断技术,标准的中断是调用线程的interrupt方法来抛出InterruptException异常?),通过重写Thread的interrupt来封装:

public class ReaderThread extends Thread {
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    //重写Thread的interrupt,即支持标准的中断,也关闭了底层的socket
    public void  interrupt()  {//
        try {
            socket.close();//要中断socket的阻塞方法则需要关闭socket
        }
        catch (IOException ignored) { }
        finally {
            super.interrupt();//不要忘了调用标准中断interrupt
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);//这里可能会阻塞,如果在阻塞时socket被关闭,则会抛出异常,从而可以跳出阻塞状态
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { /*  允许线程退出  */  }
    }
}

7.1.7 采用newTaskFor来封装非标准的取消

可以通过newTaskFor方法来进一步优化ReaderThread中封装非标准取消的技术,这是Java6在ThreadPoolExecutor中的新增功能。它是一个工厂方法,用来创建描述任务的Future,它返回的是一个RunnableFuture,这是一个接口,继承了Future和Runnable(FutureTask是它的实现)。

我们可以自定义任务的Future,这样可以覆写Future.cancel方法,并可以在该方法中实现日志、取消那些不响应中断的活动,上面RunnableFuture通过重写了Thread的interrupt实现了取消socket的阻塞线程,现样可以重写任务的Future.cancel来实现。

用newTaskFor封闭非标准取消技术:

//非标准取消的任务接口,扩展了Callable接口,并添加了cancel与newTask接口,我们的任务都要实现这个接口
public interface CancellableTask<T> extends Callable<T> {
void cancel();//自己实现取消任务
//自己实现RunnableFuture,不使用Executor提供的FutureTask默认实现
    RunnableFuture<T> newTask();
}

@ThreadSafe//重写newTaskFor方法,该方法会在向Executor提交任务时调用
public class CancellingExecutor extends ThreadPoolExecutor {
    ...
    protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)//如果是自定义的Callable
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}
//实现自己的任务,注,这里是为抽象类,因为还没有使用Callable的call方法
public abstract class SocketUsingTask<T> implements CancellableTask<T> {
    @GuardedBy("this") private Socket socket;

    protected synchronized void setSocket(Socket s) { socket = s; }
    //封装非标准的任务取消技术
    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();//取消任务就是关闭socket
        } catch (IOException ignored) { }
    }
//为Executor提供的Future实现,可用来替代Executor默认的FutureTask
    public RunnableFuture<T> newTask() {
        return new FutureTask <T>(this) {//匿名类,继承自FutureTask
            //重写FutureTask方法,即在FutureTask的cancel基础上添加了自己非标准中断实现
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();//先调用自己的非标准中断实现
                } finally {//然后再调用父类的标准中断实现
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

SocketUsingTask实现了CancellableTask,并定义了Future.cancel来关闭套接字和调用super.cancel。如果SocketUsingTask通过其自己的Future来取消,那么底层的Socket会被关闭并且执行线程也会被中断,这就在原来默认的FutureTask基础之上添加了Socket I/O的非标准中断。通过上面的封装,我们后面的工作就是只专注于对任务call方法的实现。

因此它提高了任务对取消操作的响应性:不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻塞的套接字I/O方法。

7.2 停止基于线程的服务

应用程序通常会创建基于线程的服务,如线程池。这些服务的时间一般比创建它的方法更长。如果应用程序完成退出,这些服务线程也要结果。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。

正确的封装原则是:除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等。在线程API中,并没有对线程所有权给出正式的定义:线程由Thread对象表示,并且像其他对象一样可以被自由共享。然而,线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。

与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在ExecutorService中提供了shutdown和shutdownNow等方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

7.2.1 示例:日志服务

日志服务器是将应用程序的要记的日志输出到一个缓存中(某个队列),再通过一个后台日志线程将缓存中的日志写入到日志存储器上,这样降低了应用程序直接与日志存储器的交互可能的瓶颈。

不支持关闭的生产者-消费者日志服务:

// LogWriter就是一个基于线程的服务,但不是一个完成的服务
public class LogWriter {
    //日志缓存
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;//日志写线程
private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

public void start() { logger.start(); }

    //应用程序向日志缓存中放入要记录的日志
    public void log(String msg) throws InterruptedException {
        queue.put(msg);
}

    //日志写入线程,这是一个多生产者,单消费者的设计
    private class LoggerThread extends Thread {
        private final PrintWriter writer;
        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }
        public void run() {
            try {
                while (true)
                   writer.println(queue.take());
            } catch(InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

为了让一个这个日志服务真正可用,应该需要一个方法来终止日志线程。应用程序只能拥有LogWriter服务,但不能直接拥有该服务所拥有的日志写线程LoggerThread,所以我们应该为LogWriter服务提供一个生命周期的方法,以便能对日志写线程进行关闭动作,这样不会让JVM无法正常关闭。

虽然LoggerThread线程调用了阻塞方法take,并可响应中断,但这里我们不能简单地通过这种方式来终止LoggerThread线程。因为如果应用程序在不断地向缓存放入日志信息,那么take方法将永远不会阻塞,日志线程也就不可能退出。另外,取消一个基于生产者-消费者模式的服务中的线程,我们不只要取消消费者线程,也要考虑生产者线程,但这个例子中,因为生产者并非一个线程,取消它们是困难的。

最好的关闭LogWriter的方案是设置“已请求关闭”标志,避免消息进一步被提交进来。下面LogService是一个完整的日志服务实现,它在LogWriter基础之上添加了可靠的关闭服务器功能。

为LogWriter提供可靠关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。然而,我们不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。我们采用的方法是:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提提交消息的权利,如下所示:

//日志服务,提供记录日志的服务,并有管理服务生命周期的相关方法
public class LogService {
       private final BlockingQueue<String> queue;
       private final LoggerThread loggerThread;// 日志写线程
       private final PrintWriter writer;
       private boolean isShutdown;// 服务关闭标示
       // 队列中的日志消息存储数量。我们不是可以通过queue.size()来获取吗?
       // 为什么还需要这个?请看后面
       private int reservations;

       public LogService(Writer writer) {
              this.queue = new LinkedBlockingQueue<String>();
              this.loggerThread = new LoggerThread();
              this.writer = new PrintWriter(writer);

       }

       //启动日志服务
       public void start() {
              loggerThread.start();
       }

       //关闭日志服务
       public void stop() {
              synchronized (this) {
                     /*
                      * 为了线程可见性,这里一定要加上同步,当然volatile也可,
                      * 但下面方法还需要原子性,所以这里就直接使用了synchronized,
                      * 但不是将isShutdown定义为volatile
                      */
                     isShutdown = true;
              }
              //向日志线程发出中断请求
              loggerThread.interrupt();
       }

       //供应用程序调用,用来向日志缓存存放要记录的日志信息
       public void log(String msg) throws InterruptedException {
              synchronized (this) {
                     /*
                      * 如果应用程序发出了服务关闭请求,则不存在接受日志,而是直接
                      * 抛出异常,让应用程序知道
                      */
                     if (isShutdown)
                            throw new IllegalStateException(/*日志服务已关闭*/);
                     /*
                      * 由于queue是线程安全的阻塞队列,所以不需要同步(同步也可
                      * 但并发效率会下降,所以将它放到了同步块外)。但是这里是的
                      * 操作序列是由两个操作组成的:即先判断isShutdown,再向缓存
                      * 中放入消息,如果将queue.put(msg)放在同步外,则在多线程环
                      * 境中,LoggerThread中的  queue.size() == 0 将会不准确,所
                      * 以又要想queue.put不同步,又要想queue.size()计算准确,所
                      * 以就使用了一个变量reservations专用来记录缓存中日志条数,
                      * 这样就即解决了同步queue效率低的问题,又解决了安全性问题,
                      * 这真是两全其美
                      */
                     //queue.put(msg);
                     ++reservations;//存储量加1
              }
              queue.put(msg);
       }

       private class LoggerThread extends Thread {
              public void run() {
                     try {
                            while (true) {
                                   try {
                                          synchronized (LogService.this) {
                                                 // 由于 queue 未同步,所以这里不能使用queue.size
                                                 //if (isShutdown && queue.size() == 0)

                                                 // 如果已关闭,且缓存中的日志信息都已写入,则退出日志线程
                                                 if (isShutdown && reservations == 0)
                                                        break;
                                          }
                                          String msg = queue.take();
                                          synchronized (LogService.this) {
                                                 --reservations;
                                          }
                                          writer.println(msg);
                                   } catch (InterruptedException e) { /* 重试 */
                                   }
                            }
                     } finally {
                            writer.close();
                     }
              }
       }
}

7.2.2 关闭ExecutorService

shutdown():启动一次顺序关闭,执行完以前提交的任务,没有执行完的任务继续执行完(即不会调用正在运行的任务线程的interrupt中断方法),但不接受新任务。如果已经关闭,则调用没有其他作用。

List<Runnable> shutdownNow():试图停止所有正在执行的任务(向它们发出interrupt操作语法),并暂停处理正在等待的任务,并返回等待执行的任务列表。无法保证能够停止正在处理的任务线程,但是会尽力尝试,典型的实现是通过 Thread.interrupt() 来中断正在运行的任务线程,所以如果任何任务屏蔽或无法响应中断,则可能永远无法终止该任务。

如果ExecutorService已关闭,再向它提交任务时会抛RejectedExecutionException异常。

这两种方式的差别在于各自的安全性和响应性:强制关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束:而正常关闭虽然速度慢,但却更安全,因为ExecutorService会一直等到队列中的所有任务都执行完后才关闭。在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。

简单地程序可以直接在main函数中启动和关闭全局的ExecutorService。而在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务类中,并且该服务能提供其自己的生命周期方法,例如下面的LogService的一种变化形式,它将管理线程的工作委托给一个ExecutorService,而不是由其自行管理。通过封装ExecutorService,可以将所有权链从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

public class LogService {
       //注,池中只创建一个写日志的工作线程
       private final ExecutorService exec = Executors.newSingleThreadExecutor();
       private final PrintWriter writer;

       public LogService(Writer writer) {
              this.writer = new PrintWriter(writer);
       }

       //启动日志服务
       public void start() {
              //钩子程序,在JVM正常关闭时调用,确保日志文件关闭
              Runtime.getRuntime().addShutdownHook(new Thread() {
                     public void run() {
                            try {
                                   LogService.this.stop();
                            } catch (InterruptedException ignored) {
                            }
                     }
              });
       }

       //关闭日志服务
       public void stop() throws InterruptedException {
              try {
                     exec.shutdown();//先尝试关闭池中的工作线程
                     exec.awaitTermination(1, TimeUnit.MINUTES);//等待工作线程结束
              } finally {
                     writer.close();//最后关闭日志文件
              }
       }

       //供应用程序调用,用来向日志缓存存放要记录的日志信息
       public void log(String msg) throws InterruptedException {
              try {
                     exec.execute(new WriteTask(msg));
              } catch (RejectedExecutionException ignored) {
              }
       }

       //写日志任务
       private class WriteTask implements Runnable {
              private String msg;

              public WriteTask(String msg) {
                     this.msg = msg;
              }

              public void run() {
                     writer.println(msg);
              }
       }
}

7.2.3 “毒丸”对象

另一种关闭生产者 - 消费者服务的方式就是使用“毒丸”对象:“毒丸”是指一个放在队列上的队列,其含义是:“当得到这个对象时,立即停止。”在FIFO(先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。生产者要注意的是在放入这个对象后就不能再放入了。这种方式对只有一个生产者比较适用。

下面通过终结标示对象来关闭一个文件检索服务:

public class IndexingService {
       private static final File POISON = new File("");// 终结标示对象
       private final IndexerThread consumer = new IndexerThread();// 生产者
       private final CrawlerThread producer = new CrawlerThread();// 消费者
       private final BlockingQueue<File> queue;
       private final FileFilter fileFilter;
       private final File root;

       class CrawlerThread extends Thread { /* 看后面 */}

       class IndexerThread extends Thread { /* 看后面 */}

       // 启动服务
       public void start() {
              producer.start();
              consumer.start();
       }

       /*
        *  停止服务。这里只能先中断生产者,当生产者响应中断后,会在队列尾放入
        *  一个终结标示对象
        */
       public void stop() {
              producer.interrupt();
       }

       /*
        *  等待服务关闭。即这里要等待消费线程完成。该方法用在调用stop方法后,用
        *  来等待服务关闭,这里模拟了ExecutorService的shutdown与awaitTermination
        *  方法
        */
       public void awaitTermination() throws InterruptedException {
              consumer.join();
       }
}
class CrawlerThread extends Thread {// 生产者
       void run() {
              try {
                     crawl(root);
              } catch (InterruptedException e) {
                     /*
                      * 如果文件很多,且生产者快于消费者时,则外界可以调用stop方法,
                      * 给生产线程发送一个中断请求,这里就会捕获到,然后就会跳出
                      *  crawl 方法
                      */
              } finally {
                     while (true) {
                            try {
                                   queue.put(POISON);
                                   break;// 如果是正常放入终结标示对象后直接跳出循环
                            } catch (InterruptedException e1) {
                                   /*
                                    * 如果队列满后在放入终结标示对象时如果线程再次被中断,
                                    * 则程序会跳到这里执行,这时我们不能退出生产线程,因
                                    * 为标示对象还未放入,所以为会将queue.put放在了无限
                                    * 循环中了,并且try还得要在循环里
                                    */
                            }
                     }
              }
       }

       private void crawl(File root) throws InterruptedException {
              ...//向队列中放的过程
       }
}
class IndexerThread extends Thread {//消费者
       public void run() {
              try {
                     while (true) {
                            File file = queue.take();
                            //如果从队列中取出的是标示对象,则退出程序
                            if (file == POISON)
                                   break;
                            else
                                   indexFile(file);
                     }
              } catch (InterruptedException consumed) {
                     /*
                      * 对于外界的InterruptedException不理采,因为队列中很可能
                      * 还有大批的任务等着该消费线程去处理,该线程的终止只依赖
                      * 于生产线程
                      */
              }
       }
}

只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象。在IndexingService中采用的解决方案可以扩展到多个生产者:只需每个生产者都向队列中放入一个“毒丸”对象,并且消费者仅当在接收到Nproducers个“毒丸”对象时才停止。这种方法也可以扩展到多个消费者的情况,只需生产者将Nconsumers个“毒丸”对象放入队列。然而,当生产者和消费者的数量较大时,这种方法将变得难以使用。只有在无界队列中,“毒丸”对象才可以可靠地工作。因为如果是有限的时在放入标示对象时可能会阻塞,这又可能会引发中断异常。

7.2.4 只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一次私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。(在这种情况下,invokeAll和invokeAny等方法通常会起较大的作用。)

如下checkMail方法能在多台主机上并行地检查新邮件。它创建一个私有的Executor,并向每台主机提交一个任务。然后,当所有邮件检查任务都执行完毕后,关闭Executor并等待结束。

boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
        throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
//这里不能使用 volatile hasNewMail,因为还需要在匿名内中修改
    final AtomicBoolean hasNewMail = new AtomicBoolean(false);
    try {
        for (final String host : hosts)//循环检索每台主机
            exec.execute(new Runnable() {//执行任务
                public void run() {
                   if (checkMail(host))
                       hasNewMail.set(true);
                }
            });
    } finally {
        exec.shutdown();//因为ExecutorService只在这个方法中服务,所以完成后即可关闭
        exec.awaitTermination(timeout, unit);//等待任务的完成,如果超时还未完成也会返回
    }
    return hasNewMail.get();
}

7.2.5 shutdownNow的局限性

当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务,并返回所有已提交但未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。(shutdownNow返回的Runnable对象可能与提交给ExecutorService的Runnable对象并不相同:它们可能是被封装过的已提交任务。)

然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当Executor关闭时哪些任务正在执行。

shutdownNow只返回了还未开始运行的任务,如果我们还想知道哪些正在运行任务是因为调用shutdownNow而被取消的,TrackingExecutor展现了如何记录那些在关闭后还未执行完的任务,它可以识别哪些任务已经开始但没有正常结束。通过封装ExecutorService并使得execute(类似地还有submit,在这里没有给出)记录哪些任务是在关闭后取消的。为了应用这个技术,任务(run方法)必须在返回时恢复中断状态,这也是每个程序应该做到的。

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public List<Runnable> getCancelledTasks() {//返回被取消的任务
        if (!exec.isTerminated())//如果shutdownNow未调用或调用未完成时
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                            /*
                             * 实质上在这里会有线程安全性问题,存在着竞争条件,比如程序刚
                             * 好运行到这里,即任务任务(run方法)刚好运行完,这时外界调用
                             * 了shutdownNow(),这时下面finally块中的判断会有出错,明显示
                             * 任务已执行完成,但判断给出的是被取消了。如果要想安全,就不
                             * 应该让shutdownNow在run方法运行完成与下面判断前调用。我们要
                             * 将runnable.run()与下面的if放在一个同步块、而且还要将
                             *  shutdownNow的调用也放同步块里并且与前面要是同一个监视器锁,
                             *  这样好像就可以解决了,不知道对不能。书上也没有说能不能解决,
                             *  只是说有这个问题!但反过来想,如果真的这样同步了,那又会带
                             *  性能上的问题,因为什么所有的任务都会串形执行,这样还要
                             *  ExecutorService线程池干嘛呢?我想这就是后面作者为什么所说
                             *  这是“不可避免的竞争条件”
                             */
                } finally {
                                   //如果调用了shutdownNow且运行的任务被中断
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);//记录被取消的任务
                }
            }
        });
}
// 将ExecutorService 中的其他方法委托到exec
}

下面的WebCrawler展示了TrackingExecutor的应用。Web Crawler的工作通过是无尽的,所以在Crawler关闭时保存它的状态,这样可以在下次启动时继续前一状态继续运行。由于TrackingExecutor存在不可避免的竞争条件,使它产生假阳性现象:识别出的被取消任务事实上可能已经结束,产生的原因是在任务执行的最后一条件指令,以及线程池记录为完成任务之间(and when the pool records the task as complete)。如果两次执行的结果与执行一次相同,那么这不会有会么问题,这个例子就是这样。另外,应用程序得到已被取消的任务必须注意这个风险,应该为这样的假阳性现象作好准备。

//web爬虫
public abstract class WebCrawler {
       private volatile TrackingExecutor exec;
       private final Set<URL> urlsToCrawl = new HashSet<URL>();

       private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
       private static final long TIMEOUT = 500;
       private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;

       public WebCrawler(URL startUrl) {
              urlsToCrawl.add(startUrl);
       }

       public synchronized void start() {
              exec = new TrackingExecutor(Executors.newCachedThreadPool());
              for (URL url : urlsToCrawl)
                     submitCrawlTask(url);
              urlsToCrawl.clear();
       }

       public synchronized void stop() throws InterruptedException {
              try {
                     saveUncrawled(exec.shutdownNow());//保存还未开始的任务
                     if (exec.awaitTermination(TIMEOUT, UNIT))
                            saveUncrawled(exec.getCancelledTasks());//保存被取消的任务
              } finally {
                     exec = null;
              }
       }

       //具体对每个URL的处理由子类实现,并返回这个页面所有链接
       protected abstract List<URL> processPage(URL url);

       private void saveUncrawled(List<Runnable> uncrawled) {
              for (Runnable task : uncrawled)
                     //取出未做完或没做的URL并保存
                     urlsToCrawl.add(((CrawlTask) task).getPage());
       }

       //提交任务爬虫任务
       private void submitCrawlTask(URL u) {
              exec.execute(new CrawlTask(u));
       }

       //每个URL一个爬虫任务
       private class CrawlTask implements Runnable {
              private final URL url;

              CrawlTask(URL url) {
                     this.url = url;
              }

              //private int count = 1;

              boolean alreadyCrawled() {
                     return seen.putIfAbsent(url, true) != null;
              }

              void markUncrawled() {
                     seen.remove(url);
                     System.out.printf("marking %s uncrawled%n", url);
              }

              public void run() {
                     for (URL link : processPage(url)) {
                            //如果任务线程被中断,则响应中断语法,即退出任务
                            if (Thread.currentThread().isInterrupted())
                                   return;
                            submitCrawlTask(link);//继续对象每个页面中的链接进行处理
                     }
              }

              public URL getPage() {
                     return url;
              }
       }
}

7.3 处理非正常的线程终止

如果在一个线程中启动另一个线程,另一个线程中抛出异常,如果没有捕获它,这个异常也不会传递到父线程中。

导致线程提前死亡的最主要原因就是RuntimeException。由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调用栈中逐层传递,而是默认地在控制台中输出栈追踪信息,并终止线程。

线程非正常退出的后果可能是良性的(拥有50个线程的线程池丢失一个线程),也可能是恶性的(GUI程序中丢失了事件分派线程),这要取决于线程在应用程序中的作用。

任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常。对调用的代码越不熟悉,就越应该对其代码行为保持怀疑。

在任务处理线程(例如线程池中的工作者线程或者Swing的事件派发线程等)的生命周期中,将通过某种抽象机制(例如Runnable)来调用许多未知的代码,我们应该对在这些线程中执行的代码能否表现出正确的行为保持怀疑。因此,这些线程应该在try-catch代码块中调用这些任务,这样就能捕获那些未检查的异常了,或者也可以使用try-finally代码块来确保框架能够知道线程非正常退出的情况,并做正确的响应。在这种情况下,你或许会考虑捕获RuntimeException,即当通过Runnable这样的抽象机制来调用未知的和不可信的代码时。(对于捕获运行时异常可能会有一些争议,但当程序抛出一个运行时异常时,整个应用程序都可能受到威胁,所以从系统安全角度来看,这种捕获运行异常是可取的,对自己写的代码我们是不允许这样做的,但对于未知的代码,这样做也是无可厚非的。)

下面程序展示了如何在线程池中实现一个工作者线程。如果任务抛出了一个运行时异常,它将允许线程终结,但是会首先通知框架:线程已经终结。然后框架可能会用新的线程取代这个工作线程,也可能不这样做,因为线程池也许正在关闭,或者是池中线程充足。ThreadPoolExecutor和Swing使用这技术来确保那些不能正常运转的任务不会影响到后续任务的执行。

典型的线程池工作线程的实现:

public void run() {//工作者线程的实现
    Throwable thrown = null;
    try {
        while (!isInterrupted())
            runTask(getTaskFromWorkQueue());
    } catch (Throwable e) {//为了安全,捕获的所有异常
        thrown = e;//保留异常信息
    } finally {
        threadExited(this, thrown);// 重新将异常抛给框架后终结工作线程
    }
}

当编写一个向线程池提交任务的工作者线程类时,或者调用不可信的外部代码时(例如动态加载的插件),使用这些方法中的某一种可以避免某个编写糟糕的任务或插件不会影响调用它的整个线程。

未捕获异常的线程

上一节介绍了一种主动方法来解决未检查异常。在Thread API中同样提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。这两种方法是互补的,通过将二者结合在一起,就能有效地防止线程泄露问题。

当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给应用程序提供的UncaghtExceptionHandler异常处理器,如下清单所示。如果没有提供任务异常处理器,那么默认的行为是将栈追踪信息输出到System.err。

public interface UncaghtExceptionHandler
{
    void uncaghtException(Thread t,Throwable e);
}

在Java5.0之前,控制UncaghtExceptionHandler的唯一方法就是对ThreadGroup进行子类化。在Java5.0及之后的版本中,可以通过Thread.setUncaghtExceptionHandler为每个线程设置一个UncaghtExceptionHandler,还可以 使用setDefaultUncaghtExceptionHandler来设置默认的UncaghtExceptionHandler。然而,在这些异常处理器中,只有其中一个将被调用——JVM首先会搜索每个线程的异常处理器,然后再搜索一个ThreadGroup的异常处理器。ThreadGroup中默认异常处理器实现将异常处理工作逐层委托给它的上层ThreadGroup,直至其中某个ThreadGroup的异常处理器能够处理该未捕获异常,否则将一直传递到顶层的ThreadGroup。顶层ThreadGroup的异常处理器委托给默认的系统处理器(如果存在,在默认情况下为空),否则将把栈追踪信息输出到控制台。

未捕获到的异常处理首先由线程自己控制(由Thread. setUncaughtExceptionHandler设置的未捕获异常处理器),然后由线程的 ThreadGroup 对象控制,最后由未捕获到的默认异常处理程(由Thread. setDefaultUncaughtExceptionHandler设置的未捕获异常处理器)序控制。如果线程不设置明确的未捕获到的异常处理程序,并且该线程的线程组(包括父线程组)未特别指定其 uncaughtException 方法(即重写),则将调用默认处理程序(由Thread. setDefaultUncaughtExceptionHandler设置的未捕获异常处理器)的 uncaughtException 方法。

异常处理器如何处理未捕获异常,取决于对服务质量的需求。最常见的响应方式是将一个错误信息以及响应的栈追踪信息写入应用程序日志中,如下程序所示。异常处理器还可以采取更直接的的响应,例如尝试重新启动线程,关闭应用程序,或者执行其他修复或诊断等操作。

public class UEHLogger implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }
}

在运行时间较长的应用程序中,通常会为所有的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

为了给线程池(ExecutorService)设置UncaughtExceptionHandler,需要向ThreadPoolExecutor的构造函数提供一个ThreadFactory。(与所有的线程操控一样,只有线程的所有者能够改变线程的UncaghtExceptionHandler。)标准的线程池(ExecutorService)允许任务抛出未捕获的异常去终止任务线程,但是当任务线程终止后,使用一个try-finally块来接收通知的话,就能够用新的线程取代它。如果没有未捕获异常的处理器,又没有其他失败通知机制,任务将会无声无息地失败。如果你想在任务因异常而失败时获得通知,那么你应该采取一些特定的任务恢复机制,或者是用Runnable与Callable把任务包装起来提交给ExecutorService执行,然后通过Future.get来获取任务执行时抛出的异常(如果任务是Thread子类,此时即使为这个任务设置了异常处理器,此时也不会起作用,而是将异常封装在Future对象里了),或者是重写ThreadPoolExecutor的afterExecute(Runnable r,Throwable t)钩子方法,这样就能够捕获异常。

要注意的是,Executor框架中的任务未捕获异常处理器只有捕获处理通过execute提交的任务,而通过submit提交的任务,抛出的任何异常,都会封装在任务Future里,如果有一个以submit提交的任务以异常而终止,这个异常会被Future.get重新抛出,这些异常并包装在ExecutionException(注,只有任务本身所抛出的异常再封装成这种异常,比如CancellationException异常就不会,因为这不是任务本身所抛出的)。

下面是Executor框架中的UncaughtExceptionHandler处理器实现:

// 会抛出异常的线程
class ExceptionThread implements Runnable {
       public void run() {
              Thread t = Thread.currentThread();
              System.out.println("run() by " + t);
              System.out.println("1.eh = " + t.getUncaughtExceptionHandler());
              throw new RuntimeException();//线程运行时一定会抛出运行异常
       }
}

// 线程未捕获异常处理器
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
       // 异常处理方法
       public void uncaughtException(Thread t, Throwable e) {
              System.out.println("caught : ");
              e.printStackTrace();
       }
}

/*
* 线程工厂,Executor框架在创建线程时会调用该工厂,ThreadFactory为
* Executor框架定义的线程创建工作接口,它的作用是创建线程时能为这个
* 线程做一些其它的事件,如这里给它设置一个未捕获异常处理器。
*/
class HandlerThreadFactory implements ThreadFactory {
       public Thread newThread(Runnable r) {//线程创建工厂方法
              System.out.println(this + " creating new Thread");
              Thread t = new Thread(r);
              t.setName("exception thread");
              System.out.println("created " + t);
              //设置异常处理器
              t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
              System.out.println("2.eh = " + t.getUncaughtExceptionHandler());
              return t;
       }
}

public class CaptureUncaughtException {
       public static void main(String[] args) {
              ExecutorService exec = Executors
                            .newCachedThreadPool(new HandlerThreadFactory());
              //未捕获异常处理器会捕获到工作线程的所抛出的异常
              exec.execute(new ExceptionThread());

              /*
               *  注,submit提交的任务抛出的异常是不能被未捕获异常处理捕获到的
               *  因为该异常已经通过Future传递到了executor框架线程中来了,并且
               *  任务抛出异常后,调用future.get时就会重新抛出这个异常
               */
              Future<?> f = exec.submit(new ExceptionThread());
              try {
                     //future.get会重新抛出任务执行时所未捕获到异常
                     System.out.println(f.get());
              } catch (Exception e) {
                     e.printStackTrace();
              }
       }
}

7.4 JVM关闭

JVM既可通过正常手段来关闭,也可强行关闭。

正常关闭:当最后一个“正常(非守护)”线程结束时、当有人调用了System.exit时、或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-C)。

强行关闭:Runtime.halt(应小心使用此方法。与 exit 方法不同,此方法不会启动关闭挂钩,并且如果已启用退出终结,此方法也不会运行未调用的终结方法。如果已经发起关闭序列,那么此方法不会等待所有正在运行的关闭挂钩或终结方法完成其工作)、杀死JVM操作系统进程(比如在 Unix 上使用 SIGKILL 信号)。注,这种强行关闭方式将无法保证是否将运行关闭钩子。

7.4.1 关闭钩子

在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过Runnable.addShutdownHook注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。在关闭应用程序线程时,如果有(守护或非守护)线程仍然在运行,那么这些线程接下来将与关闭进程并发执行。当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为true,那么JVM将运行终结器(finalize),然后再停止。JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。

关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步机制,并且小心地避免发生死锁,这与其他并发代码的要求相同。关闭钩子不应该对应用程序的状态(例如,其他服务是否已经关闭,或者所有的正常线程是否已经执行完成)或者JVM的关闭原因做出任何假设。

关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望JVM能尽快终止。由于用户注销或系统关闭而终止虚拟机时,底层的操作系统可能只允许在固定的时间内关闭并退出。因此在关闭挂钩中尝试进行任何用户交互或执行长时间的计算都是不明智的。

与其他线程一样,我们可以为它置一个UncaughtExceptionHandler处理器,在抛出未捕获到的异常时来处理异常。

关闭钩子可以用于实现服务或应用程序清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。如下所示,在start方法中注册一个关闭钩子,从而确保在退出时关闭日志文件:

public void start()
{
    Runnable.getRuntime().addShutdownHook(new Thread(){
        public void run()
        {
            try{LogService.this.stop();}
            catch(InterruptedException ignored){}
        }
    });
}

由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。为了避免这种情况,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。实现这种功能的一种方式是对所有服务使用同一个关闭钩子(而不是每个服务使用一个不同的关闭钩子),并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免了在关闭操作之间出现竞态条件或死锁问题。

7.4.2 守护线程

有时候,你希望创建一个线程来执行一些辅助工作,但有不希望这个线程阻碍JVM的关闭。在这种情况下就需要使用守护线程(Daemon Thread)。

线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。

普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

我们应尽可能少地使用守护线程——很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含I/O操作的任务时,那么将是一种危险的行为。守护线程最好用于执行“内部”任务,列入周期性地从内存缓存中移除逾期的数据。

此外,守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期。

7.4.3 终结器

当不再需要内存资源时,可以通过垃圾回收器来回收它们,但对于其他一些资源,例如文件句柄或套接字句柄,当不再需要它们时,必须显式地交还给操作系统。为了实现这个功能,垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而确保一些持久化的资源被释放。

由于终结器可以在某个由JVM管理的线程中运行,因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。终结器并不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。要编写正确的终结器是非常困难的。在大多数情况下,通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源。唯一的例外情况在于:当需要管理对象时,并且该对象持有的资源是通过本地方法获得的。基于这些原因及其他一些原因,我们要尽量避免编写或使用包含终结器的类(除非是平台库中的类)[EJ Item 6]。

避免使用终结器

小结

在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现的复杂性。Java并没有提供某种抢占式的机制来取消操作或者终结线程。相反,它提供了一种协作式的中断机制来实现取消操作,但这要依赖于如何构建取消操作的协议,以及能否始终遵守这些协议。通过使用FutureTask和Executor框架,可以帮助我们构建可取消的任务和协议。

参考:Java并发编程

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-20 02:17:14

[Java Concurrency in Practice]第七章 取消与关闭的相关文章

[Java Concurrency in Practice]第三章 对象的共享

对象的共享 第二章介绍了如何通过同步来避免多个线程在同一时刻访问相同的数据,而第三章则介绍如何共享和发布对象,从而使它们能够安全地由多个线程同时访问.这两章合在一起就形成了构建线程安全类以及通过java.util.concurrent类库来构建并发应用程序的重要基础. synchronized不仅仅只有原子性,还具有内存可见性.我们不仅希望防止某个线程正在使用对象状态而另一个线程在同时修改该状态,而且希望确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化.如果没有同步,那么这种情况就

《Java并发编程实战》第七章 取消与关闭 读书笔记

Java没有提供不论什么机制来安全地(抢占式方法)终止线程,尽管Thread.stop和suspend等方法提供了这种机制,可是因为存在着一些严重的缺陷,因此应该避免使用. 但它提供了中断Interruption机制,这是一种协作机制,可以使一个线程终止还有一个线程的当前工作. 一.任务取消 取消操作的原因: . 用户请求取消 . 有时间限制的操作 . 应用程序事件 . 错误 . 关闭 结束任务的四种方式: 1. run方法运行结束 2. 使用请求关闭标记(比如boolean开关) 3. 使用中

[Java Concurrency in Practice]第五章 基础构建模块

基础构建模块 委托时创建线程安全类的一个最有效的策略,只需让现有的线程安全类管理所有的状态即可. 平台类库中包含了一个并发构建块的丰富集合,如线程安全的容器与同步工具. 5.1 同步容器类 分两部分,一是JDK1.0的Vector与Hashtable,另一个是JDK1.2才被加入的同步包装类Collections.synchronizedXxx工厂方法创建的.Collections.synchronizedXxx工厂方法构造出的容器返回的List与Set的iterator()与listItera

java并发编程实战:第七章----取消与关闭

Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用 中断:一种协作机制,能够使一个线程终止另一个线程的当前工作 立即停止会使共享的数据结构处于不一致的状态,需要停止时,发出中断请求,被要求中断的线程处理完他当前的任务后会自己判断是否停下来 一.任务取消 若外部代码能在某个操作正常完成之前将其置入"完成"状态,则还操作是可取消的.(用户请求取消.有时间限制的操作<并发查找结果,一个线程找到后可取

[Java Concurrency in Practice]二至五章小结

下面这个"并发技巧清单"列举了在第一部分(二至五章)中介绍的主要概念和规则. 可变状态是至关重要的(It's the mutable state,stupid). 所有的并发问题都可以归结为如何协调对并发状态的访问.可变状态越少,就越容易确保线程安全性. 尽量将域声明为final类型,除非需要它们是可变的. 不可变对象一定是线程安全的. 不可变对象能极大地降低并发编程的复杂性.它们更为简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制. 封装有助于管理复杂性. 在编写线程安全的

java并发编程实战学习笔记之取消与关闭

第七章 取消与关闭 7.1 任务取消 方式一.通过volatile类型的域来保存取消状态 方式二.interrupt()方法 interrupt()可以中断目标线程 isinterrupted()方法用来检测目标线程的中断状态 interrupted()用于清除中断状态,并且返回之前的中断状态,这是唯一可以清除中断状态的方法,如果在调用该方法是返回了true,那么除非你想屏蔽这个中断,否则你必须对他进行处理,可以抛出interruptExeption异常或者重新通过interrupt来恢复中断状

Java Concurrency in Practice 4.1-4.2相关问题及理解

今天终于又重新拿起了Java Concurrency in Practice,之前被虐的体无完肤,在看这本书之前,有一部分自己写的代码我根本没意识到是线程不安全的,还真的是要恶补这方面的知识. 1.Java监视器模式 监视器模式其实很简单,就是用私有对象的锁或者内置锁来保证所属对象的线程安全性.这里引入一个例子:车辆追踪 public class MonitorVehicleTracker { private final Map<String, MutablePoint> locations;

JAVA: httpclient 详解——第七章;

httpclient 详解--第一章: httpclient 详解--第二章: httpclient 详解--第三章: httpclient 详解--第四章: httpclient 详解--第五章: httpclient 详解--第六章: httpclient 详解--第七章: 相对于httpurlconnection ,httpclient更加丰富,也更加强大,其中apache有两个项目都是httpclient,一个是commonts包下的,这个是通用的,更专业的是org.apache.htt

[Java Concurrency in Practice]第二章 线程安全性

线程安全性 要编写线程安全的代码,其核心在于要对状态访问操作进行管理,特别是对共享和可变的状态的访问. 对象的状态是指存储在状态变量(例如实例或静态域)中的数据.对象的状态可能包括在其他依赖对象的域.例如,某个HashMap的状态不仅存储在HashMap对象本身,还存储在许过Map.Entry对象中.在对象的状态中包含了任何可能影响其外部可见行为的数据. "共享"意味着变量可以由多个线程同时访问,而"可变"则意味着变量的值在其生命周期内可以发生变化. 一个对象是否需