Java并发:线程间同步-条件队列和同步工具类

转载请注明出处: jiq?钦‘s technical Blog - 季义钦

线程之间的同步,除了互斥(前面介绍的互斥锁)之外,还存在协作关系,下面我们就介绍一下java线程间常见的一些协作方式。

一、内置条件队列

正如每个Java对象都可以作为一个内置锁,每个对象也可以作为一个条件队列,称为内置条件队列,Object.wait()、notify()/notifyAll()构成了内置条件队列的API。

需要注意的是,调用任何对象X的内置条件队列的API都必须要先获得该对象X的内置锁。

1、API介绍

Wait()

u  调用时自动释放当前锁,请求OS将自己挂起

u  内置条件队列上的条件发生后被唤醒

u  被唤醒后与其他线程竞争重新获得锁

notify()

u  通知唤醒当前获得锁的对象的内置条件队列上的任意一个等待的线程

u  发出通知后尽快释放当前获得的锁确保等待的线程能够获取

notifyAll()

u  通知唤醒当前获得锁的对象的内置条件队列上的所有等待的线程

u  发出通知后尽快释放当前获得的锁确保等待的线程能够获取

u  只有一个被唤醒的线程能够获得锁,先竞争到锁的线程执行完退出Synchronized块之后其他被唤醒的线程重新竞争直到所有被唤醒的线程执行完毕

2、使用环境

必须运行在同步控制块中: wait,notify,notifyAll作为常用的任务间协作原语,是Object类的一部分,而不是Thread的一部分,所以可以把他们放进任何同步控制方法中。

实际上,只能在同步控制方法/同步控制块中调用wait,notify,notifyAll这几个方法,如果在非同步控制方法里面调用了这几个方法,可以编译通过,但是运行的时候会获得IllegalMonitorStateException异常。

一般来说,wait和notify放在synchronozed(object)同步块中,并由这个object来调用。如果是synchronized(this),那么就直接调用。具体如下:

(1)在某个指定对象lockObj上调用:

synchronized(lockObj)   //获取lockObj对象锁
       {
           try {
              //释放lockObj对象锁,阻塞等待在内置条件队列上
              lockObj.wait();
           }catch(InterruptedException e) {
              e.printStackTrace();
              return;
           }
       }

       synchronized(lockObj)
       {
           //唤醒一个等待在lockObj对象的内置条件队列上的线程
           lockObj.notify();
       }

 

(2)也可以在this上调用:

synchronized(this)//获取当前对象内置锁
       {
           try {
              //释放当前对象锁,阻塞在内置条件队列上
              wait();
           }catch(InterruptedException e) {
              e.printStackTrace();
              return;
           }
       }

       synchronized(this)
       {
           //唤醒当前对象内置条件队列上的一个线程
           notify();
       }

3、通知遗漏问题

wait常与while(条件判断) 配合使用:一般来说,必须用一个检查感兴趣的条件的while循环来包围wait,因为如果有多个任务等待同一个锁,第一个唤醒的任务可能先执行改变while条件判断中的状态,使得当前任务不得不再次被挂起,直到感兴趣的条件发生变化为止。

synchronized(this){ while(waxon == true)wait(); }

这样可以避免“notify通知遗漏问题”。

//线程A
synchronized (proceedLock) {
           proceedLock.wait();
       }

//线程B
synchronized (proceedLock) {
           proceedLock.notifyAll();
       }

本来设计线程B的职责就是再某个时刻通知线程A将其唤醒,但是如果线程B执行太早,在线程A还没开始动的时候就已经执行完成,那么线程A就会一直wait下去,等不到线程B来将其唤醒。这就是所谓的通知遗漏问题。

如果线程A在wait的时候配合变量判断就可以解决这个问题。

//线程A:
synchronized (proceedLock) {
           //while循环判断,这里不用if的原因是为了防止早期通知
           while ( okToProceed == false ) {
              proceedLock.wait();
           }
       }

//线程B:
synchronized (proceedLock) {
           //通知之前,将其设置为true,这样即使出现通知遗漏的情况
           //也不会使线程在wait出阻塞
           okToProceed= true;
           proceedLock.notifyAll();
       } 

