终止阻塞的线程

线程状态

我们知道,一个线程可以处于以下四种状态之一:

1. 新建(New):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必须的系统资源,并执行了初始化。此刻线程已经有资格获取CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。

2. 就绪(Runnable):在这种状态下,只要调度器将CPU时间片分给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。

3. 阻塞(Blocked):线程能够运行,但有某个或多个条件阻止它运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间片。直到线程重新进入了就绪状态,它才有可能执行操作。

4. 死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以不被中断。

进入线程状态

而一个任务进入阻塞状态,可能由以下原因造成:

1. 通过调用sleep(milliseconds)方法使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。

2. 通过调用wait()方法使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在JavaSE5的java.util.concurrent类库中等价的signal()活signalAll()消息),线程才会进入就绪状态。

3. 任务在等待某个I/O操作完成。

4. 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

在较早的代码中,也可能会看到用suspend()和resume()方法来阻塞和唤醒线程,但是在Java新版本中这些方法被废弃了,因为它们可能导致死锁。stop()方法也已经被废弃了,因为它不释放线程获得的锁,并且如果线程处于不一致的状态,其他任务可以在这种状态下浏览并修改它们。

现在我们需要查看的问题是:有事你希望能够终止处于阻塞状态的任务。如果对于阻塞装填的任务,你不能等待其到达代码中可以检查其状态值的某一点,因而决定让它主动终止,那么你就必须强制这个任务跳出阻塞状态。

中断

正如你所想象的,在Runnable.run()方法的中间打断它,与到达程序员准备好离开该方法的其他一些地方相比,要复杂得多。因为当你打断被阻塞的任务时,可能需要清理资源。正因为这一点,在任务的run()方法中间打断,更像是抛出的异常,因此在Java线程中的这种类型的异常中断中用到了异常。为了在以这种方式终止任务时返回良好的状态,你必须仔细考虑代码的执行路径,并仔细编写catch字句以便正确的清楚所有事物。

Thread类包含了interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。当抛出该异常或者该任务调用Thread.interrupted()时,中断状态将被复位。正如你将看到的,Thread.interrupted()提供了离开run()循环而不抛出异常的第二种方式。

为了调用interrupt(),你必须持有Thread对象。你可能已经注意到了,新的concurrent类库似乎在避免对Thread对象上的直接操作,转而尽量的通过Executor来执行所有操作。如果你在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的线程。这么做是有意义的,因为当你完成工程中的某个部分或者整个程序时,通常会希望同时关闭某个特定Executor的所有任务。然而,你有时也会希望只中断某个单一任务。如果使用Executor,那么通过调用submit()方法而不是execute()方法来启动任务,就可以持有该任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,因为你永远都不会在其上调用get()——持有这种Future的关键在于你可以在其上调用cancel(),并因此可以使用它来中断某个特定任务。如果你将true传递给cancel(),那么它就会拥有在该线程上调用interrupt()以停止这个线程的能力。因此,cancel是一种中断由Executor启动的单个线程的方式。

下面的示例使用Executor展示了基本的interrupt()用法:

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SleepBlocked implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;
    public IOBlocked(InputStream is) {
        in = is;
    }
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from Blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) {
            //永不释放获得的锁
            Thread.yield();
        }
    }
    public SynchronizedBlocked() {
        //在构造的时候就获取该对象的锁
        new Thread(){
            @Override
            public void run() {
                f();
            }
        };
    }
    
    @Override
    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException {
        Future<?> future = exec.submit(r);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Interrupting " + r.getClass().getName());
        future.cancel(true);//如果在运行的话,中断该线程。
        System.out.println("Interrupting sent to " + r.getClass().getName());
    }
    
    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0);");
        //强行停止退出
        System.exit(0);
    }
}

执行结果:

