关键字synchronized与wait和notify/notifyAll方法相结合可以实现等待/通知模式,类ReentrantLock也可以实现同样的功能,但需要借助于Condition对象。Condition类是JDK5中出现的技术,使用它有更好的灵活性,比如可以实现多路通知功能,也就是在一个Lock对象里面可以创建多个Condition(即对象监视器)实例,线程对象可以注册在指定的Condition中,从而可以有选择性地进行线程通知,在调度线程上更加灵活。
在使用notify/notifyAll方法进行通知时,被通知的线程却是由JVM随机选择的。但使用ReentrantLock结合Condition类是可以实现前面介绍过的“选择性通知”,这个功能是非常重要的,而且在Condition类中是默认提供的。
而synchronized就相当于整个Lock对象中只有一个单一的Condition对象,所有的线程都注册到它一个对象的身上。线程开始notifyAll时,需要通知所有的WAITING线程,没有选择权,会出现相当大的效率问题。
package org.github.lujiango; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Test06 { static class Service { private Lock lock = new ReentrantLock(); public Condition condition = lock.newCondition(); public void await() { try { lock.lock(); System.out.println("await time: " + System.currentTimeMillis()); condition.await(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void signal() { try { lock.lock(); System.out.println("signal time: " + System.currentTimeMillis()); condition.signal(); } finally { lock.unlock(); } } } static class ThreadA extends Thread { private Service service; public ThreadA(Service service) { this.service = service; } @Override public void run() { service.await(); } } public static void main(String[] args) throws InterruptedException { Service service = new Service(); ThreadA a = new ThreadA(service); a.start(); Thread.sleep(3000); service.signal(); } }
如果想单独唤醒部分线程该怎么处理呢?这时就有必要使用多个Condition对象了,也就是Condition对象可以唤醒部分指定线程,有助于提升程序运行的效率。可以先对线程进行分组,然后在唤醒指定组中的线程。
package org.github.lujiango; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Test07 { static class MyService { private Lock lock = new ReentrantLock(); public Condition condition1 = lock.newCondition(); public Condition condition2 = lock.newCondition(); public void await1() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " begin await1"); condition1.await(); System.out.println(Thread.currentThread().getName() + " end await1"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void await2() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " begin await2"); condition2.await(); System.out.println(Thread.currentThread().getName() + " end await2"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void signalAll1() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " signalAll1"); condition1.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void signalAll2() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " signalAll2"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } static class ThreadA extends Thread { private MyService service; public ThreadA(MyService service) { this.service = service; } @Override public void run() { service.await1(); } } static class ThreadB extends Thread { private MyService service; public ThreadB(MyService service) { this.service = service; } @Override public void run() { service.await2(); } } public static void main(String[] args) throws InterruptedException { MyService service = new MyService(); ThreadA a = new ThreadA(service); a.setName("A"); a.start(); ThreadB b = new ThreadB(service); b.setName("B"); b.start(); Thread.sleep(3000); service.signalAll2(); } }
实现生产者/消费者模式:一对一交替打印
package org.github.lujiango; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Test08 { static class Service { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private Boolean hasValue = Boolean.FALSE; public void producer() { try { lock.lock(); while (hasValue) { condition.await(); } System.out.println("producer"); hasValue = Boolean.TRUE; condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void consumer() { try { lock.lock(); while (!hasValue) { condition.await(); } System.out.println("consumer"); hasValue = Boolean.FALSE; condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } static class ThreadA extends Thread { private Service service; public ThreadA(Service service) { this.service = service; } @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) { service.producer(); } } } static class ThreadB extends Thread { private Service service; public ThreadB(Service service) { this.service = service; } @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) { service.consumer(); } } } public static void main(String[] args) { Service service = new Service(); ThreadA a = new ThreadA(service); a.start(); ThreadB b = new ThreadB(service); b.start(); } }
时间: 2024-11-05 12:28:53