并发编程(三):从AQS到CountDownLatch与ReentrantLock

一、目录

1、AQS简要分析

2、谈CountDownLatch

3、谈ReentrantLock

4、谈消费者与生产者模式(notfiyAll/wait、signAll/await、condition)

二、AQS简要分析

问题:AQS是什么?有什么用?

AQS是什么?

字面上看,它被称为抽象队列式的同步器(AbstractQueuedSynchronizer)。简单说,它就是一个同步队列容器。

AQS有什么用?

  1. 为什么会产生ArrayList、LinkedList、HashMap这些容器?它们底层实现无非都是对数组、链表、树的操作,至于它们的产生,就是因为对编程人员对于数组、链表、树的增删改查操作非常繁琐而提出的解决方案。
  2. 那为什么会产生AQS呢?谈到同步,大家最容易想到的就是在多线程中如何确保安全的资源共享。那同步队列就是为了解决资源共享的同步容器。像上述容器一样,在顶层就设计好,编程人员只需要调用接口就能轻易实现复杂的资源共享问题。

既然谈到资源共享,那同步容器怎么实现资源共享呢?

AQS定义两种资源共享方式:Exclusive(独占、只有一个线程执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

那什么是独占式?

在谈synchronized的资源共享实现方式的时候,当线程A访问共享资源的时候,其它的线程全部被堵塞,直到线程A读写完毕,其它线程才能申请同步互斥锁从而访问共享资源。如果之前看过我关于synchronized的讨论,这里应该不难理解,为了照顾未了解过的读者,再重新回顾一下。

以RenentrantLock为例,如何知道共享资源是否有线程正在被访问呢?其实,它有一个state变量初始值为0,表示未锁定状态。当线程A访问的时候,state+1,就代表该线程锁定了共享资源,其他线程将无法访问,而当线程A访问完共享资源以后,state-1,直到state等于0,就将释放对共享变量的锁定,其他线程将可以抢占式或者公平式争夺。当然,它支持可重入,那什么是可重入呢?同一线程可以重复锁定共享资源,每锁定一次state+1,也就是锁定多次。说明:锁定多少次就要释放多少次。

什么是共享式呢?

以CountDownLatch为例,共享资源可以被N个线程访问,也就是初始化的时候,state就被指定为N(N与线程个数相等),线程countDown()一次,state会CAS减1,直到所有线程执行完(state=0),那些await()的线程将被唤醒去执行执行剩余动作。

什么是CAS?CAS的定义为Compare-And-Swap,语义为比较并且交换。在深入理解JVM书中,谈到自旋锁,因为锁的堵塞释放对于cpu资源的损害很高,那么自旋锁就是当线程A访问共享资源的时候,其他线程并不放弃对锁的持有,它们在不停循环,不断尝试的获取锁,直到获得锁就停止循环,自旋锁是对于资源共享的一种优化手段,但是它适用于对锁持有时间比较短的情况。

独占式lock流程(unlock同理):

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功就返回。
  2. 没成功,则addWaiter()将线程加入等待队列的尾部,并标记为独享模式。
  3. acquireQueued()使线程在等待队列中休息,有机会时会去尝试获得资源。获得资源后返回。如果整个过程有中断过返回true,否则返回false。
  4. 如果线程在等待过程中中断过,它是不响应的。只是获得资源后才再进行自我中断selfInterrupt(),将中断补上。

共享式流程(类似于独占式 ):

  1. tryAcquireShared()尝试获取资源,成功则直接返回。
  2. 失败则通过 doAcquireShared()进入等待队列,直到被唤醒或者中断并且成功获取资源才返回。
  3. 不同:独占式是只唤醒后继节点。共享式是唤醒后继,后继还会去唤醒它的后继,从而实现共享。

以上是核心的关于CountDownLatch、ReentrantLock的分析。由于博主研究程度有限,想更深层次研究,请参考:Java并发AQS详解

三、浅谈CountDownLatch

CountDownLatch是什么? 有什么用?

CountDownLatch是一个同步容器,但是有人叫它发令枪,也有人叫它门闩。初始化设定线程的个数,调用countDownLatch.await()阻塞所有线程,直到countDownLatch.countDown()为0,那么将继续执行剩余的操作。例如,跑步比赛,所有线程都await()在起跑线,当所有人告诉裁判准备好了,裁判发令枪一响,运动员开炮。门闩道理一样,门不开全给我等着!

作用:为了实现同步共享数据的一种更加高效的解决办法。

/**
 * CountDownLatch相当于指令枪或者门闩,所有线程都awit()阻塞在起跑线,只有countDown到state为0,其他线程才能往下运行。
 * @author qiuyongAaron
 */
public class CountDownLatchDemo {
     private static final int PLAYER_NUM=5;

     public static void main(String[] args) {

           CountDownLatch start=new CountDownLatch(1);
           CountDownLatch end =new CountDownLatch(PLAYER_NUM);
           Player [] players=new Player[PLAYER_NUM];

           for(int i=0;i<PLAYER_NUM;i++)
                players[i]=new Player(start, end, i);
           //指定线程个数的线程池!
           ExecutorService exe=Executors.newFixedThreadPool(PLAYER_NUM);
           for(Player player:players)
                exe.execute(player);

           System.out.println("比赛开始!");
           //比赛开始!
           start.countDown();

           try {
                end.await();
           } catch (InterruptedException e) {
                e.printStackTrace();
           }finally{
                System.out.println("比赛结束!");
                exe.shutdown();
           }
     }

}

class Player implements Runnable{
     private CountDownLatch start;
     private CountDownLatch end;
     private int id;

     Random random=new Random();
     public Player(CountDownLatch start,CountDownLatch end,int id) {
           this.start=start;
           this.end=end;
           this.id=id;
     }

     @Override
     public void run() {
           try {
                //等待比赛开始。
                start.await();
                TimeUnit.SECONDS.sleep(random.nextInt(10));
                System.out.println("Player-"+id+":arrived");
           } catch (InterruptedException e) {
                e.printStackTrace();
           }finally{
                //选手-id到达终点,end计数为0结束比赛!
                end.countDown();
           }
     }
}

//运行结果:
比赛开始!
Player-3:arrived
Player-4:arrived
Player-0:arrived
Player-1:arrived
Player-2:arrived
比赛结束!

三、谈ReentrantLock

1、ReentrantLock是什么?有什么用?

ReentrantLock跟synchronized作用差不多,是在于synchronized基础上的一种简易同步容器,并没有深层次的原理剖析。

2、ReentrantLock的基础用法

2.1 回顾synchronized如何实现线程同步。

/**
 * 示例一:同步锁的使用
 * reentrantlock用于替代synchronized
 * 本例中由于m1锁定this,只有m1执行完毕的时候,m2才能执行
 * @author qiuyongAaron
 */
public class ReentrantLockOne {
     public synchronized void m1(){
           for(int i=0;i<10;i++){
                try {
                     TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
                System.out.println(i);
           }
     }

     public synchronized void m2(){
           System.out.println("hello m2!");
     }

     public static void main(String[] args) {
           ReentrantLockOne lock=new ReentrantLockOne();

           new Thread(()->lock.m1(),"t1").start();

           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }

           new Thread(()->lock.m2(),"t2").start();
     }
}

Synchronized实现线程同步

2.2 ReentrantLock实现线程同步-与synchronized作用一致!

/**
 * 示例二:等价于同步锁
 * 使用reentrantlock可以完成同样的功能
 * 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
 * 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
 * @author qiuyongAaron
 */
public class ReentrantLockTwo {
     ReentrantLock lock =new ReentrantLock();
     public  void m1(){
           try {
                lock.lock();
                for(int i=0;i<10;i++){
                     TimeUnit.SECONDS.sleep(1);
                     System.out.println(i);
                }
           } catch (InterruptedException e) {
                e.printStackTrace();
           }finally{
                lock.unlock();
           }
     }

     public synchronized void m2(){
           lock.lock();
           System.out.println("hello m2!");
           lock.unlock();
     }

     public static void main(String[] args) {
           ReentrantLockTwo lock=new ReentrantLockTwo();

           new Thread(()->lock.m1(),"t1").start();

           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }

           new Thread(()->lock.m2(),"t2").start();
     }
}