Interrupting SleepBlocked
Interrupting sent to SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Waiting for read():
Interrupting IOBlocked
Interrupting sent to IOBlocked
Trying to call f()
Interrupting SynchronizedBlocked
Interrupting sent to SynchronizedBlocked
Aborting with System.exit(0);

上面的每个任务都表示了一种不同类型的阻塞。SleepBlock是可中断的阻塞示例,而IOBlocked和SynchronizedBlocked是不可中断的阻塞示例。这个程序证明I/O和在synchronized块上的等待是不可中断的,但是通过浏览代码,你也可以预见到这一点——无论是I/O还是尝试调用synchronized方法,都不需要任何InterruptedException处理器。

两个雷很简单直观:在第一个类中run()方法调用了sleep(),在第二个类中调用了read()。但是为了掩饰SynchronizedBlock,我们必须首先获得锁。这是通过在构造器中创建匿名的Thread类的实例来实现的,这个匿名Thread类的对象通过调用f()获得了对象锁(这个线程必须有别于为启动SynchronizedBlock.run()的线程,因为同一个线程可以多次获得某个对象锁,你将在稍后看见)。由于f()永远都不反回,因此这个锁永远不会释放,而SynchronizedBlock.run()在试图调用f(),并阻塞以等待这个锁被释放。

从输出中可以看到,你能够中断对sleep()的调用(或者任何要求抛出InterruptedException的调用)。但是你不能中断正在试图获取synchronized锁或者正在试图执行I/O操作的线程。这有点令人烦恼,特别是在创建执行I/O任务时,因为这意味着I/O具有锁住你的多线程程序的潜在可能。特别是对于急于Web的程序,这更是关乎厉害。

对于这类问题,有一个略显笨拙但是确实行之有效的解决方案,那就是关闭任务在其上发生阻塞的资源

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InputStream stream = new Socket("localhost", 8080).getInputStream();
        service.execute(new IOBlocked(stream));
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Shutting down all threads");
        service.shutdownNow();//尝试停止所有正在执行的任务
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + stream.getClass().getName());
        stream.close();//通过关闭线程操作的资源来释放阻塞的线程
    }
}

执行结果:

Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from Blocked I/O
Exiting IOBlocked.run()

在shutdownNow()被调用之后以及在输入流上调用close()之前的延迟强调的是一旦底层资源被关闭,任务将解除阻塞。

幸运的是,各种NIO类提供了更人性化的I/O中断。被阻塞的nio通道会自动地响应中断

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class NIOBlocked implements Runnable {
    private final SocketChannel channel;
    
    public NIOBlocked(SocketChannel channel) {
        this.channel = channel;
    }
    
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            channel.read(ByteBuffer.allocate(1));//阻塞当前任务
        } catch (ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException");
        } catch (AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        Future<?> f = service.submit(new NIOBlocked(sc1));
        service.execute(new NIOBlocked(sc2));
        //尝试关闭任务,但由于任务处于阻塞状态,关闭不了。
        service.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // 通过在channel1上调用cancel来产生中断
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // 释放channel2
        sc2.close();
    }
}

执行结果:

Waiting for read() in [email protected]
Waiting for read() in [email protected]
ClosedByInterruptException
Exiting NIOBlocked.run() [email protected]
AsynchronousCloseException
Exiting NIOBlocked.run() [email protected]

如你所见,你还可以关闭底层资源以释放锁,尽管这种做法一般不是必须的。注意,使用execute()来启动两个任务,并调用service.shutdownNow()将可以很容易的终止所有事物,而对于捕获上面示例中的Future,只有在将中断发送给一个线程,同时不发送给另一个线程时才是必须的。

时间: 2024-10-19 03:20:21

终止阻塞的线程的相关文章

怎么在调用异步请求时不阻塞当前线程

