java并发编程实战:第七章----取消与关闭

Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用

中断:一种协作机制,能够使一个线程终止另一个线程的当前工作

立即停止会使共享的数据结构处于不一致的状态,需要停止时,发出中断请求,被要求中断的线程处理完他当前的任务后会自己判断是否停下来

一、任务取消

若外部代码能在某个操作正常完成之前将其置入“完成”状态,则还操作是可取消的。(用户请求取消、有时间限制的操作<并发查找结果,一个线程找到后可取消其他线程>、应用程序事件、错误、关闭)

取消策略:详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作

举例:设置volatile变量为取消标志,每次执行前检查

 1 private volatile boolean canceled;
 2
 3     @Override
 4     public void run() {
 5         BigInteger p = BigInteger.ONE;
 6         while (!canceled){
 7             p = p.nextProbablePrime();
 8             synchronized (this) { //同步添加素数
 9                 primes.add(p);
10             }
11         }
12     }

注意:这是一个有问题的取消方式,若线程阻塞在add操作后,那么即使设置了取消状态,它也不会运行到检验阻塞状态的代码,因此会永远阻塞

1、中断

  线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。(在取消之外的其他操作使用中断都是不合适的)

  调用interrupt并不意味者立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。会在下一个取消点中断自己,如wait, sleep,join等

1 public class Thread {
2      public void interrupt() { ... }//中断目标线程,恢复中断状态
3      public boolean isInterrupted() { ... }//返回目标线程的中断状态
4      public static boolean interrupted() { ... }//清除当前线程的中断状态,并返回它之前的值(用于已经设置了中断状态,但还尚未相应中断)
5      ...
6  }

阻塞库方法,例如Thread.sleep和Object.wait等,都会检查线程何时中断,并且在发现时提前返回。它们在响应中断时执行的操作包括 : 清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束

  • 显示的检测中断!Thread.currentThread().isInterrupted()后推出
  • 阻塞方法中抓到InterruptedException后退出

2、中断策略——规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作

  由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

3、响应中断

  • 传递异常(throws InterruptedException)
  • 恢复中断状态,从而事调用栈的上层代码能够对其进行处理。(Thread.currentThread().interrupt();)

4、通过Future实现取消

  boolean cancel(boolean mayInterruptIfRunning);

  • 如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败,返回false
  • 调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行
  • 如果任务已经执行,mayInterruptIfRunning参数决定了是否向执行任务的线程发出interrupt操作

5、处理不可中断的阻塞——对于某些阻塞操作,只是设置了中断状态

  • Java.io包中的同步Socket I/O。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。
  • Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptedException)并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
  • Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
  • 获取某个锁。如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。
 1 //改写interrupt方法发出中断请求
 2  @Override
 3     public void interrupt() {
 4         try {
 5             socket.close(); //中断前关闭socket
 6         } catch (IOException e) {
 7
 8         } finally{
 9             super.interrupt();
10         }
11     }

6、采用newTaskFor来封装非标准的取消

二、停止基于线程的服务

应用程序通常会创建基于线程的服务,如线程池。这些服务的时间一般比创建它的方法更长。

  • 服务退出 -> 线程需要结束  无法通过抢占式的方法来停止线程,因此它们需要自行结束
  • 除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等
  • 线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池
  • 应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序不能拥有工作者线程,因此应用程序不能直接停止工作者线程。

服务应该生命周期方法关闭它自己以及他拥有的线程

  • 要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法
  • ExecutorService提供的shutdown(), shutdownNow()