ReentrantLock同步互斥

2.3 ReentrantLock尝试获取锁,若指定时间无法获取锁放弃等待!

/**
 * 示例三:tryLock
 * 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
 * @author qiuyongAaron
 */
public class ReentrantLockThree {
     ReentrantLock lock=new ReentrantLock();

     public void m1(){
           try {
                lock.lock();
                for(int i=0;i<10;i++){
                     TimeUnit.SECONDS.sleep(1);
                     System.out.println(i);
                }
           } catch (Exception e) {
                e.printStackTrace();
           }finally{
                lock.unlock();
           }
     }

     boolean locked=false;
     public void m2(){
           try {
                lock.tryLock(5,TimeUnit.SECONDS);
                System.out.println("m2:"+locked);
           } catch (Exception e) {
                e.printStackTrace();
           }finally{
                if(locked) lock.unlock();
           }
     }

     public static void main(String[] args) {
           ReentrantLockThree lock=new ReentrantLockThree();

           new Thread(()->lock.m1(),"t1").start();

           try {
                TimeUnit.SECONDS.sleep(1);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }

           new Thread(()->lock.m2(),"t2").start();
     }
}

ReentrantLock尝试获取锁

2.4 指定公平锁或者抢占式锁

/**
 * ReentrantLock还可以指定为公平锁
 * @author qiuyongAaron
 */
