与AQS有关的并发类

ReetrantLock与Condition:

参考

在java.util.concurrent包中,有两个很特殊的工具类,Condition和ReentrantLock,使用过的人都知道,ReentrantLock(重入锁)是jdk的concurrent包提供的一种独占锁的实现。它继承自Dong Lea的 AbstractQueuedSynchronizer(同步器),确切的说是ReentrantLock的一个内部类继承了AbstractQueuedSynchronizer,ReentrantLock只不过是代理了该类的一些方法,可能有人会问为什么要使用内部类在包装一层?
我想是安全的关系,因为AbstractQueuedSynchronizer中有很多方法,还实现了共享锁,Condition(稍候再细说)等功能,如果直接使ReentrantLock继承它,则很容易出现AbstractQueuedSynchronizer中的API被误用的情况。

ReentrantLock和Condition的使用方式通常是这样的:

运行后,结果如下:

可以看到,

Condition的执行方式,是当在线程1中调用await方法后,线程1将释放锁,并且将自己沉睡,等待唤醒,

线程2获取到锁后,开始做事,完毕后,调用Condition的signal方法,唤醒线程1,线程1恢复执行。

以上说明Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。

那,它是怎么实现的呢?

首先还是要明白,reentrantLock.newCondition() 返回的是Condition的一个实现,该类在AbstractQueuedSynchronizer中被实现,叫做newCondition()

关键的就在于此,我们知道AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。

而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

1. 线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。

2. 线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。

3. 接着马上被加入到Condition的等待队列中,以为着该线程需要signal信号。

4. 线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。

5.  线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。  注意,这个时候,线程1 并没有被唤醒。

6. signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。

7. 直到释放所整个过程执行完毕。

可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

CyclicBarrier: 循环的篱笆。参考

它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时
CyclicBarrier 很有用。

当某一个线程到达公共屏障点后(即完成一部分任务后),调用awaite(),等待其他线程到来。一起走。