1、示例:日志服务

 1 // LogWriter就是一个基于线程的服务,但不是一个完成的服务
 2 public class LogWriter {
 3     //日志缓存
 4     private final BlockingQueue<String> queue;
 5     private final LoggerThread logger;//日志写线程
 6 private static final int CAPACITY = 1000;
 7
 8     public LogWriter(Writer writer) {
 9         this.queue = new LinkedBlockingQueue<String>(CAPACITY);
10         this.logger = new LoggerThread(writer);
11     }
12
13 public void start() { logger.start(); }
14
15     //应用程序向日志缓存中放入要记录的日志
16     public void log(String msg) throws InterruptedException {
17         queue.put(msg);
18 }
19
20     //日志写入线程,这是一个多生产者,单消费者的设计
21     private class LoggerThread extends Thread {
22         private final PrintWriter writer;
23         public LoggerThread(Writer writer) {
24             this.writer = new PrintWriter(writer, true); // autoflush
25         }
26         public void run() {
27             try {
28                 while (true)
29                    writer.println(queue.take());
30             } catch(InterruptedException ignored) {
31             } finally {
32                 writer.close();
33             }
34         }
35     }
36 }

注意:可以中断阻塞的take()方法停止日志线程(消费者线程),但生产者没有专门的线程,没办法取消

 1 //日志服务,提供记录日志的服务,并有管理服务生命周期的相关方法
 2 public class LogService {
 3        private final BlockingQueue<String> queue;
 4        private final LoggerThread loggerThread;// 日志写线程
 5        private final PrintWriter writer;
 6        private boolean isShutdown;// 服务关闭标示
 7        // 队列中的日志消息存储数量。我们不是可以通过queue.size()来获取吗?
 8        // 为什么还需要这个?请看后面
 9        private int reservations;
10
11        public LogService(Writer writer) {
12               this.queue = new LinkedBlockingQueue<String>();
13               this.loggerThread = new LoggerThread();
14               this.writer = new PrintWriter(writer);
15
16        }
17
18        //启动日志服务
19        public void start() {
20               loggerThread.start();
21        }
22
23        //关闭日志服务
24        public void stop() {
25               synchronized (this) {
26                      /*
27                       * 为了线程可见性,这里一定要加上同步,当然volatile也可,
28                       * 但下面方法还需要原子性,所以这里就直接使用了synchronized,
29                       * 但不是将isShutdown定义为volatile
30                       */
31                      isShutdown = true;
32               }
33               //向日志线程发出中断请求
34               loggerThread.interrupt();
35        }
36
37        //供应用程序调用,用来向日志缓存存放要记录的日志信息
38        public void log(String msg) throws InterruptedException {
39               synchronized (this) {
40                      /*
41                       * 如果应用程序发出了服务关闭请求,则不存在接受日志,而是直接
42                       * 抛出异常,让应用程序知道
43                       */
44                      if (isShutdown)
45                             throw new IllegalStateException(/*日志服务已关闭*/);
46                      /*
47                       * 由于queue是线程安全的阻塞队列,所以不需要同步(同步也可
48                       * 但并发效率会下降,所以将它放到了同步块外)。但是这里是的
49                       * 操作序列是由两个操作组成的:即先判断isShutdown,再向缓存
50                       * 中放入消息,如果将queue.put(msg)放在同步外,则在多线程环
51                       * 境中,LoggerThread中的  queue.size() == 0 将会不准确,所
52                       * 以又要想queue.put不同步,又要想queue.size()计算准确,所
53                       * 以就使用了一个变量reservations专用来记录缓存中日志条数,
54                       * 这样就即解决了同步queue效率低的问题,又解决了安全性问题,
55                       * 这真是两全其美
56                       */
57                      //queue.put(msg);
58                      ++reservations;//存储量加1
59               }
60               queue.put(msg);
61        }
62
63        private class LoggerThread extends Thread {
64               public void run() {
65                      try {
66                             while (true) {
67                                    try {
68                                           synchronized (LogService.this) {
69                                                  // 由于 queue 未同步,所以这里不能使用queue.size
70                                                  //if (isShutdown && queue.size() == 0)
71
72                                                  // 如果已关闭,且缓存中的日志信息都已写入,则退出日志线程
73                                                  if (isShutdown && reservations == 0)
74                                                         break;
75                                           }
76                                           String msg = queue.take();
77                                           synchronized (LogService.this) {
78                                                  --reservations;
79                                           }
80                                           writer.println(msg);
81                                    } catch (InterruptedException e) { /* 重试 */
82                                    }
83                             }
84                      } finally {
85                             writer.close();
86                      }
87               }
88        }
89 }