public class ReentrantLockFive extends Thread{

     //默认false:为非公平锁  true:公平锁
     ReentrantLock lock=new ReentrantLock();

     @Override
     public void run() {
           for(int i=0;i<100;i++){
                lock.lock();
                try {
                     TimeUnit.SECONDS.sleep(1);
                     System.out.println(Thread.currentThread().getName()+"获得锁"+"-"+i);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }finally{
                     lock.unlock();
                }
           }

     }

     public static void main(String[] args) {
           ReentrantLockFive lock=new ReentrantLockFive();
           new Thread(lock,"t1").start();
           new Thread(lock,"t2").start();
     }
}

运行结果:
//非公平锁
t2获得锁-0 t2获得锁-1 t1获得锁-0 t1获得锁-1 t1获得锁-2 t2获得锁-2
//公平锁
t1获得锁-0 t2获得锁-0 t1获得锁-1 t2获得锁-1 t1获得锁-2 t2获得锁-2

ReentrantLock公平锁

3、ReentrantLock实现线程通信

/**
 * 模拟生产者消费者模式-线程之间通信 synchronized-notifyAll/wait
 * @author qiuyongAaron
 */
public class MyContainerOne {
     LinkedList<Integer> list=new LinkedList<Integer>();
      static final int MAX=10;
      int count=0;

      //生产者线程
      public synchronized void put(int i){
            while(list.size()==MAX){
                 try {
                     this.wait();
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            }
            list.add(i);
            ++count;
            this.notifyAll();//通知消费者来消费
      }

      //消费者线程
      public synchronized int get(){
            while(list.size()==0){
                 try {
                     this.wait();
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            }
            int num=list.removeFirst();
            count--;
            this.notifyAll();//通知生产者生产
            return num;
      }

      public static void main(String[] args) {
           MyContainerOne container=new MyContainerOne();

           //制造10个消费者
           for(int i=0;i<10;i++){
                new Thread(()->{
                     for(int j=0;j<5;j++) System.out.println(container.get());
                     },
                "c"+i).start();
           }

           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }

           //制造2个生产者
           for(int i=0;i<2;i++){
                new Thread(()->{
                     for(int j=0;j<25;j++) container.put(j);
                     },
                "p"+i).start();
           }
     }
}
/**
 * 模拟生产者消费者模式-reentrantLock-awit/signAll
 * @author qiuyongAaron
 */
public class MyContainerTwo {

