Java线程阻塞中断和LockSupport的常见问题

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。

在介绍之前,先抛几个问题。

  1. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
  2. Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
  3. 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
  4. LockSupport.park()和unpark(),与object.wait()和notify()的区别?
  5. LockSupport.park(Object blocker)传递的blocker对象做什么用?
  6. LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
  7. Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?

如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。

那如果不清楚的,带着这几个问题,一起来梳理下。

Thread的interrupt处理的几个方法:

  • public void interrupt() :  执行线程interrupt事件
  • public boolean isInterrupted() : 检查当前线程是否处于interrupt
  • public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()

理解:

1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态

2. 一般调用Thread.interrupt()会有两种处理方式

  • 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
  • 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。

在interrupt javadoc中描述:

最佳实践

IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。

  1. Don‘t swallow interrupts (别吃掉Interrupt,一般是两种处理:  继续throw InterruptedException异常。  另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。

    Java代码  

    1. public class TaskRunner implements Runnable {
    2. private BlockingQueue<Task> queue;
    3. public TaskRunner(BlockingQueue<Task> queue) {
    4. this.queue = queue;
    5. }
    6. public void run() {
    7. try {
    8. while (true) {
    9. Task task = queue.take(10, TimeUnit.SECONDS);
    10. task.execute();
    11. }
    12. }
    13. catch (InterruptedException e) {
    14. // Restore the interrupted status
    15. Thread.currentThread().interrupt();
    16. }
    17. }
    18. }
  2. Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)

    Java代码  

    1. public class PrimeProducer extends Thread {
    2. private final BlockingQueue<BigInteger> queue;
    3. PrimeProducer(BlockingQueue<BigInteger> queue) {
    4. this.queue = queue;
    5. }
    6. public void run() {
    7. try {
    8. BigInteger p = BigInteger.ONE;
    9. while (!Thread.currentThread().isInterrupted())
    10. queue.put(p = p.nextProbablePrime());
    11. } catch (InterruptedException consumed) {
    12. /* Allow thread to exit */
    13. }
    14. }
    15. public void cancel() { interrupt(); } // 发起中断
    16. }<span style="white-space: normal;"> </span>

注册Interrupt处理事件(非正常用法)

一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。

来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。

Java代码  

  1. interface InterruptAble { // 定义可中断的接口
  2. public void interrupt() throws InterruptedException;
  3. }
  4. abstract class InterruptSupport implements InterruptAble {
  5. private volatile boolean interrupted = false;
  6. private Interruptible    interruptor = new Interruptible() {
  7. public void interrupt() {
  8. interrupted = true;
  9. InterruptSupport.this.interrupt(); // 位置3
  10. }
  11. };
  12. public final boolean execute() throws InterruptedException {
  13. try {
  14. blockedOn(interruptor); // 位置1
  15. if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
  16. interruptor.interrupt();
  17. }
  18. // 执行业务代码
  19. bussiness();
  20. } finally {
  21. blockedOn(null);   // 位置2
  22. }
  23. return interrupted;
  24. }
  25. public abstract void bussiness() ;
  26. public abstract void interrupt();
  27. // -- sun.misc.SharedSecrets --
  28. static void blockedOn(Interruptible intr) { // package-private
  29. sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
  30. }
  31. }

代码说明,几个取巧的点:

位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。

位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。

位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。

使用:

