【java并发】条件阻塞Condition的应用

  Condition将Object监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用。

1. Condition的基本使用

  由于Condition可以用来替代wait、notify等方法,所以可以对比着之前写过的线程间通信的代码来看,再来看一下原来那个问题:

有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。

  之前用wait和notify来实现的,现在用Condition来改写一下,代码如下:

public class ConditionCommunication {

    public static void main(String[] args) {
        Business bussiness = new Business();

        new Thread(new Runnable() {// 开启一个子线程

                    @Override
                    public void run() {
                        for (int i = 1; i <= 50; i++) {

                            bussiness.sub(i);
                        }

                    }
                }).start();

        // main方法主线程
        for (int i = 1; i <= 50; i++) {

            bussiness.main(i);
        }
    }
}

class Business {

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition(); //Condition是在具体的lock之上的

    private boolean bShouldSub = true;

    public void sub(int i) {
        lock.lock();
        try {
            while (!bShouldSub) {
                try {
                    condition.await(); //用condition来调用await方法
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = false;
            condition.signal(); //用condition来发出唤醒信号,唤醒某一个
        } finally {
            lock.unlock();
        }
    }

    public void main(int i) {
        lock.lock();
        try {
            while (bShouldSub) {
                try {
                    condition.await(); //用condition来调用await方法
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("main thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = true;
            condition.signal(); //用condition来发出唤醒信号么,唤醒某一个
        } finally {
            lock.unlock();
        }
    }
}

  从代码来看,Condition的使用时和Lock一起的,没有Lock就没法使用Condition,因为Condition是通过Lock来new出来的,这种用法很简单,只要掌握了synchronized和wait、notify的使用,完全可以掌握Lock和Condition的使用。

2. Condition的拔高

2.1 缓冲区的阻塞队列

  上面使用Lock和Condition来代替synchronized和Object监视器方法实现了两个线程之间的通信,现在再来写个稍微高级点应用:模拟缓冲区的阻塞队列。

什么叫缓冲区呢?举个例子,现在有很多人要发消息,我是中转站,我要帮别人把消息发出去,那么现在我  就需要做两件事,一件事是接收用户发过来的消息,并按顺序放到缓冲区,另一件事是从缓冲区中按顺序取出用户发过来的消息,并发送出去。

  现在把这个实际的问题抽象一下:缓冲区即一个数组,我们可以向数组中写入数据,也可以从数组中把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。

  好了,分析完了这个缓冲区的阻塞队列,下面就用Condition技术来实现一下:

class Buffer {
    final Lock lock = new ReentrantLock(); //定义一个锁
    final Condition notFull = lock.newCondition(); //定义阻塞队列满了的Condition
    final Condition notEmpty = lock.newCondition();//定义阻塞队列空了的Condition

    final Object[] items = new Object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大

    int putptr, takeptr, count; //数组下标,用来标定位置的

    //往队列中存数据
    public void put(Object x) throws InterruptedException {
        lock.lock(); //上锁
        try {
            while (count == items.length) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法存数据!");
                notFull.await();    //如果队列满了,那么阻塞存数据这个线程,等待被唤醒
            }
            //如果没满,按顺序往数组中存
            items[putptr] = x;
            if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端
                putptr = 0;
            ++count;    //消息数量
            System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
            notEmpty.signal(); //好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦
        } finally {
            lock.unlock(); //放锁
        }
    }

    //从队列中取数据
    public Object take() throws InterruptedException {
        lock.lock(); //上锁
        try {
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法取数据!");
                notEmpty.await();  //如果队列是空,那么阻塞取数据这个线程,等待被唤醒
            }
            //如果没空,按顺序从数组中取
            Object x = items[takeptr];
            if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端
                takeptr = 0;
            --count; //消息数量
            System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
            notFull.signal(); //好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦
            return x;
        } finally {
            lock.unlock(); //放锁
        }
    }
}

  这个程序很经典,我从官方JDK文档中拿出来的,然后加了注释。程序中定义了两个Condition,分别针对两个线程,等待和唤醒分别用不同的Condition来执行,思路很清晰,程序也很健壮。可以考虑一个问题,为啥要用两个Codition呢?之所以这么设计肯定是有原因的,如果用一个Condition,现在假设队列满了,但是有2个线程A和B同时存数据,那么都进入了睡眠,好,现在另一个线程取走一个了,然后唤醒了其中一个线程A,那么A可以存了,存完后,A又唤醒一个线程,如果B被唤醒了,那就出问题了,因为此时队列是满的,B不能存的,B存的话就会覆盖原来还没被取走的值,就因为使用了一个Condition,存和取都用这个Condition来睡眠和唤醒,就乱了套。到这里,就能体会到这个Condition的用武之地了,现在来测试一下上面的阻塞队列的效果:

public class BoundedBuffer {

    public static void main(String[] args) {

        Buffer buffer = new Buffer();

        for(int i = 0; i < 5; i ++) { //开启5个线程往缓冲区存数据
            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        buffer.put(new Random().nextInt(1000)); //随机存数据
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        for(int i = 0; i < 10; i ++) { //开启10个线程从缓冲区中取数据
            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        buffer.take(); //从缓冲区取数据
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

  我故意只开启5个线程存数据,10个线程取数据,就是想让它出现取数据被阻塞的情况发生,看运行的结果:

Thread-5 被阻塞了,暂时无法取数据!

Thread-10 被阻塞了,暂时无法取数据!

Thread-1 存好了值: 755

Thread-0 存好了值: 206

Thread-2 存好了值: 741

Thread-3 存好了值: 381

Thread-14 取出了值: 755

Thread-4 存好了值: 783

Thread-6 取出了值: 206

Thread-7 取出了值: 741

Thread-8 取出了值: 381

Thread-9 取出了值: 783

Thread-5 被阻塞了,暂时无法取数据!

Thread-11 被阻塞了,暂时无法取数据!

Thread-12 被阻塞了,暂时无法取数据!

Thread-10 被阻塞了,暂时无法取数据!

Thread-13 被阻塞了,暂时无法取数据!

  从结果中可以看出,线程5和10抢先执行,发现队列中没有,于是就被阻塞了,睡在那了,直到队列中有新的值存入才可以取,但是它们两运气不好,存的数据又被其他线程给抢先取走了,哈哈……可以多运行几次。如果想要看到存数据被阻塞,可以将取数据的线程设置少一点,这里我就不设了。

2.2 两个以上线程之间的唤醒

  还是原来那个题目,现在让三个线程来执行,看一下题目:

有三个线程,子线程1先执行10次,然后子线程2执行10次,然后主线程执行5次,然后再切换到子线程1执行10次,子线程2执行10次,主线程执行5次……如此往返执行50次。

  如过不用Condition,还真不好弄,但是用Condition来做的话,就非常方便了,原理很简单,定义三个Condition,子线程1执行完唤醒子线程2,子线程2执行完唤醒主线程,主线程执行完唤醒子线程1。唤醒机制和上面那个缓冲区道理差不多,下面看看代码吧,很容易理解。

public class ThreeConditionCommunication {

    public static void main(String[] args) {
        Business bussiness = new Business();

        new Thread(new Runnable() {// 开启一个子线程

                    @Override
                    public void run() {
                        for (int i = 1; i <= 50; i++) {

                            bussiness.sub1(i);
                        }

                    }
                }).start();

        new Thread(new Runnable() {// 开启另一个子线程

            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {

                    bussiness.sub2(i);
                }

            }
        }).start();

        // main方法主线程
        for (int i = 1; i <= 50; i++) {

            bussiness.main(i);
        }
    }

    static class Business {

        Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition(); //Condition是在具体的lock之上的
        Condition condition2 = lock.newCondition();
        Condition conditionMain = lock.newCondition();

        private int bShouldSub = 0;

        public void sub1(int i) {
            lock.lock();
            try {
                while (bShouldSub != 0) {
                    try {
                        condition1.await(); //用condition来调用await方法
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("sub1 thread sequence of " + j
                            + ", loop of " + i);
                }
                bShouldSub = 1;
                condition2.signal(); //让线程2执行
            } finally {
                lock.unlock();
            }
        }

        public void sub2(int i) {
            lock.lock();
            try {
                while (bShouldSub != 1) {
                    try {
                        condition2.await(); //用condition来调用await方法
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("sub2 thread sequence of " + j
                            + ", loop of " + i);
                }
                bShouldSub = 2;
                conditionMain.signal(); //让主线程执行
            } finally {
                lock.unlock();
            }
        }

        public void main(int i) {
            lock.lock();
            try {
                while (bShouldSub != 2) {
                    try {
                        conditionMain.await(); //用condition来调用await方法
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 5; j++) {
                    System.out.println("main thread sequence of " + j
                            + ", loop of " + i);
                }
                bShouldSub = 0;
                condition1.signal(); //让线程1执行
            } finally {
                lock.unlock();
            }
        }
    }
}

  代码看似有点长,但是是假象,逻辑非常简单。关于线程中的Condition技术就总结这么多吧。

  

  相关阅读:http://blog.csdn.net/column/details/bingfa.html



—–乐于分享,共同进步!

—–更多文章请看:http://blog.csdn.net/eson_15

时间: 2024-10-19 23:36:11

【java并发】条件阻塞Condition的应用的相关文章

13.Java5条件阻塞Condition的应用

1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 5 /** 6 * Java5条件阻塞Condition的应用 7 * Condition的功能类似在传统线程技术中的Object.wait()和Object.notify()功能. 8 * 在等待Condition

【java并发】阻塞队列的使用

在前面一篇名为条件阻塞Condition的应用的博客中提到了一个拔高的例子:利用Condition来实现阻塞队列.其实在java中,有个叫ArrayBlockingQueue<E>的类提供了阻塞队列的功能,所以我们如果需要使用阻塞队列,完全没有必要自己去写. ArrayBlockingQueue<E>实现了BlockingQueue<E>,另外还有LinkedBlockingQueue<E>和PriorityBlockingQueue<E>.Ar

Java并发编程之Condition

1.使用synchronized中的等待和唤醒实现消费者和生产者模式 /** * 使用Synchronized实现消费者生产者模式 */ public class SynchronizedDemo { static List<Integer> list = new ArrayList<Integer>(); private static int maxNum = 5; // 消费者 private void Consumer(String name){ synchronized (

条件阻塞Condition的应用

Condition的功能类似在传统线程技术中的Object.wait和Object.notity的功能. 例子:生产者与消费者 import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.lock

_java5条件阻塞Condition的应用

wait 和notify 在 synchronized 包含的范围中 必须与synchronized 使用同一个对象 可以在一个锁中设定 多个 condition 来执行等待的方法,这样的好处就是能实现多个 方法之间的等待执行.例如下面例子 的两个condition class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull  = lock.newCondition(); //使用con

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深入理解Java类加载器(ClassLoader) 深入理解Java并发之synchronized实现原理 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic 深入理解Java内存模型(JMM)及volatile关键字 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理 剖析基于并发AQS的共

Java多线程与并发应用-(9)-锁lock+条件阻塞conditon实现线程同步通信

一. lock可以代替synchronized关键字实现互斥功能.使用方法如下: Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); } 需要注意的是. 1.需要互斥的一个或多个方法要使用同一个互斥锁. 2.在被锁包含的代码块中,要使用finally块将锁释放. 二. Condition的await方法(注意不是wait方法)可以替换传统通信中的wa

12、Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

转: 【Java并发编程】之二十:并发新特性—Lock锁和条件变量(含代码)

简单使用Lock锁 Java5中引入了新的锁机制--Java.util.concurrent.locks中的显式的互斥锁:Lock接口,它提供了比synchronized更加广泛的锁定操作.Lock接口有3个实现它的类:ReentrantLock.ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁.读锁和写锁.lock必须被显式地创建.锁定和释放,为了可以使用更多的功能,一般用ReentrantLock为其实例化