在异步编程中,经常会调用已经写好的异步方法.这时会有一个需求:根据异步方法的返回值,做一些别的操作.这时会有两种实现方式: 1. 使用await,阻塞线程,等待异步方法的完成,然后获得返回值,再进行别的操作.示例: static void Main(string[] args) { Console.WriteLine("--------------程序运行开始-----------"); string url = "http://www.baidu.com"; va

可以“随时”终止的socket线程

很多人在刚学socket时,都是在线程中connect,然后while(flag) read();要停止这个线程时将falg置false,再wait,甚至直接termination(这种方式终止线程的安全隐患不在这里论述). 一般情况下的确没什么问题,但拿到一个真正的项目中时,就不太好了,这样写就很可能出现这样的情况:网络情况不好时,connect和read可能需要很长时间才能返回,将flag置false时,可能还需要等待很久(最大等待时间和平台有关),也许长达几分钟,特别是在GUI线程中去停止

Java阻塞队列线程集控制的实现

队列以一种先进先出的方式管理数据.如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞. 在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期的把中间结果存到阻塞队列中.而其他工作者线程把中间结果取出并在将来修改它们.队列会 自动平衡负载.如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞.如果第一个线程集运行的快,那么它将等待第二个线程集赶上来. 下面的程序展示了如何使用阻塞队列来控制线程集.程序在一个目录及它的所有

Callable,阻塞队列,线程池问题

一.说说Java创建多线程的方法 1. 通过继承Thread类实现run方法   2. 通过实现Runnable接口 3. 通过实现Callable接口 4. 通过线程池获取 二. 可以写一个Callable的案例吗?如何调用Callable接口 /*是一个带返回值的多线程类,如果需要有线程返回的结果,就需要使用此类*/ class MyThread implements Callable<Integer> { @Override public Integer call() { return

阻塞式线程

需求:有5个任务.前两个 同时执行, 后两个也同时执行 异步函数 + 并发队列 因为并发队列执行的时候,是没有顺序的,所以才会有前两个任务同时执行完毕之后在执行第三个耗时操作,想要确保第一个和第二个耗时操作执行完之后就执行第三个操作,这看似是需要串行执行,但是在没有串行的时候,就需要其他的办法,比如在第三条线程添加一个阻塞函数,把第三个线程添加到阻塞函数里面,这样的话,后面的任务就只有等第三个任务执行完之后再继续执行第四五个操作 #import "ViewController.h" @

Java中终止正在运行线程

问题:java 中如何让一个正在运行的线程终止掉? Demo_1: class TT implements Runnable { private boolean flag = true; @Override public void run() { int i = 0; while(flag) { System.out.println("child thread: "+i++); } } public void shutDowm() { flag = false; } } class T

ReentrantLock是如何阻塞等待线程的?

1 public class Test_Lock { 2 static ReentrantLock lock = new ReentrantLock(); 3 static class Runner implements Runnable { 4 @Override 5 public void run() { 6 lock.lock(); 7 System.out.println(Thread.currentThread().getName()); 8 lock.unlock(); 9 } 10

支持生产阻塞的线程池

在生产 - 消费者问题中 newFixedThreadPool的构造参数里的nThreads是最大同时工作的线程数,如果工作线程已满,新提交的任务会被放到一个无界的LinkedBlockingQueue里(等待队列) 如果生产速度大于消费速度,那么会发生任务堆积,等待队列会扩展到内存耗尽 naive的想法是,自定义线程池,将等待队列设置为有界的BlockingQueue,那么新提交的任务会被block住,直到工作线程空出来为止 但是如果去看J.U.C的源码,ThreadPoolExecutor.

线程的终止stop与线程的中断interrupted

线程除了运行完毕,自动进入死亡状态,也可以手动进行停止,Thread类也提供了2个类方法来进行线程的停止. 一.stop 如图所示,stop方法已经被标记为过时的,不推荐的.因为这这个方法太过于暴力,会立即杀死进程,导致数据不能同步,带来很难排查的错误. 下面是一段造成错误信息的代码: 1 public class StopThreadUnsafe { 2 public static User u = new User(); 3 4 public static class User { 5 pr