变量okToProceed在初始时设置为false,即让线程A默认阻塞,等待线程B将其唤醒。如果线程B仍然在线程A还未动之前就已经结束了,但是已经将线程B等待的条件设置为true了,所以线程A是不会wait休眠的。

这样就避免了通知遗漏问题。

二、显示条件队列

前面已经说了每个Java对象都有一个内置的条件队列,但是它又一个很明显的缺陷:每个内置锁只能有一个关联的内置条件队列!!!

可以在显式锁ReentrantLock上调用Lock.newCondition()方法获得一个显示的Condition条件队列,Condition比内置条件队列提供了更加丰富的功能:在每个锁上可以创建多个显示条件队列,条件等待可以选择可中断或者不可中断,等待也可以设置时限,此外还提供公平的和非公平的队列操作。

在显示条件队列Condition中,与内置条件队列的wait、notify、notifyAll相对应的方法分别是await、signal、signalAll。

下面用一个例子说明:例子给出了有界缓存的实现,在同一个显式锁上创建了两个显示条件队列,一个表明缓存不满的条件,一个表明缓存不空的条件。

public classConditionBoundBuffer<T> {
    protected final Lock lock = new ReentrantLock();

    //缓存非满的条件队列
    private final Condition notFullCond = lock.newCondition();

    //缓存非空的条件队列
    private final Condition notEmptyCond = lock.newCondition();

    @SuppressWarnings("unchecked")
    private final T[] items = (T[])new Object[100];
    private int tail,head,count;

    public void put(T x) throws Exception
    {
       lock.lock();
       try
       {
           //当缓存满的时候,阻塞等待在缓存非满的条件队列上,并释放锁
           while(count == items.length)
              notFullCond.await();

           items[tail] = x;
           if(++tail == items.length)
              tail = 0;
           ++count;

           //唤醒等待在缓存非空条件队列上的一个线程,并释放锁
           notEmptyCond.signal();
       }finally
       {
           lock.unlock();
       }
    }

    public T take() throws InterruptedException
    {
       lock.lock();
       try
       {
           //当缓存为空时,阻塞等待在缓存非空的条件队列上,并释放锁
           while(count == 0)
              notEmptyCond.await();

           Tx = items[head];
           items[head] = null;
           if(++head == items.length)
              head = 0;
           --count;

           //唤醒等待在缓存非满条件队列上的一个线程,并释放锁
           notFullCond.signal();
           return x;
       }finally
       {
           lock.unlock();
       }
    }
}

三、同步工具类

java.util.concurrent包中含有一些同步工具类,提供一些实用的线程间同步功能。

3.1 BlockingQueue(阻塞队列)

可阻塞队列BlockingQueue拓展了Queue,增加了可阻塞的插入和获取等操作

public interface BlockingQueue<E>extends Queue<E> {

//放入元素,若有空间容纳则返回true,否则抛出IllegalStateException异常

boolean add(E e);

//放入元素,若有空间容纳则返回true,否则返回false

boolean offer(E e);

//放入元素,若有空间容纳则返回true,否则阻塞等待

void put(E e) throws InterruptedException;

//检索并移除队首元素,若不能立刻取到则阻塞等待

E take() throws InterruptedException;

//检索并移除队首元素,若不能立刻取到则等待,超时后返回null

Epoll(long timeout, TimeUnit unit) throws InterruptedException;

}

 

原理和应用:BlockingQueue是线程安全容器,并且具备阻塞特性,其内部通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒。通过put和take方法,很容易实现线程间协同,比如典型的生产者-消费者模式。

下面是几个BlockingQueue接口的实现类:

(1)ArrayBlockingQueue:基于数组的阻塞队列实现,大小固定,其构造函数必须指定int参数来指明队列大小,内部元素以FIFO(先进先出)顺序存储,常用于实现有界缓存。

(2)LinkedBlockingQueue:基于链表的阻塞队列实现,大小不固定,若其构造函数带一个规定大小的参数,则生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定,内部元素以FIFO(先入先出)顺序存储。

(3)PriorityBlockingQueue:基于数组的阻塞队列实现,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。