注意:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提提交消息的权利

2、关闭ExecutorService

  shutdown():启动一次顺序关闭,执行完以前提交的任务,没有执行完的任务继续执行完

  shutdownNow():试图停止所有正在执行的任务(向它们发出interrupt操作语法,无法保证能够停止正在处理的任务线程,但是会尽力尝试),并暂停处理正在等待的任务,并返回等待执行的任务列表。

  ExecutorService已关闭,再向它提交任务时会抛RejectedExecutionException异常

3、“毒丸”对象——当得到这个对象时,立即停止

  在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作

4、只执行一次的服务

  如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一次私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。

 1 boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
 2         throws InterruptedException {
 3 ExecutorService exec = Executors.newCachedThreadPool();
 4 //这里不能使用 volatile hasNewMail,因为还需要在匿名内中修改
 5     final AtomicBoolean hasNewMail = new AtomicBoolean(false);
 6     try {
 7         for (final String host : hosts)//循环检索每台主机
 8             exec.execute(new Runnable() {//执行任务
 9                 public void run() {
10                    if (checkMail(host))
11                        hasNewMail.set(true);
12                 }
13             });
14     } finally {
15         exec.shutdown();//因为ExecutorService只在这个方法中服务,所以完成后即可关闭
16         exec.awaitTermination(timeout, unit);//等待任务的完成,如果超时还未完成也会返回
17     }
18     return hasNewMail.get();
19 }

5、shutdown的局限性

我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查

 1 public class TrackingExecutor extends AbstractExecutorService {
 2     private final ExecutorService exec;
 3     private final Set<Runnable> tasksCancelledAtShutdown =
 4             Collections.synchronizedSet(new HashSet<Runnable>());
 5
 6     public TrackingExecutor(ExecutorService exec) {
 7         this.exec = exec;
 8     }
 9
10     public List<Runnable> getCancelledTasks() {//返回被取消的任务
11         if (!exec.isTerminated())//如果shutdownNow未调用或调用未完成时
12             throw new IllegalStateException(/*...*/);
13         return new ArrayList<Runnable>(tasksCancelledAtShutdown);
14     }
15
16     public void execute(final Runnable runnable) {
17         exec.execute(new Runnable() {
18             public void run() {
19                 try {
20                     runnable.run();
21                             /*参考:http://blog.csdn.net/coslay/article/details/48038795
22                              * 实质上在这里会有线程安全性问题,存在着竞争条件,比如程序刚
23                              * 好运行到这里,即任务任务(run方法)刚好运行完,这时外界调用
24                              * 了shutdownNow(),这时下面finally块中的判断会有出错,明显示
25                              * 任务已执行完成,但判断给出的是被取消了。如果要想安全,就不
26                              * 应该让shutdownNow在run方法运行完成与下面判断前调用。我们要
27                              * 将runnable.run()与下面的if放在一个同步块、而且还要将
28                              *  shutdownNow的调用也放同步块里并且与前面要是同一个监视器锁,
29                              *  这样好像就可以解决了,不知道对不能。书上也没有说能不能解决,
30                              *  只是说有这个问题!但反过来想,如果真的这样同步了,那又会带
31                              *  性能上的问题,因为什么所有的任务都会串形执行,这样还要
32                              *  ExecutorService线程池干嘛呢?我想这就是后面作者为什么所说
33                              *  这是“不可避免的竞争条件”
34                              */
35                 } finally {
36                                    //如果调用了shutdownNow且运行的任务被中断
37                     if (isShutdown()
38                             && Thread.currentThread().isInterrupted())
39                         tasksCancelledAtShutdown.add(runnable);//记录被取消的任务
40                 }
41             }
42         });
43 }
44 // 将ExecutorService 中的其他方法委托到exec
45 } 

