Java - "JUC"之Condition源码解析

Java多线程系列--“JUC锁”06之 Condition条件

概要

前面对JUC包中的锁的原理进行了介绍,本章会JUC中对与锁经常配合使用的Condition进行介绍,内容包括:
Condition介绍
Condition函数列表
Condition示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496716.html

Condition介绍

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

Condition函数列表

// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

Condition示例

示例1是通过Object的wait(), notify()来演示线程的休眠/唤醒功能。
示例2是通过Condition的await(), signal()来演示线程的休眠/唤醒功能。
示例3是通过Condition的高级功能。

示例1

public class WaitTest1 {

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
            try {
                System.out.println(Thread.currentThread().getName()+" start ta");
                ta.start();

                System.out.println(Thread.currentThread().getName()+" block");
                ta.wait();    // 等待

                System.out.println(Thread.currentThread().getName()+" continue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
                System.out.println(Thread.currentThread().getName()+" wakup others");
                notify();    // 唤醒“当前对象上的等待线程”
            }
        }
    }
}

示例2

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest1 {

    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        lock.lock(); // 获取锁
        try {
            System.out.println(Thread.currentThread().getName()+" start ta");
            ta.start();

            System.out.println(Thread.currentThread().getName()+" block");
            condition.await();    // 等待

            System.out.println(Thread.currentThread().getName()+" continue");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            lock.lock();    // 获取锁
            try {
                System.out.println(Thread.currentThread().getName()+" wakup others");
                condition.signal();    // 唤醒“condition所在锁上的其它线程”
            } finally {
                lock.unlock();    // 释放锁
            }
        }
    }
}

运行结果

main start ta
main block
ta wakup others
main continue

通过“示例1”和“示例2”,我们知道Condition和Object的方法有一下对应关系:

              Object      Condition 
休眠          wait        await
唤醒个线程     notify      signal
唤醒所有线程   notifyAll   signalAll

Condition除了支持上面的功能之外,它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。         如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。  但是,通过Condition,就能明确的指定唤醒读线程。
看看下面的示例3,可能对这个概念有更深刻的理解。

示例3

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
            while (count == items.length)
                notFull.await();
            // 将x添加到缓冲中
            items[putptr] = x;
            // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
            if (++putptr == items.length) putptr = 0;
            // 将“缓冲”数量+1
            ++count;
            // 唤醒take线程,因为take线程通过notEmpty.await()等待
            notEmpty.signal();

            // 打印写入的数据
            System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
            while (count == 0)
                notEmpty.await();
            // 将x从缓冲中取出
            Object x = items[takeptr];
            // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
            if (++takeptr == items.length) takeptr = 0;
            // 将“缓冲”数量-1
            --count;
            // 唤醒put线程,因为put线程通过notFull.await()等待
            notFull.signal();

            // 打印取出的数据
            System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
            return x;
        } finally {
            lock.unlock();    // 释放锁
        }
    }
}

public class ConditionTest2 {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
        // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
        for (int i=0; i<10; i++) {
            new PutThread("p"+i, i).start();
            new TakeThread("t"+i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;
        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }
        public void run() {
            try {
                Thread.sleep(1);    // 线程休眠1ms
                bb.put(num);        // 向BoundedBuffer中写入数据
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }
        public void run() {
            try {
                Thread.sleep(10);                    // 线程休眠1ms
                Integer num = (Integer)bb.take();    // 从BoundedBuffer中取出数据
            } catch (InterruptedException e) {
            }
        }
    }
}

(某一次)运行结果

p1 put  1
p4 put  4
p5 put  5
p0 put  0
p2 put  2
t0 take 1
p3 put  3
t1 take 4
p6 put  6
t2 take 5
p7 put  7
t3 take 0
p8 put  8
t4 take 2
p9 put  9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

结果说明
(01) BoundedBuffer 是容量为5的缓冲,缓冲中存储的是Object对象,支持多线程的读/写缓冲。多个线程操作“一个BoundedBuffer对象”时,它们通过互斥锁lock对缓冲区items进行互斥访问;而且同一个BoundedBuffer对象下的全部线程共用“notFull”和“notEmpty”这两个Condition。
       notFull用于控制写缓冲,notEmpty用于控制读缓冲。当缓冲已满的时候,调用put的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。 简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。
       同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。
(02) 在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。
(03) 简单分析一下运行结果。