(4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

3.2 CountDownLatch(闭锁)

让相关线程在某一个点上等待,直到某一条件发生时,这些等待的线程才会继续执行,即所有线程阻塞等待闭锁的计数值减为0

打个比喻,闭锁相当于一扇门,这扇门要按N次(N是闭锁的初始计数值)才能打开,但是按门的线程不关心有多少线程在门外等待,只有门打开了,等待在门外的所有线程才能进去。

步骤1:初始化闭锁(设定门要按几次才能开)

CountDownLatch latch = new CountDownLatch(N);

步骤2:让线程等待该闭锁(在门外等待)

latch.await();

当等待的线程检测到当前闭锁计数器已经减为0(门打开),则继续执行。

步骤3:闭锁计数器减1(按1次门)

latch.countDown();

应用:一个线程等待N个线程全部完成任务

比如主线程需要所有图片资源都准备好之后才能使用,所以开启N个线程为其下载图片资源,自己则初始化初始值为N的闭锁并调用await()等待在这个闭锁上,每个线程下载完图片资源之后调用countDown()将闭锁减一,最后一个下载线程减一之后闭锁计数器变为0,此时等待闭锁的主线程才开始继续执行,使用已下载的图片资源。

类似地可以实现N个线程等1个线程开门,1个线程等待1个线程开门等。

3.3 Semaphore(信号量)

信号量用来控制同步访问某个特定资源的线程的数量。

信号量的数目就代表资源数目,当申请一个信号量之后,表示资源数目减1,如果某个线程要申请信号量,但是该信号量数目已经为0了,改线程将会阻塞等待信号量的释放。

步骤1:初始化信号量

Semaphore sem = new Semaphore(N);  //N代表资源数目

步骤2:申请占用一个信号量

sem.acquire();   //信号量数值减1,如果信号量计数值已经为0,将阻塞等待

步骤3:释放一个信号量

sem.release();  //信号量数值加1,标识资源使用完成,阻塞等待的线程被唤醒

应用:数据库连接池管理

将可用的和被占用的数据库连接分别管理在两个集合中,获取数据库连接的函数会从可用连接集合获取一个连接,并将连接转移到另一个集合,释放数据库连接的函数将会把用完的连接放入可用连接集合。

我们不想在没有数据库连接可用时获取连接的函数直接返回失败,而是想阻塞等待。所以在获取连接的函数中加入申请信号量的调用,在释放数据库连接的函数中加入释放信号量的调用就可以了(注意数据库连接池管理更好的方式可能是BlockingQueue,因为信号量初始值的数目是固定的,在这里需要和数据库连接池大小相同)。

0-1信号量:又称为互斥信号量,有且仅有一个线程能够获取资源的独占使用,或者函数的独占访问。

3.4 CyclicBarrier(栅栏)

多个线程单独执行,当所有线程都达到栅栏位置之后,才调度指定任务执行。

栅栏和闭锁很像,区别在于:闭锁是等待事件(闭锁计数值变为0)发生,而栅栏是等待其他所有线程均达到栅栏位置。

步骤1:初始化栅栏

CyclicBarrier Barrier = newCyclicBarrier(count, runnableTask);

指定需要有count个线程到达栅栏点之后才能冲破栅栏,并调用runnableTask任务执行。

步骤2:线程中设置栅栏点

barrier.wait();

当设置了栅栏的所有线程都达到了这个栅栏位置之后,才调用runnableTask任务执行。

注意:从CyclicBarrier的名称中可以看出,栅栏具备可循环特性,即所有线程冲破栅栏之后,如果该线程会循环继续执行,那么下次改栅栏仍然有效。

时间: 2024-10-13 10:35:26

Java并发:线程间同步-条件队列和同步工具类的相关文章

Java并发——线程间通信与同步技术

传统的线程间通信与同步技术为Object上的wait().notify().notifyAll()等方法,Java在显示锁上增加了Condition对象,该对象也可以实现线程间通信与同步.本文会介绍有界缓存的概念与实现,在一步步实现有界缓存的过程中引入线程间通信与同步技术的必要性.首先先介绍一个有界缓存的抽象基类,所有具体实现都将继承自这个抽象基类: public abstract class BaseBoundedBuffer<V> { private final V[] buf; priv

Java并发——线程间的等待与通知

前言: 前面讲完了一些并发编程的原理,现在我们要来学习的是线程之间的协作.通俗来说就是,当前线程在某个条件下需要等待,不需要使用太多系统资源.在某个条件下我们需要去唤醒它,分配给它一定的系统资源,让它继续工作.这样能更好的节约资源. 一.Object的wait()与notify() 基本概念: 一个线程因执行目标动作的条件未能满足而被要求暂停就是wait,而一个线程满足执行目标动作的条件之后唤醒被暂停的线程就是notify. 基本模板: synchronized (obj){ //保护条件不成立

java并发-使用内置条件队列实现简单的有界缓存

内置锁和内置条件队列一起,一个简单的应用是创建可阻塞的有界缓存区,java并发包的BlockingQueue就是一个利用Lock和显式条件队列实现的可阻塞的有界队列.总结内置锁和内置条件的原理,这里我们用另一种方式实现简单的可阻塞缓存.源码如下: 首先,创建一抽象有界缓存类ABoundedBuffer,提供插入和删除的基本实现. /** * @title :ABoundedBuffer * @description :有界缓存抽象类 * @update :2014-12-30 上午9:29:33

[C++11 并发编程] 12 使用条件变量创建线程间安全的队列

之前有一节中,我们使用mutex实现了一个线程间安全的堆栈.这一节,我们使用条件变量来实现一个线程间安全的队列. 标准库中的std::queue<>的接口定义如下: template <class T, class Container = std::deque<T> > class queue { public: explicit queue(const Container&); explicit queue(Container&& = Cont

Java 并发 线程同步

Java 并发 线程同步 @author ixenos 同步 1.异步线程本身包含了执行时需要的数据和方法,不需要外部提供的资源和方法,在执行时也不关心与其并发执行的其他线程的状态和行为 2.然而,大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取,这将产生同步问题(可见性和同步性的丢失) 比如两个线程同时执行指令account[to] += amount,这不是原子操作,可能被处理如下: a)将account[to]加载到寄存器 b)增加amount c)将结果写回acco

Java 并发 线程的生命周期

Java 并发 线程的生命周期 @author ixenos 线程的生命周期 线程状态: a)     New 新建 b)     Runnable 可运行 c)     Running 运行 (调用getState()时显示为Runnable) d)     Blocked 阻塞 i.          I/O阻塞 (不释放锁) I/O操作完成解除阻塞,进入Runnable状态 ii.          同步阻塞(不释放锁) 运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会

有多少人在面试时,被Java 如何线程间通讯,问哭了?

正常情况下,每个子线程完成各自的任务就可以结束了.不过有的时候,我们希望多个线程协同工作来完成某个任务,这时就涉及到了线程间通信了. 本文涉及到的知识点: thread.join(), object.wait(), object.notify(), CountdownLatch, CyclicBarrier, FutureTask, Callable 下面我从几个例子作为切入点来讲解下 Java 里有哪些方法来实现线程间通信. 如何让两个线程依次执行? 那如何让 两个线程按照指定方式有序交叉运行

Java 并发 线程属性

Java 并发 线程属性 @author ixenos 线程优先级 1.每当线程调度器有机会选择新线程时,首先选择具有较高优先级的线程 2.默认情况下,一个线程继承它的父线程的优先级 当在一个运行的线程A里,创建另一个线程B的时候,那么A是父线程,B是子线程.当在一个运行的线程A里,创建线程B,然后又创建了线程C,这时候虽然B比C创建早,可是B并不是C的父线程,而A是B和C的父线程. 3.线程的优先级高度依赖于系统,当虚拟机依赖于宿主机平台的线程实现机制时,Java线程的优先级被映射到宿主机平台

Java 并发 线程的优先级

Java 并发 线程的优先级 @author ixenos 低优先级线程的执行时刻 1.在任意时刻,当有多个线程处于可运行状态时,运行系统总是挑选一个优先级最高的线程执行,只有当线程停止.退出或者由于某些原因不执行的时候,低优先级的线程才可能被执行 2.两个优先级相同的线程同时等待执行时,那么运行系统会以round-robin的方式选择一个线程执行(即轮询调度,以该算法所定的)(Java的优先级策略是抢占式调度!) 3.被选中的线程可因为一下原因退出,而给其他线程执行的机会: 1) 一个更高优先