三、处理非正常的线程终止

  在一个线程中启动另一个线程,另一个线程中抛出异常,如果没有捕获它,这个异常也不会传递到父线程中

  任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常

 1 //如果任务抛出了一个运行时异常,它将允许线程终结,但是会首先通知框架:线程已经终结
 2 public void run() {//工作者线程的实现
 3     Throwable thrown = null;
 4     try {
 5         while (!isInterrupted())
 6             runTask(getTaskFromWorkQueue());
 7     } catch (Throwable e) {//为了安全,捕获的所有异常
 8         thrown = e;//保留异常信息
 9     } finally {
10         threadExited(this, thrown);// 重新将异常抛给框架后终结工作线程
11     }
12 }

未捕获异常的线程

在Thread API中提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况

在运行时间较长的应用程序中,通常会为所有的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

public class UEHLogger implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }
}

四、JVM关闭

JVM既可通过正常手段来关闭,也可强行关闭。

  • 正常关闭:当最后一个“正常(非守护)”线程结束时、当有人调用了System.exit时、或者通过其他特定于平台的方法关闭时
  • 强行关闭:Runtime.halt,这种强行关闭方式将无法保证是否将运行关闭钩子

1、关闭钩子

  • 关闭钩子是指通过Runnable.addShutdownHook注册的但尚未开始的线程
  • JVM并不能保证关闭钩子的调用顺序
  • 当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为true,那么JVM将运行终结器(finalize),然后再停止
  • JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子
  • 关闭钩子应该是线程安全的
  • 关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间
public void start()//通过注册关闭钩子,停止日志服务
{
    Runnable.getRuntime().addShutdownHook(new Thread(){
        public void run()
        {
            try{LogService.this.stop();}
            catch(InterruptedException ignored){}
        }
    });
}

2、守护线程——一个线程来执行一些辅助工作,但有不希望这个线程阻碍JVM的关闭

  线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程

  普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出

3、终结器(清理文件句柄或套接字句柄等)——避免使用

  垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而确保一些持久化的资源被释放。

  通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源

例外:当需要管理对象时,并且该对象持有的资源是通过本地方法获得的

原文地址:https://www.cnblogs.com/linghu-java/p/9025111.html

时间: 2024-08-18 19:18:01

java并发编程实战:第七章----取消与关闭的相关文章

java并发编程实战学习笔记之取消与关闭

第七章 取消与关闭 7.1 任务取消 方式一.通过volatile类型的域来保存取消状态 方式二.interrupt()方法 interrupt()可以中断目标线程 isinterrupted()方法用来检测目标线程的中断状态 interrupted()用于清除中断状态,并且返回之前的中断状态,这是唯一可以清除中断状态的方法,如果在调用该方法是返回了true,那么除非你想屏蔽这个中断,否则你必须对他进行处理,可以抛出interruptExeption异常或者重新通过interrupt来恢复中断状

《Java并发编程实战》第二章 线程安全性 读书笔记

一.什么是线程安全性 编写线程安全的代码 核心在于要对状态访问操作进行管理. 共享,可变的状态的访问 - 前者表示多个线程访问, 后者声明周期内发生改变. 线程安全性 核心概念是正确性.某个类的行为与其规范完全一致. 多个线程同时操作共享的变量,造成线程安全性问题. * 编写线程安全性代码的三种方法: 不在线程之间共享该状态变量 将状态变量修改为不可变的变量 在访问状态变量时使用同步 Java同步机制工具: synchronized volatile类型变量 显示锁(Explicit Lock

Java并发编程实战 第16章 Java内存模型

什么是内存模型 JMM(Java内存模型)规定了JVM必须遵循一组最小保证,这组保证规定了对变量的写入操作在何时将对其他线程可见. JMM为程序中所有的操作定义了一个偏序关系,称为Happens-Before.两个操作缺乏Happens-Before关系,则Jvm会对它们进行任意的重排序. Happends-Before的规则包括: 1. 程序顺序规则.若程序中操作A在操作B之前,则线程中操作A在操作B之前执行. 2. 监视器锁规则.在同一监视器锁上的解锁操作必须在加锁操作之前执行.如图所示,

[Java Concurrency in Practice]第七章 取消与关闭

取消与关闭 要使任务和线程能安全.快速.可靠地停止下来,并不是一件容易的事.Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用).但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作. 这种协作式的方法是必要的,我们很少希望某个任务.线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态.相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执