     1, p1线程向缓冲中写入1。    此时,缓冲区数据:   | 1 |   |   |   |   |
     2, p4线程向缓冲中写入4。    此时,缓冲区数据:   | 1 | 4 |   |   |   |
     3, p5线程向缓冲中写入5。    此时,缓冲区数据:   | 1 | 4 | 5 |   |   |
     4, p0线程向缓冲中写入0。    此时,缓冲区数据:   | 1 | 4 | 5 | 0 |   |
     5, p2线程向缓冲中写入2。    此时,缓冲区数据:   | 1 | 4 | 5 | 0 | 2 |
     此时,缓冲区容量为5;缓冲区已满!如果此时,还有“写线程”想往缓冲中写入数据,会调用put中的notFull.await()等待,直接缓冲区非满状态,才能继续运行。
     6, t0线程从缓冲中取出数据1。此时,缓冲区数据:   |   | 4 | 5 | 0 | 2 |
     7, p3线程向缓冲中写入3。    此时,缓冲区数据:   | 3 | 4 | 5 | 0 | 2 |
     8, t1线程从缓冲中取出数据4。此时,缓冲区数据:   | 3 |   | 5 | 0 | 2 |
     9, p6线程向缓冲中写入6。    此时,缓冲区数据:   | 3 | 6 | 5 | 0 | 2 |
     ...

时间: 2024-07-30 00:07:39

Java - "JUC"之Condition源码解析的相关文章

Java集合---Array类源码解析

Java集合---Array类源码解析              ---转自:牛奶.不加糖 一.Arrays.sort()数组排序 Java Arrays中提供了对所有类型的排序.其中主要分为Primitive(8种基本类型)和Object两大类. 基本类型:采用调优的快速排序: 对象类型:采用改进的归并排序. 1.对于基本类型源码分析如下(以int[]为例): Java对Primitive(int,float等原型数据)数组采用快速排序,对Object对象数组采用归并排序.对这一区别,sun在

java.lang.Void类源码解析_java - JAVA

文章来源:嗨学网 敏而好学论坛www.piaodoo.com 欢迎大家相互学习 在一次源码查看ThreadGroup的时候,看到一段代码,为以下: /* * @throws NullPointerException if the parent argument is {@code null} * @throws SecurityException if the current thread cannot create a * thread in the specified thread grou

Java集合类库 ArrayList 源码解析

集合类库是Java的一个重大突破,方便了我们对大数据的操作.其中 Arrays 和 Collections 工具类可以帮助我们快速操作集合类库.下面对Java集合类库的源码分析是基于jdk1.7的.今天我们来看看ArrayList的底层实现原理. ArrayList的继承结构图 继承自 AbstractList 抽象类,在上层是 AbstractCollection 抽象类,直接去 AbstractCollection 类去看看. AbstractCollection 类主要实现了 Collec

【Java集合】-- LinkedList源码解析

目录 继承体系 数据结构 源码解析 1.属性 2.构造方法 LinkedList() LinkedList(Collection<? extends E> c) 3.添加元素 add(E e) addFirst(E e) addLast(E e) add(int index, E element) offer(E e) offerFirst(E e) offerLast(E e) 总结 4.获取元素 get(int index) getFirst() getLast() peek() 5.删除

Java集合类库 LinkedList 源码解析

基于JDK 1.7,和ArrayList进行比较分析 Java已经有了ArrayList,用来存放元素,对元素的操作都很方便.为什么还会有LinkedList呢?我们都知道ArrayList获取元素很快,但是插入一个元素很慢,因为ArrayList底层维护的是一个数组,往数组中的某个位置插入一个元素,是很消耗资源的. 而LinkedList插入元素很快,获取任意位置的元素却很慢.这是为什么呢?底层又是怎样实现的呢? 1.继承关系 LinkedList的继承关系图: LinkedList继承的是A

Java集合---Arrays类源码解析

一.Arrays.sort()数组排序 Java Arrays中提供了对所有类型的排序.其中主要分为Primitive(8种基本类型)和Object两大类. 基本类型:采用调优的快速排序: 对象类型:采用改进的归并排序. 1.对于基本类型源码分析如下(以int[]为例): Java对Primitive(int,float等原型数据)数组采用快速排序,对Object对象数组采用归并排序.对这一区别,sun在<<The Java Tutorial>>中做出的解释如下: The sort

Java 1.8 ArrayList源码解析

1 // 非线程安全 2 // 继承了AbstractList类 3 // 实现了List.RandomAccess.Cloneable.java.io.Serializable接口 4 // 后面3个接口是标记接口,没有抽象方法. 5 // 表示ArrayList可以随机访问.浅复制.序列化和反序列化. 6 public class ArrayList<E> extends AbstractList<E> 7 implements List<E>, RandomAcc

java并发-AQS.ObjectCondition源码解析

1 什么是条件队列 它使得一组线程能够通过某种方式来等待特定的条件变成真,条件队列的元素是一个个正在等待状态的线程.对象的内置锁(synchronized语义对应的同步机制),关联着一个内置的条件队列.Object的wait/notify/notifyAll等方法构成了内部条件队列的API(即将内部锁与内部条件队列关联的机制). 内部条件队列是需要内置锁保护的,即:需要调用对象X中的条件队列,必须持有对象X上的锁.这是因为状态处于并发环境下,"等待依赖状态的某个条件"与"维护

java.lang.Boolean 类源码解析

Boolean源码比较简单. 1 public final class Boolean implements java.io.Serializable, 2 Comparable<Boolean> 3 { 4 /** 5 * The {@code Boolean} object corresponding to the primitive 6 * value {@code true}. 7 */ 8 public static final Boolean TRUE = new Boolean(