系统要实现某个全局功能必定要需要各个子模块之间的协调和配合,就像一个团队要完成某项任务的时候需要团队各个成员之间密切配合一样。而对于系统中的各个子线程来说,如果要完成一个系统功能,同样需要各个线程的配合,这样就少不了线程之间的通信与协作。常见的线程之间通信方式有如下几种:
1、wait和notify/notifyAll
2、await和signal/signalAll
3、sleep/yield/join
4、CyclicBarrier 栅栏
5、CountDownLatch 闭锁
6、Semaphore 信号量
一、wait和notify/notifyAll
在使用之前先明确 :
wait和notify是Object的方法,任何一个对象都具有该方法。在使用的时候,首先需要设置一个全局锁对象,通过对该锁的释放和持有来控制该线程的运行和等待。因此在调用wait和notify的时候,该线程必须要已经持有该锁,然后才可调用,否则将会抛出IllegalMonitorStateException异常。
确定要让哪个线程等待?让哪个线程等待就在哪个线程中调用锁对象的wait方法。调用wait等待的是当前线程,而不是被调用线程,并不是theread.wait()就可以让thread等待,而是让当前线程(调用wait方法的线程,不是调用者)进行等待。尽量不要把线程对象当做全局锁使用,以免混淆等待线程。
看一下使用方法:(代码中省略了main方法,对sleep()和println()方法进行了封装)
package thread.blogs.cooperation; import scala.Console; import java.util.concurrent.atomic.AtomicInteger; /** * Created by PerkinsZhu on 2017/8/21 10:25. */ public class TestWaitAndNotify { public static void main(String[] args) { TestWaitAndNotify test = new TestWaitAndNotify(); test.testWait(); } Object obj = new Object();//创建一个全局变量,用来协调各个线程 ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>();//设置一个线程wait和notify的触发条件 class MyRunner implements Runnable { @Override public void run() { num.set(new AtomicInteger(0)); while (true) { Console.println(Thread.currentThread().getName()); if (num.get().getAndIncrement() == 1) { synchronized (obj) {//如果要想调用wait方法,则必须持有该对象。否则将会抛出IllegalMonitorStateException try { Console.println(Thread.currentThread().getName() + "挂起等待"); obj.wait();//同一个线程可以wait多次,多个线程也可以使用同一个obj调用wait Console.println(Thread.currentThread().getName() + "唤醒!!!"); } catch (InterruptedException e) { e.printStackTrace(); } } } sleep(1000); } } } private void testWait() { MyRunner runner = new MyRunner(); new Thread(runner).start(); new Thread(runner).start(); AtomicInteger num03 = new AtomicInteger(0); Thread th03 = new Thread(new Runnable() { @Override public void run() { while (true) { synchronized (obj) {//调用notify/notifyAll和wait一样,同样需要持有该对象 if (num03.getAndIncrement() == 5) { obj.notify();//唤醒最先一个挂在obj上面的线程.每次只唤醒一个。这里是按照等待的先后顺序进行唤醒 } } sleep(1000); } } }); th03.start(); } private void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果如下:
Thread-1Thread-0Thread-1Thread-0Thread-1挂起等待Thread-0挂起等待Thread-1唤醒!!!Thread-1Thread-1Thread-1
从执行结果只中可以看出,在执行两次输出之后,两个线程被分别挂起等待。过一会之后线程1被成功唤醒。这里之所以唤醒的是Thread-1是因为Thread-1是第一个先挂起的,所以在notify()方法在唤起wait线程的时候也是公平的,依照wait的挂起顺序来执行唤醒。
在使用wait的时候,同一个obj可以被多个线程调用obj.wait(),也可以被同一个线程执行多次obj.wait();
例如,修改try catch代码代码块
Console.println(Thread.currentThread().getName() + "挂起等待"); obj.wait();//执行多次wait操作 obj.wait(); obj.wait(); Console.println(Thread.currentThread().getName() + "唤醒!!!");
然后只启动一个线程
new Thread(runner,"thread--01").start(); // new Thread(runner,"thread--02").start();
执行结果如下:
thread--01 thread--01 thread--01挂起等待
线程一直停滞在此处,无法继续执行,这是因为线程调用了三此wait,而如果要想成功唤醒线程,则同样需要调用三次notify或者调用一次notifyAll()。这里就不再列出代码。
wait方法有两个重载方法:
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException ;
两个方法都是wait指定时间之后,如果依旧没有被其它线程唤醒或者被中断则会自动停止wait。其中第二个方法指定了时间的单位。
public final void wait(long timeout, int nanos) throws InterruptedException Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed. This method is similar to the wait method of one argument, but it allows finer control over the amount of time to wait for a notification before giving up. The amount of real time, measured in nanoseconds, is given by: 1000000*timeout+nanos In all other respects, this method does the same thing as the method wait(long) of one argument. In particular, wait(0, 0) means the same thing as wait(0). The current thread must own this object‘s monitor. The thread releases ownership of this monitor and waits until either of the following two conditions has occurred: Another thread notifies threads waiting on this object‘s monitor to wake up either through a call to the notify method or the notifyAll method. The timeout period, specified by timeout milliseconds plus nanos nanoseconds arguments, has elapsed. The thread then waits until it can re-obtain ownership of the monitor and resumes execution. As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop: synchronized (obj) { while (<condition does not hold>) obj.wait(timeout, nanos); ... // Perform action appropriate to condition } This method should only be called by a thread that is the owner of this object‘s monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.
注意这里的synchronized的目的不是加锁控制线程的串行,而是为了持有锁来调用wait和notify对象。
在理解这线程调用obj.wait()的时候可以理解为"挂在obj对象上的线程",而对于线程调用obj.notify()可以理解为"唤起最后一个挂在obj上面的那个线程",而对于线程调用obj.notifyAll(),则可以理解为"唤起所有挂在obj对象上的线程"。obj对象在这里起的作用就是一个信息载体中介。各个线程通过这个中介进行通行协作,控制线程之间的暂停和执行。
二、await和signal/signalAll
await和signal是Condition的两个方法,其作用和wait和notify一样,目的都是让线程挂起等待,不同的是,这两种方法是属于Condition的两个方法,而Condition对象是由ReentrantLock调用newCondition()方法得到的。Condition对象就相当于前面所说的中介,在线程中调用contiton.await()和condition.signal()可以分别使线程等待和唤醒。
如需要了解tryLock的使用可以看这里:多线程(五) java的线程锁
使用示例:
package thread.blogs.cooperation; import scala.Console; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Created by PerkinsZhu on 2017/8/21 11:59. */ public class TestCondition { public static void main(String[] args) { TestCondition test = new TestCondition(); test.testWait(); } ReentrantLock lock = new ReentrantLock(); ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>(); Condition condition = lock.newCondition(); private void testWait() { new Thread(new Runnable() { @Override public void run() { num.set(new AtomicInteger(1)); while (true) { if (num.get().getAndIncrement() == 5) { Console.println("signal---!!!"); try { lock.lock(); condition.signal(); } finally { lock.unlock(); } } Console.println("thread ---- 01"); sleep(1000); } } }).start(); new Thread(new Runnable() { @Override public void run() { num.set(new AtomicInteger(1)); while (true) { if (num.get().getAndIncrement() == 2) { try { //lock.tryLock(); //lock.tryLock(5000, TimeUnit.MILLISECONDS); lock.lock();//这里同样要加锁,否则会抛出IllegalMonitorStateException异常。注意的是这里不要使用synchronized进行加锁,而是使用lock condition.await();//注意这里不要调用wait!!! } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } Console.println("thread ---- 02"); sleep(1000); } } }).start(); } private void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
在使用Condition的时候和Synchronized没有太大的区别,只是调用的方法变为await和signal。需要注意的是这里加锁不再使用synchronized()进行加锁,而是使用lock和unlock进行加锁。
执行结果如下:
thread ---- 01 thread ---- 02 thread ---- 01 thread ---- 01 thread ---- 01 signal---!!! thread ---- 01 thread ---- 02 thread ---- 02 thread ---- 01
三、sleep/yield/join
对于sleep()方法应该很熟悉了,让当前线程睡眠一段时间。期间不会释放任何持有的锁。
public static native void sleep(long millis) throws InterruptedException;
1 /** 2 * Causes the currently executing thread to sleep (temporarily cease 3 * execution) for the specified number of milliseconds, subject to 4 * the precision and accuracy of system timers and schedulers. The thread 5 * does not lose ownership of any monitors. 6 * 7 * @param millis 8 * the length of time to sleep in milliseconds 9 * 10 * @throws IllegalArgumentException 11 * if the value of {@code millis} is negative 12 * 13 * @throws InterruptedException 14 * if any thread has interrupted the current thread. The 15 * <i>interrupted status</i> of the current thread is 16 * cleared when this exception is thrown. 17 */
对于yield()方法可能使用的情况少一下。其作用主要是让当前线程从运行状态转变为就绪状态,由线程调度重新选择就绪状态的线程分配CPU资源。至于最终会选取哪个线程分配CPU资源就由调度策略来决定了,有可能还是该线程,有可能换为其它线程。
1 /** 2 * A hint to the scheduler that the current thread is willing to yield 3 * its current use of a processor. The scheduler is free to ignore this 4 * hint. 5 * 6 * <p> Yield is a heuristic attempt to improve relative progression 7 * between threads that would otherwise over-utilise a CPU. Its use 8 * should be combined with detailed profiling and benchmarking to 9 * ensure that it actually has the desired effect. 10 * 11 * <p> It is rarely appropriate to use this method. It may be useful 12 * for debugging or testing purposes, where it may help to reproduce 13 * bugs due to race conditions. It may also be useful when designing 14 * concurrency control constructs such as the ones in the 15 * {@link java.util.concurrent.locks} package. 16 */ 17 public static native void yield();
对于join方法,作用是暂停当前线程,等待被调用线程指向结束之后再继续执行。
1 /** 2 * Waits for this thread to die. 3 * 4 * <p> An invocation of this method behaves in exactly the same 5 * way as the invocation 6 * 7 * <blockquote> 8 * {@linkplain #join(long) join}{@code (0)} 9 * </blockquote> 10 * 11 * @throws InterruptedException 12 * if any thread has interrupted the current thread. The 13 * <i>interrupted status</i> of the current thread is 14 * cleared when this exception is thrown. 15 */ 16 public final void join() throws InterruptedException;
使用join的时候需要注意:
1、调用join的时候,当前线程不会释放掉锁,如果调用线程也需要该锁则就会导致死锁!
2、join方法不会启动调用线程,所以,在调用join之前,该调用线程必须已经start启动,否则不会达到想要的效果。
join的底层实际是就是使用了一个自旋等待机制,判断调用线程是否死亡,如果没有则一直让当前线程wait。可以看一下底层实现源码:
1 public final synchronized void join(long millis) throws InterruptedException { 2 long base = System.currentTimeMillis(); 3 long now = 0; 4 if (millis < 0) { 5 throw new IllegalArgumentException("timeout value is negative"); 6 } 7 if (millis == 0) { 8 while (isAlive()) {//如果调用者依旧没有结束,让当前线程进行等待 9 wait(0);//注意这里的wait是等待的当前线程,而不是调用者线程 10 } 11 } else { 12 while (isAlive()) { 13 long delay = millis - now; 14 if (delay <= 0) { 15 break; 16 } 17 wait(delay);//指定等待的时间 18 now = System.currentTimeMillis() - base; 19 } 20 } 21 }
四、CyclicBarrier栅栏
CyclicBarrier字面理解为线程屏障,当指定数量的线程执行到指定位置的时候,才能触发后续动作的进行。其最终目的是让所有线程同时开始后续的工作。
例如:三个员工来公司开会,由于三人住的地方与公司距离不同,所以到会议室的时间也不同。而会议开始必须等待三者都到达会议室之后才能进行。
代码如下:
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.CyclicBarrier; 6 7 /** 8 * Created by PerkinsZhu on 2017/8/30 10:32. 9 */ 10 public class TestCyclicBarrier { 11 public static void main(String[] args) { 12 testCyclicBarrier(); 13 } 14 15 private static void testCyclicBarrier() { 16 /** 17 * 注意这里等待的是三个线程。这就相当于一个线程计数器,当指定个数的线程执行 barrier.await();方法之后,才会执行后续的代码,否则每个线程都会一直进行等待。 18 * 如果把3修改为4,则将永远等待下去,不会开始会议。 19 * 如果把3修改为2,则小张到达之后就会提前开始会议,不会继续等待小王。 20 */ 21 CyclicBarrier barrier = new CyclicBarrier(3); 22 23 Thread 小李 = new Thread(new MyRunner(barrier, "小李", 2000)); 24 小李.start(); 25 Thread 小张 = new Thread(new MyRunner(barrier, "小张", 4000)); 26 小张.start(); 27 Thread 小王 = new Thread(new MyRunner(barrier, "小王", 5000)); 28 小王.start(); 29 } 30 31 static class MyRunner implements Runnable { 32 CyclicBarrier barrier; 33 String name; 34 int time; 35 36 public MyRunner(CyclicBarrier barrier, String name, int time) { 37 this.barrier = barrier; 38 this.name = name; 39 this.time = time; 40 } 41 42 @Override 43 public void run() { 44 Console.println(name + " 开始出发去公司。"); 45 sleep(time); 46 Console.println(name + " 终于到会议室!!!"); 47 try { 48 barrier.await(); 49 } catch (Exception e) { 50 e.printStackTrace(); 51 } 52 startMeeting(name); 53 } 54 } 55 56 private static void startMeeting(String name) { 57 Console.println(name + "说:人齐了。会议开始!!"); 58 } 59 60 private static void sleep(int time) { 61 try { 62 Thread.sleep(time); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 } 67 }
运行结果:
1 小李 开始出发去公司。 2 小王 开始出发去公司。 3 小张 开始出发去公司。 4 小李 终于到会议室!!! 5 小张 终于到会议室!!! 6 小王 终于到会议室!!! 7 小王说:人齐了。会议开始!! 8 小李说:人齐了。会议开始!! 9 小张说:人齐了。会议开始!!
在使用CyclicBarrier的时候,提供了一个重载的构造器。
public CyclicBarrier(int parties, Runnable barrierAction) {}
barrierAction会在一组线程中的最后一个线程到达之后(但在释放所有线程之前)
触发。例如修改上面的代码21行为:
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { Console.println("======"); } });
运行结果:
小张 开始出发去公司。 小李 开始出发去公司。 小王 开始出发去公司。 小李 终于到会议室!!! 小张 终于到会议室!!! 小王 终于到会议室!!! ====== 小王说:人齐了。会议开始!! 小李说:人齐了。会议开始!! 小张说:人齐了。会议开始!!
五、CountDownLatch闭锁
与CycliBarrier不同的是CountDownLatch是某一个线程等待其他线程执行到某一位置之后,该线程(调用countDownLatch.await();等待的线程)才会继续后续工作。而CycliBarrier是各个线程执行到某位置之后,然后所有线程一齐开始后续的工作。相同的是两者都属于线程计数器。
使用示例如下: boss等待所有员工来开会,当所有人员都到齐之后,boss宣布开始会议!!!
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.CountDownLatch; 6 7 /** 8 * Created by PerkinsZhu on 2017/8/30 10:32. 9 */ 10 public class TestCyclicBarrier { 11 public static void main(String[] args) { 12 testCyclicBarrier(); 13 } 14 15 private static void testCyclicBarrier() { 16 17 CountDownLatch countDownLatch = new CountDownLatch(3);//注意这里的参数指定了等待的线程数量 18 19 new Thread(new MyRunner(countDownLatch, "小李", 2000)).start(); 20 new Thread(new MyRunner(countDownLatch, "小张", 4000)).start(); 21 new Thread(new MyRunner(countDownLatch, "小王", 5000)).start(); 22 23 try { 24 Console.println("等待员工到来开会。。。。。。。"); 25 countDownLatch.await();//注意这里是await。主线程将会一直等待在这里,当所有线程都执行 countDownLatch.countDown();之后当前线程才会继续执行 26 startMeeting("Boss"); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 32 static class MyRunner implements Runnable { 33 CountDownLatch countDownLatch; 34 String name; 35 int time; 36 37 public MyRunner(CountDownLatch countDownLatch, String name, int time) { 38 this.countDownLatch = countDownLatch; 39 this.name = name; 40 this.time = time; 41 } 42 43 @Override 44 public void run() { 45 Console.println(name + " 开始出发去公司。"); 46 sleep(time); 47 Console.println(name + " 终于到会议室!!!"); 48 countDownLatch.countDown(); Console.println(name + " 准备好了!!"); 49 } 50 } 51 52 private static void startMeeting(String name) { 53 Console.println(name + "说:人齐了。会议开始!!"); 54 } 55 56 private static void sleep(int time) { 57 try { 58 Thread.sleep(time); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 } 63 }
执行结果如下:
等待员工到来开会。。。。。。。小王 开始出发去公司。小张 开始出发去公司。小李 开始出发去公司。小李 终于到会议室!!!小李 准备好了!!小张 终于到会议室!!!小张 准备好了!!小王 终于到会议室!!!小王 准备好了!!Boss说:人齐了。会议开始!!
注意区分是某一个线程等待其他线程还是所有线程在达到某一条件之后一起执行!!!
6、Semaphore 信号量
Semaphore在线程协作方面主要用于控制同时访问临界区资源的线程个数。信号量是属于操作系统层面的概念,jdk提供了操作接口。
使用示例如下:
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Semaphore; 8 9 /** 10 * Created by PerkinsZhu on 2017/8/30 11:43. 11 */ 12 public class TestSemaphore { 13 public static void main(String[] args) { 14 testSemaphore(); 15 } 16 17 private static void testSemaphore() { 18 Semaphore semaphore = new Semaphore(2, true);//指定同时访问临界区资源的线程数量。第二个参数指定以公平方式访问临界区资源 19 ExecutorService excutorService = Executors.newFixedThreadPool(10); 20 for (int i = 0; i < 6; i++) {//启动10个线程请求资源 21 excutorService.execute(new MyRunner(semaphore)); 22 sleep(0);//逐个启动线程 23 } 24 excutorService.shutdown(); 25 } 26 27 static class MyRunner implements Runnable { 28 Semaphore semaphore; 29 30 public MyRunner(Semaphore semaphore) { 31 this.semaphore = semaphore; 32 } 33 34 @Override 35 public void run() { 36 String name = Thread.currentThread().getName(); 37 try { 38 Console.println(name + " ------请求资源!!"); 39 //semaphore.acquire(2);//设置请求资源的数量。必须有足够数量的资源才可进去临界区。不过释放的时候也要一起释放,请求几个就要调用几次release() 40 semaphore.acquire();//请求获取资源,如果有空闲资源则会立即获取,进入临界区,否则将会等待,一直等待到获取到临界区资源 41 Console.println(name + " ======获取资源!!"); 42 sleep(1000); 43 //semaphore.release(); 44 semaphore.release();//释放资源 45 Console.println(name + " ******释放资源!!"); 46 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 } 52 53 private static void sleep(int time) { 54 try { 55 Thread.sleep(time); 56 } catch (InterruptedException e) { 57 e.printStackTrace(); 58 } 59 } 60 }
执行结果如下:
pool-1-thread-1 ------请求资源!! pool-1-thread-2 ------请求资源!! pool-1-thread-6 ------请求资源!! pool-1-thread-5 ------请求资源!! pool-1-thread-3 ------请求资源!! pool-1-thread-4 ------请求资源!! pool-1-thread-2 ======获取资源!! pool-1-thread-1 ======获取资源!! pool-1-thread-1 ******释放资源!! pool-1-thread-6 ======获取资源!! pool-1-thread-5 ======获取资源!! pool-1-thread-2 ******释放资源!! pool-1-thread-6 ******释放资源!! pool-1-thread-4 ======获取资源!! pool-1-thread-3 ======获取资源!! pool-1-thread-5 ******释放资源!! pool-1-thread-4 ******释放资源!! pool-1-thread-3 ******释放资源!!
根据结果可以看出只有当有线程释放资源之后,才会有新的线程获取到资源。即控制了同一时间访问临界区资源的线程数量。当Semaphore(1)设置为1的时候,此时可以当做锁来使用。多线程(五) java的线程锁
线程之间的通信和协作方式大概就以上六种,要熟悉每种工具的使用场景和方法特性,通过灵活的组合各个工具来灵活控制各个线程的工作。在决定使用哪种工具之前必须要明确自己的目的是什么,要实现什么样的机制,这样才能确定选择哪种工具协调各个线程。
=========================================
原文链接:多线程(六)线程间的通信和协作转载请注明出处!
=========================================
---end