Java代码  

  1. class InterruptRead extends InterruptSupport {
  2. private FileInputStream in;
  3. @Override
  4. public void bussiness() {
  5. File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
  6. try {
  7. in = new FileInputStream(file);
  8. byte[] bytes = new byte[1024];
  9. while (in.read(bytes, 0, 1024) > 0) {
  10. // Thread.sleep(100);
  11. // if (Thread.interrupted()) {// 以前的Interrupt检查方式
  12. // throw new InterruptedException("");
  13. // }
  14. }
  15. } catch (Exception e) {
  16. throw new RuntimeException(e);
  17. }
  18. }
  19. public FileInputStream getIn() {
  20. return in;
  21. }
  22. @Override
  23. public void interrupt() {
  24. try {
  25. in.getChannel().close();
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. public static void main(String args[]) throws Exception {
  32. final InterruptRead test = new InterruptRead();
  33. Thread t = new Thread() {
  34. @Override
  35. public void run() {
  36. long start = System.currentTimeMillis();
  37. try {
  38. System.out.println("InterruptRead start!");
  39. test.execute();
  40. } catch (InterruptedException e) {
  41. System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
  42. e.printStackTrace();
  43. }
  44. }
  45. };
  46. t.start();
  47. // 先让Read执行3秒
  48. Thread.sleep(3000);
  49. // 发出interrupt中断
  50. t.interrupt();
  51. }

jdk源码介绍:

1. sun提供的钩子可以查看System的相关代码, line : 1125

System代码  

  1. sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
  2. public sun.reflect.ConstantPool getConstantPool(Class klass) {
  3. return klass.getConstantPool();
  4. }
  5. public void setAnnotationType(Class klass, AnnotationType type) {
  6. klass.setAnnotationType(type);
  7. }
  8. public AnnotationType getAnnotationType(Class klass) {
  9. return klass.getAnnotationType();
  10. }
  11. public <E extends Enum<E>>
  12. E[] getEnumConstantsShared(Class<E> klass) {
  13. return klass.getEnumConstantsShared();
  14. }
  15. public void blockedOn(Thread t, Interruptible b) {
  16. t.blockedOn(b);
  17. }
  18. });

2. Thread.interrupt()

Java代码  

  1. public void interrupt() {
  2. if (this != Thread.currentThread())
  3. checkAccess();
  4. synchronized (blockerLock) {
  5. Interruptible b = blocker;
  6. if (b != null) {
  7. interrupt0();       // Just to set the interrupt flag
  8. b.interrupt(); //回调钩子
  9. return;
  10. }
  11. }
  12. interrupt0();
  13. }

更多

更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

最后来解答一下之前的几个问题:

问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?

答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。

Java代码  

  1. if (Thread.interrupted())  // Clears interrupted status!
  2. throw new InterruptedException();

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?

答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。

问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?

答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。

问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?

答:

1.  面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

2.  实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?

答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.

Java代码  

  1. public static void park(Object blocker) {
  2. Thread t = Thread.currentThread();
  3. setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
  4. unsafe.park(false, 0L);
  5. setBlocker(t, null);  // 清除Thread.parkBlocker属性的值
  6. }

具体LockSupport的javadoc描述也比较清楚,可以看下:

问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?

答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:

相关测试代码

Java代码  

  1. package com.agapple.cocurrent;
  2. import java.io.File;
  3. import java.io.FileInputStream;
  4. import java.lang.reflect.Field;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.LockSupport;
  7. public class LockSupportTest {
  8. private static LockSupportTest blocker = new LockSupportTest();
  9. public static void main(String args[]) throws Exception {
  10. lockSupportTest();
  11. parkTest();
  12. interruptParkTest();
  13. interruptSleepTest();
  14. interruptWaitTest();
  15. }
  16. /**
  17. * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
  18. *
  19. * @throws Exception
  20. */
  21. private static void lockSupportTest() throws Exception {
  22. Thread t = doTest(new TestCallBack() {
  23. @Override
  24. public void callback() throws Exception {
  25. // 尝试sleep 5s
  26. System.out.println("blocker");
  27. LockSupport.park(blocker);
  28. System.out.println("wakeup now!");
  29. }
  30. @Override
  31. public String getName() {
  32. return "lockSupportTest";
  33. }
  34. });
  35. t.start(); // 启动读取线程
  36. Thread.sleep(150);
  37. synchronized (blocker) {
  38. Field field = Thread.class.getDeclaredField("parkBlocker");
  39. field.setAccessible(true);
  40. Object fBlocker = field.get(t);
  41. System.out.println(blocker == fBlocker);
  42. Thread.sleep(100);
  43. System.out.println("notifyAll");
  44. blocker.notifyAll();
  45. }
  46. }
  47. /**
  48. * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
  49. *
  50. * @throws InterruptedException
  51. */
  52. private static void interruptWaitTest() throws InterruptedException {
  53. final Object obj = new Object();
  54. Thread t = doTest(new TestCallBack() {
  55. @Override
  56. public void callback() throws Exception {
  57. // 尝试sleep 5s
  58. obj.wait();
  59. System.out.println("wakeup now!");
  60. }
  61. @Override
  62. public String getName() {
  63. return "interruptWaitTest";
  64. }
  65. });
  66. t.start(); // 启动读取线程
  67. Thread.sleep(2000);
  68. t.interrupt(); // 检查下在park时,是否响应中断
  69. }
  70. /**
  71. * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
  72. *
  73. * @throws InterruptedException
  74. */
  75. private static void interruptSleepTest() throws InterruptedException {
  76. Thread t = doTest(new TestCallBack() {
  77. @Override
  78. public void callback() throws Exception {
  79. // 尝试sleep 5s
  80. Thread.sleep(5000);
  81. System.out.println("wakeup now!");
  82. }
  83. @Override
  84. public String getName() {
  85. return "interruptSleepTest";
  86. }
  87. });
  88. t.start(); // 启动读取线程
  89. Thread.sleep(2000);
  90. t.interrupt(); // 检查下在park时,是否响应中断
  91. }
  92. /**
  93. * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
  94. *
  95. * @throws InterruptedException
  96. */
  97. private static void interruptParkTest() throws InterruptedException {
  98. Thread t = doTest(new TestCallBack() {
  99. @Override
  100. public void callback() {
  101. // 尝试去park 自己线程
  102. LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
  103. System.out.println("wakeup now!");
  104. }
  105. @Override
  106. public String getName() {
  107. return "interruptParkTest";
  108. }
  109. });
  110. t.start(); // 启动读取线程
  111. Thread.sleep(2000);
  112. t.interrupt(); // 检查下在park时,是否响应中断
  113. }
  114. /**
  115. * 尝试去中断一个LockSupport.unPark(),会有响应
  116. *
  117. * @throws InterruptedException
  118. */
  119. private static void parkTest() throws InterruptedException {
  120. Thread t = doTest(new TestCallBack() {
  121. @Override
  122. public void callback() {
  123. // 尝试去park 自己线程
  124. LockSupport.park(blocker);
  125. System.out.println("wakeup now!");
  126. }
  127. @Override
  128. public String getName() {
  129. return "parkTest";
  130. }
  131. });
  132. t.start(); // 启动读取线程
  133. Thread.sleep(2000);
  134. LockSupport.unpark(t);
  135. t.interrupt();
  136. }
  137. public static Thread doTest(final TestCallBack call) {
  138. return new Thread() {
  139. @Override
  140. public void run() {
  141. File file = new File("/dev/urandom"); // 读取linux黑洞
  142. try {
  143. FileInputStream in = new FileInputStream(file);
  144. byte[] bytes = new byte[1024];
  145. while (in.read(bytes, 0, 1024) > 0) {
  146. if (Thread.interrupted()) {
  147. throw new InterruptedException("");
  148. }
  149. System.out.println(bytes[0]);
  150. Thread.sleep(100);
  151. long start = System.currentTimeMillis();
  152. call.callback();
  153. System.out.println(call.getName() + " callback finish cost : "
  154. + (System.currentTimeMillis() - start));
  155. }
  156. } catch (Exception e) {
  157. e.printStackTrace();
  158. }
  159. }
  160. };
  161. }
  162. }
  163. interface TestCallBack {
  164. public void callback() throws Exception;
  165. public String getName();
  166. }

最后

发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

本文仅当抛砖引玉,欢迎发言!

原文地址:http://agapple.iteye.com/blog/970055;

Java线程阻塞中断和LockSupport的常见问题,布布扣,bubuko.com

时间: 2024-10-06 01:24:10

Java线程阻塞中断和LockSupport的常见问题的相关文章

java线程阻塞中断与LockSupport使用介绍

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现. 在介绍之前,先抛几个问题. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 一般Thread编程需要关注

挂起(suspend)与线程阻塞工具类LockSupport

挂起(suspend)与线程阻塞工具类LockSupport 一般来说是不推荐使用suspend去挂起线程的,因为suspend在导致线程暂停的同时,并不会去释放任何锁资源. 如果其他任何线程想要访问被它暂用的锁时,都会被牵连,导致无法正常继续运行. 直到对应的线程上进行了resume操作. 并且,如果resume操作意外的在suspend前执行了,那么被挂起的线程可能很难有机会被继续执行,更严重的是:它所占用的锁不会被释放,因此可能会导致整个系统工作不正常,而且,对于被挂起的线程,从它的线程状

Java多线程——线程阻塞工具类LockSupport

简述 LockSupport 是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞. 和 Thread.suspend()相比,它弥补了由于 resume()在前发生,导致线程无法继续执行的情况. 和 Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出 InterruptedException 异常. LockSupport 的静态方法 park()可以阻塞当前线程,类似的还有 parkNanos().parkUntil()等方法.它们实现了一个限时等待,如下图

线程阻塞工具类:LockSupport(读书笔记)

他可以在线程任意位置让线程阻塞, LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos() ParkUntil()等,他们实现了一个限时等待 public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThrea

多线程之Java线程阻塞与唤醒

线程的阻塞和唤醒在多线程并发过程中是一个关键点,当线程数量达到很大的数量级时,并发可能带来很多隐蔽的问题.如何正确暂停一个线程,暂停后又如何在一个要求的时间点恢复,这些都需要仔细考虑的细节.在Java发展史上曾经使用suspend().resume()方法对于线程进行阻塞唤醒,但随之出现很多问题,比较典型的还是死锁问题.如下代码,主要的逻辑代码是主线程启动线程mt一段时间后尝试使用suspend()让线程挂起,最后使用resume()恢复线程.但现实并不如愿,执行到suspend()时将一直卡住

Java线程的中断(Interruption)

任务和线程的启动很容易.在大多数时候,我们都会让它们运行直到结束,或者让它们自行停止.然而,有时候我们希望提前结束任务或线程,或许是因为用户取消了操作,或者应用程序需要被快速关闭. 要使任务和线程能安仝.快速.可靠地停止下来,并不是一件容易的事.Java的Thread类为我们提供了stop(),suspend()等停止挂起线程的方法,但是由于安全问题目前都已被弃用.Java并没有提供一种安全的抢占式方法来停止线程,但它提供了中断(Interruption),这是一种协作机制,采用协作式的方式使一

java线程阻塞问题排查

我开发的worker,每隔几个月线上都会阻塞一次,一直都没查出问题.今天终于了了这个心结.把解决过程总结下和大家分享. 首先用jstack命令打出这个进程的全部线程堆栈.拿到线程dump文件之后,搜索自己的worker名字. "DefaultQuartzScheduler_Worker-10" prio=10 tid=0x00007f55cd54d800 nid=0x3e2e waiting for monitor entry [0x00007f51ab8f7000] java.lan

java线程的中断学习

<span style="font-size:18px;">package thread.java.test; /** * 在这里练习的是线程的中断 * Thread.interrupt()来设置中断状态是true,当一个线程运行时,另一个线程可以调用另一个 * 线程的interrupt()方法来中断他 * Thread.isInterrupt()来获取线程的中断状态 * Thread.interrupted()这是一个静态的方法,用来获取中断状态,并清除中断状态, * 其

Java阻塞中断和LockSupport

在介绍之前,先抛几个问题. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么? LockSupport.park()和unpark(),与object.wait()和notify()的区