可以看成是一个集结线程类,构造函数传入等待线程的数量

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {
 public static void main(String [] args){
  ExecutorService service=Executors.newCachedThreadPool();
  final CyclicBarrier cb=new CyclicBarrier(3);  //三个线程同时到达
  for(int i=0;i<3;i++){
   Runnable runnable=new Runnable(){
    public void run(){
     try {
      Thread.sleep((long)(Math.random()*10000));
      System.out.println("线程"+Thread.currentThread().getName()+
        "即将到达集合地点1,当前已有"+(cb.getNumberWaiting()+1)+"个已到达"+
        (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
      try {
       cb.await();
      } catch (BrokenBarrierException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
      }
      Thread.sleep((long)(Math.random()*10000));
      System.out.println("线程"+Thread.currentThread().getName()+
        "即将到达集合地点2,当前已有"+(cb.getNumberWaiting()+1)+"个已到达"+
        (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
      try {
       cb.await();
      } catch (BrokenBarrierException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
      }
      Thread.sleep((long)(Math.random()*10000));
      System.out.println("线程"+Thread.currentThread().getName()+
        "即将到达集合地点3,当前已有"+(cb.getNumberWaiting()+1)+"个已到达"+
        (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
      try {
       cb.await();
      } catch (BrokenBarrierException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
      }
     } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
   };
   service.execute(runnable);
  }
  service.shutdown();
 }
}

结果:

线程pool-1-thread-3即将到达集合地点1,当前已有1个已到达正在等候

线程pool-1-thread-2即将到达集合地点1,当前已有2个已到达正在等候

线程pool-1-thread-1即将到达集合地点1,当前已有3个已到达都到齐了,继续走啊

线程pool-1-thread-1即将到达集合地点2,当前已有1个已到达正在等候

线程pool-1-thread-2即将到达集合地点2,当前已有2个已到达正在等候

线程pool-1-thread-3即将到达集合地点2,当前已有3个已到达都到齐了,继续走啊

线程pool-1-thread-2即将到达集合地点3,当前已有1个已到达正在等候

线程pool-1-thread-1即将到达集合地点3,当前已有2个已到达正在等候

线程pool-1-thread-3即将到达集合地点3,当前已有3个已到达都到齐了,继续走啊

CountDownLatch: 它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。参考

例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

过程:1,主线程开启

2,new出等待N个线程的CountDownLatch(构造函数传入需要等待的线程数量)

3,create并执行N个线程

4,主线程等待

5,N个线程都执行完毕

6,主线程恢复执行。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch对象的await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调
用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务(一旦其他线程已经开始运行,就可以调用CountDownLatch对象的awaite方法,等待其他线程执行完毕后,开启主线程)。

Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

Semphore:信号量  参考

  Semaphore类位于java.util.concurrent包下,它提供了2个构造器:


1

2

3

4

5

6

public Semaphore(int permits)
{          
//参数permits表示许可数目,即同时可以允许多少线程进行访问

    sync
new NonfairSync(permits);

}

public Semaphore(int permits, boolean fair)
{    
//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可

    sync
= (fair)? 
new FairSync(permits)
new NonfairSync(permits);

}

  下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:


1

2

3

4

public void acquire() throws InterruptedException
{  }     
//获取一个许可

public void acquire(int permits) throws InterruptedException
{ }    
//获取permits个许可

public void release()
{ }          
//释放一个许可

public void release(int permits)
{ }    
//释放permits个许可

  acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

  release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

  这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:


1

2

3

4

public boolean tryAcquire()
{ };    
//尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false

public boolean tryAcquire(long timeout,
TimeUnit unit) 
throws InterruptedException
{ };  
//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

public boolean tryAcquire(int permits)
{ }; 
//尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false

public boolean tryAcquire(int permits, long timeout,
TimeUnit unit) 
throws InterruptedException
{ }; 
//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

  另外还可以通过availablePermits()方法得到可用的许可数目。

  下面通过一个例子来看一下Semaphore的具体使用:

  假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。(有人将这个类的应用归结为厕所理论,其实都是一样的)那么我们就可以通过Semaphore来实现:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

public class Test
{

    public static void main(String[]
args) {

        int N
8;            //工人数

        Semaphore
semaphore = 
new Semaphore(5); //机器数目

        for(int i=0;i<N;i++)

            new Worker(i,semaphore).start();

    }

    

    static class Worker extends Thread{

        private int num;

        private Semaphore
semaphore;

        public Worker(int num,Semaphore
semaphore){

            this.num
= num;

            this.semaphore
= semaphore;

        }

        

        @Override

        public void run()
{

            try {

                semaphore.acquire();

                System.out.println("工人"+this.num+"占用一个机器在生产...");

                Thread.sleep(2000);

                System.out.println("工人"+this.num+"释放出机器");

                semaphore.release();           

            catch (InterruptedException
e) {

                e.printStackTrace();

            }

        }

    }

}

1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

    CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;

    而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

    另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

2)Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

时间: 2024-12-09 23:11:06

与AQS有关的并发类的相关文章

架构师养成--7.同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

17.并发类容器设计

并发类容器设计 1.ConcurrentHashMap:代替散列普通的hashTable,添加了复合操作支持. private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); for (Map.Entry<String, Object> m : resultMap.entrySet()) { count += (Long) m.getValue(

同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

并发编程(9):同步类容器与并发类容器

1.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作. 复合操作,如: 迭代(反复访问元素,遍历完容器中所有的元素) 跳转(根据指定的顺序找到当前元素的下一个元素) 条件运算 这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的就是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector.HashTble.这

第七章定制并发类

Java 7 并发编程实战手册目录 代码下载(https://github.com/Wang-Jun-Chao/java-concurrency) 第七章定制并发类 7.1简介 Java并发API提供了大量接口和类来实现并发应用程序.这些接口和类既包含了底层机制,如Thread类.Runnable接口或Callable接口.synchronized关键字,也包含了高层机制,如在Java 7中增加的Executor框架和Fork/Join框架.尽管如此,在开发应用程序时,仍会发现己有的Java类无

再谈AbstractQueuedSynchronizer:基于AbstractQueuedSynchronizer的并发类实现

公平模式ReentrantLock实现原理 前面的文章研究了AbstractQueuedSynchronizer的独占锁和共享锁,有了前两篇文章的基础,就可以乘胜追击,看一下基于AbstractQueuedSynchronizer的并发类是如何实现的. ReentrantLock显然是一种独占锁,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基础类,继承自AbstractQueuedSynchronizer,看一下代码实现: 1 abstract stati

线程学习--(七)单例和多线程、同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

第一章 Java常用的并发类

注:本系列博客主要参考于<分布式Java应用:基础与实践>,林昊 著 1.常用的并发集合类 ConcurrentHashMap:线程安全的HashMap的实现 CopyOnWriteArrayList:线程安全且在读操作时无锁的ArrayList CopyOnWriteArraySet:基于CopyOnWriteArrayList,不添加重复元素 ArrayBlockingQueue:基于数组.先进先出.线程安全,可实现指定时间的阻塞读写,并且容量可以限制 LinkedBlockingQueu

并发类容器-第二讲

一.首先我们来熟悉一下跳表(数据结构和算法) 参考资料: https://www.cnblogs.com/seniusen/p/9870398.html https://blog.csdn.net/u010425776/article/details/54890215 跳表的由来:作用:存储有序序列,并且实现高效的查找与插入删除.存储有序序列最简单的办法就是使用数组,从而查找可以采用二分搜索,但插入删除需要移动元素较为低效.因此出现了二叉搜索树,用来解决插入删除移动元素的问题.但二叉搜索树在最坏