     LinkedList<Integer> list=new LinkedList<Integer>();
      static final int MAX=10;
      int count=0;

      ReentrantLock lock=new ReentrantLock();
      Condition producer=lock.newCondition();
      Condition consumer=lock.newCondition();

      //生产者线程
      public  void put(int i){
            try {
                 lock.lock();
                 while(list.size()==MAX){
                     producer.await();
                 }
                 list.add(i);
                 ++count;
                 consumer.signalAll();//通知消费者来消费
            } catch (InterruptedException e){
                 e.printStackTrace();
            }finally{
                 lock.unlock();
            }
      }

      //消费者线程
      public  int get(){
            try{
                 lock.lock();
                 while(list.size()==0){
                      consumer.await();
                 }
                 int num=list.removeFirst();
                 count--;
                 producer.signalAll();//通知生产者生产
                 return num;
            }catch(Exception e){
                 e.printStackTrace();
            }finally{
                 lock.unlock();
            }
            return 0;
      }

      public static void main(String[] args) {
           MyContainerTwo container=new MyContainerTwo();

           //制造10个消费者
           for(int i=0;i<10;i++){
                new Thread(()->{
                     for(int j=0;j<5;j++) System.out.println(container.get());
                     },
                "c"+i).start();
           }

           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }

           //制造2个生产者
           for(int i=0;i<2;i++){
                new Thread(()->{
                     for(int j=0;j<25;j++) container.put(j);
                     },
                "p"+i).start();
           }
     }
}

总结:synchronized实现线程的消费者-生产者模式是通过wait/notifyAll实现,ReentrantLock是通过condition+await/signAll。那他们有什么区别呢?synchronized要么通过notify随机唤醒一个,或者notifyAll唤醒所有不管你是消费者还是生产者、而ReentrantLock是唤醒指定的线程的,更加精确效率更高。

四、版权声明

  作者:邱勇Aaron

  出处:http://www.cnblogs.com/qiuyong/

  您的支持是对博主深入思考总结的最大鼓励。

  本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,尊重作者的劳动成果。

  参考:马士兵并发编程、并发编程实践

     AQS详解:http://www.cnblogs.com/waterystone/p/4920797.html

     CountDownLatch详解:http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315652.html

时间: 2024-11-06 09:43:29

并发编程(三):从AQS到CountDownLatch与ReentrantLock的相关文章

【Java并发编程实战】—– AQS(三):阻塞、唤醒:LockSupport

在上篇博客([Java并发编程实战]-– AQS(二):获取锁.释放锁)中提到,当一个线程加入到CLH队列中时,如果不是头节点是需要判断该节点是否需要挂起:在释放锁后,需要唤醒该线程的继任节点 lock方法,在调用acquireQueued(): if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; 在acquireQueued()中调用parkAndCheckIn

Java并发编程三个性质:原子性、可见性、有序性

并发编程 并发程序要正确地执行,必须要保证其具备原子性.可见性以及有序性:只要有一个没有被保证,就有可能会导致程序运行不正确 线程不安全在编译.测试甚至上线使用时,并不一定能发现,因为受到当时的CPU调度顺序,线程个数.指令重排的影响,偶然触发 线程安全的定义 比如说一个类,不论通过怎样的调度执行顺序,并且调用处不用对其进行同步操作,其都能表现出正确的行为,则这个类就是线程安全的 并发编程三个概念 原子性: 一个操作或多个操作要么全部执行且执行过程不被中断,要么不执行 可见性: 多个线程修改同一