JAVA并发编程实战---第三章:对象的共享(2)

线程封闭 如果仅仅在单线程内访问数据,就不需要同步,这种技术被称为线程封闭,它是实现线程安全性的最简单的方式之一.当某个对象封闭在一个线程中时,这种方法将自动实现线程安全性,即使被封闭的对象本生不是线程安全的. 实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发.避免并发最简单的方法就是线程封闭.什么是线程封闭呢? 就是把对象封装到一个线程里,只有这一个线程能看到此对象.那么这个对象就算不是线程安全的也不会出现任何安全问题.实现线程封闭有哪些方法呢? 1:ad-hoc线程封闭 这是完全靠

Java并发编程实战 第15章 原子变量和非阻塞同步机制

非阻塞的同步机制 简单的说,那就是又要实现同步,又不使用锁. 与基于锁的方案相比,非阻塞算法的实现要麻烦的多,但是它的可伸缩性和活跃性上拥有巨大的优势. 实现非阻塞算法的常见方法就是使用volatile语义和原子变量. 硬件对并发的支持 原子变量的产生主要是处理器的支持,最重要的是大多数处理器架构都支持的CAS(比较并交换)指令. 模拟实现AtomicInteger的++操作 首先我们模拟处理器的CAS语法,之所以说模拟,是因为CAS在处理器中是原子操作直接支持的.不需要加锁. public s

《Java并发编程实战》第九章 图形用户界面应用程序界面 读书笔记

一.为什么GUI是单线程化 传统的GUI应用程序通常都是单线程的. 1. 在代码的各个位置都须要调用poll方法来获得输入事件(这样的方式将给代码带来极大的混乱) 2. 通过一个"主事件循环(Main Event Loop)"来间接地运行应用程序的全部代码. 假设在主事件循环中调用的代码须要非常长时间才干运行完毕,那么用户界面就会"冻结",直到代码运行完毕.这是由于仅仅有当运行控制权返回到主事件循环后,才干处理兴许的用户界面事件. 非常多尝试多线程的GUI框架的努力

JAVA并发编程实战---第三章:对象的共享

在没有同步的情况下,编译器.处理器以及运行时等都可能对操作的执行顺序进行一些意想不到的调整.在缺乏足够同步的多线程程序中,要对内存操作的执行顺序进行判断几乎无法得到正确的结果. 非原子的64位操作 当线程在没有同步的情况下读取变量时,可能会读到一个失效值,但至少这个值是由之前的某个线程设置,而不是一个随机值.这种安全性保证也被称为最低安全性. Java内存模型要求:变量的读取操作和写入操作都必须是原子操作,但对于非Volatile类型的long和Double变量,JVM允许将64的读操作或写操作

java并发编程实战-第2章-线程安全性

2. 线程安全性 2.1 什么是线程安全性 线程安全类:当一个类被多个线程访问时,不管运行环境中如何调度,这些线程如何交替执行,并且在调用的代码部分不需要额为的同步或者协同.这个类为线程安全类 Thread-safe classes encapsulate any needed synchronization so that clients need not provide their own. 2.1.1. Example: A Stateless Servlet Stateless obje

《Java并发编程实战》第十六章 Java内存模型 读书笔记

Java内存模型是保障多线程安全的根基,这里仅仅是认识型的理解总结并未深入研究. 一.什么是内存模型,为什么需要它 Java内存模型(Java Memory Model)并发相关的安全发布,同步策略的规范.一致性等都来自于JMM. 1 平台的内存模型 在架构定义的内存模型中将告诉应用程序可以从内存系统中获得怎样的保证,此外还定义了一些特殊的指令(称为内存栅栏或栅栏),当需要共享数据时,这些指令就能实现额外的存储协调保证. JVM通过在适当的位置上插入内存栅栏来屏蔽在JVM与底层平台内存模型之间的