并发编程三要素:原子性,有序性,可见性

并发编程三要素 原子性:一个不可再被分割的颗粒.原子性指的是一个或多个操作要么全部执行成功要么全部执行失败. 有序性: 程序执行的顺序按照代码的先后顺序执行.(处理器可能会对指令进行重排序) 可见性: 一个县城对共享变量的修改,另一个线程能够立刻看到. 一.原子性 线程切换会带来原子性的问题 int i = 1; // 原子操作 i++; // 非原子操作,从主内存读取 i 到线程工作内存,进行 +1,再把 i 写到主内存. 虽然读取和写入都是原子操作,但合起来就不属于原子操作,我们又叫这种为"

并发编程的基石——AQS类

本博客系列是学习并发编程过程中的记录总结.由于文章比较多,写的时间也比较散,所以我整理了个目录贴(传送门),方便查阅. 并发编程系列博客传送门 本文参考了[Java多线程进阶(六)-- J.U.C之locks框架:AQS综述(1)]和Java技术之AQS详解两篇文章. AQS 简介 AbstractQueuedSynchronizer (简称AQS)类是整个 JUC包的核心类.JUC 中的ReentrantLock.ReentrantReadWriteLock.CountDownLatch.Se

Java并发编程总结3——AQS、ReentrantLock、ReentrantReadWriteLock

本文内容主要总结自<Java并发编程的艺术>第5章——Java中的锁. 一.AQS AbstractQueuedSynchronizer(简称AQS),队列同步器,是用来构建锁或者其他同步组建的基础框架.该类主要包括: 1.模式,分为共享和独占. 2.volatile int state,用来表示锁的状态. 3.FIFO双向队列,用来维护等待获取锁的线程. AQS部分代码及说明如下: public abstract class AbstractQueuedSynchronizer extend

【Java并发编程实战】—– AQS(四):CLH同步队列

在[Java并发编程实战]-–"J.U.C":CLH队列锁提过,AQS里面的CLH队列是CLH同步锁的一种变形. 其主要从双方面进行了改造:节点的结构与节点等待机制.在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁.入队列.释放锁等实现都与头尾节点相关.而且每一个节点都引入前驱节点和后兴许节点的引用:在等待机制上由原来的自旋改成堵塞唤醒. 其结构例如以下: 知道其结构了,我们再看看他的实现.在线程获取锁时会调用AQS的acquire()方法.该方法第一次尝试获取锁假设

Java并发编程总结3——AQS、ReentrantLock、ReentrantReadWriteLock(转)

本文内容主要总结自<Java并发编程的艺术>第5章——Java中的锁. 一.AQS AbstractQueuedSynchronizer(简称AQS),队列同步器,是用来构建锁或者其他同步组建的基础框架.该类主要包括: 1.模式,分为共享和独占. 2.volatile int state,用来表示锁的状态. 3.FIFO双向队列,用来维护等待获取锁的线程. AQS部分代码及说明如下: public abstract class AbstractQueuedSynchronizer extend

漫谈并发编程(三):共享受限资源

解决共享资源竞争 一个不正确的访问资源示例 考虑下面的例子,其中一个任务产生偶数,而其他任务消费这些数字.这里,消费者任务的唯一工作就是检查偶数的有效性. 我们先定义一个偶数生成器的抽象父类. public abstract class IntGenerator { private volatile boolean canceled = false; public abstract int next( ); public void cancle( ) { canceled = true; } p

【并发编程】Java并发编程-看懂AQS的前世今生

在我们可以深入学习AbstractQueuedSynchronizer(AQS)之前,必须具备了volatile.CAS和模板方法设计模式的知识,本文主要想从AQS的产生背景.设计和结构.源代码实现及AQS应用这4个方面来学习下AQS 如果想学习Java工程化.高性能及分布式.深入浅出.微服务.Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家. 1.AQS产生背景 通过