java并发编程基础-ReentrantLock及LinkedBlockingQueue源码分析

ReentrantLock是一个较为常用的锁对象。在上次分析的uil开源项目中也多次被用到,下面谈谈其概念和基本使用。

概念

一个可重入的互斥锁定 Lock,它具有与使用 synchronized 相同的一些基本行为和语义,但功能更强大。

名词解释:

互斥

表示同一时刻,多个线程中,只能有一个线程能获得该锁。但是多个线程都可以调用lock方法,只有一个会成功,其他的线程会被阻塞,直到该锁被释放

可重入

模仿synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续)synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个 synchronized 块时,才释放锁。
对于ReentrantLock,每次获得锁,并将请求计数置为一,如果同一个线程再次lock,计数器将递增,每次unlock时计数器值递减,直到计数器为0,锁释放

lock方法过程

如果该锁没有被另一个线程保持,则lock时获取该锁定并立即返回,将锁定的保持计数设置为 1。
如果当前线程已经保持该锁定,则将保持计数加 1,并且该方法立即返回。
如果该锁定被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁定之前,该线程将一直处于休眠状态,此时锁定保持计数被设置为 1。

unLock方法过程

每次unlock时计数器值递减,直到计数器为0,释放锁

Condition类

该类与lock绑定,用newCondition()方法创建,提供了线程之间通信的方式(类似信号量)。其使用基本与object类的wait,notify,notifyAll相同。
用condition.await()替换Object,wait(),调用时该线程阻塞,释放该线程的锁。
用condition.signal()替换Object.notify(),用condition.signalAll()替换Object.notifyAll(),唤醒该condition await方法所阻塞的线程

相对synchronized优势

锁投票(我也不是特别理解,可以通过投票获取锁?)
定时锁等候
中断锁等候
线程A和B都要获取对象O的锁定,假设A获取了对象O锁,B将等待A释放对O的锁定,
如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断
如果 使用ReentrantLock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情

使用

以下以linkedBlokingQueue源码为例子,来学习其使用。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
      //链表节点node类结构
      static class Node<E> {
          volatile E item;//volatile,保证了数据的可见性
          Node<E> next;
          Node(E x) { item = x; }
      }
      //容量
      private final int capacity;
      //用原子变量,当前元素个数
      private final AtomicInteger count = new AtomicInteger(0);
      //头节点
      private transient Node<E> head;
      //表尾节点
      private transient Node<E> last;
      //获取元素或删除元素时,要加的takeLock锁
      private final ReentrantLock takeLock = new ReentrantLock();
      //获取元素时若队列为空,线程阻塞,直至notEmpty条件满足(被通知)
      private final Condition notEmpty = takeLock.newCondition();
      //插入元素时 要加putLock锁
      private final ReentrantLock putLock = new ReentrantLock();
      //插入时,若队列已满,线程阻塞,直至notFull条件满足(被通知)
      private final Condition notFull = putLock.newCondition();
      // 唤醒等待的take操作,插入数据时若插入前链表中无数据,则调用,表示链表不再为空
      private void signalNotEmpty() {
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lock();
          try {
              notEmpty.signal();
          } finally {
              takeLock.unlock();
          }
      }
      //唤醒等待插入操作,移除数据时若链表原先已满则调用,表示链表不再满
      private void signalNotFull() {
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
              notFull.signal();
          } finally {
              putLock.unlock();
          }
      }
      // 插入到链表尾部
      private void insert(E x) {
          last = last.next = new Node<E>(x);
      }
      //获取并移除头元素
      private E extract() {
          Node<E> first = head.next;
          head = first;
          E x = first.item;
          first.item = null;
          return x;
      }
      //锁住两把锁,在remove,clear等方法中调用
      private void fullyLock() {
          putLock.lock();
          takeLock.lock();
      }
      //和fullyLock成对使用
      private void fullyUnlock() {
          takeLock.unlock();
          putLock.unlock();
      }
      //默认构造,容量为 Integer.MAX_VALUE  

      public LinkedBlockingQueue() {
          this(Integer.MAX_VALUE);
      }
      //指定容量的构造
      public LinkedBlockingQueue(int capacity) {
          if (capacity <= 0) throw new IllegalArgumentException();
          this.capacity = capacity;
          last = head = new Node<E>(null);
      }
      //指定初始化集合的构造
      public LinkedBlockingQueue(Collection<? extends E> c) {
          this(Integer.MAX_VALUE);
          for (E e : c)
              add(e);
      }
      //获得大小 

      public int size() {
          return count.get();
      }
      //剩余容量
      public int remainingCapacity() {
          return capacity - count.get();
      }
      // 将指定元素插入到此队列的尾部,如已满,阻塞至队列中有元素被移除
      public void put(E e) throws InterruptedException {
          if (e == null) throw new NullPointerException();
          int c = -1;
          final ReentrantLock putLock = this.putLock;
          final AtomicInteger count = this.count;
   //加put锁,多个线程不能同时进入
          putLock.lockInterruptibly();
          try {
              try {
    //容量已满,则一直阻塞
                  while (count.get() == capacity)
                      notFull.await();
              } catch (InterruptedException ie) {
                  notFull.signal(); // propagate to a non-interrupted thread
                  throw ie;
              }
//插入
              insert(e);
              c = count.getAndIncrement();
//通知链表未满
              if (c + 1 < capacity)
                  notFull.signal();
          } finally {
//解锁,注意必须在finally里调用,反正各种异常导致没有unlock使线程死锁
              putLock.unlock();
          }
    //通知链表非空
          if (c == 0)
              signalNotEmpty();
      }
      // 将指定元素插入到此队列的尾部,如有必要,则等待一定时间以使空间变得可用。 

      public boolean offer(E e, long timeout, TimeUnit unit)
          throws InterruptedException {
          if (e == null) throw new NullPointerException();
          long nanos = unit.toNanos(timeout);
          int c = -1;
          final ReentrantLock putLock = this.putLock;
          final AtomicInteger count = this.count;
   //加锁
          putLock.lockInterruptibly();
          try {
              for (;;) {
                  //未满可插入
                  if (count.get() < capacity) {
                      insert(e);
                      c = count.getAndIncrement();
    //通知未满
                      if (c + 1 < capacity)
                          notFull.signal();
    //跳出循环
                      break;
                  }
   //队列已满,未能插入,等待时间是负的,直接返回
                  if (nanos <= 0)
                      return false;
                  try {
    //等待一定时间后再次尝试
                      nanos = notFull.awaitNanos(nanos);
                  } catch (InterruptedException ie) {
                      notFull.signal(); // propagate to a non-interrupted thread
                      throw ie;
                  }
              }
          } finally {
//解锁
              putLock.unlock();
          }
//通知已插入数据,链表非空
          if (c == 0)
              signalNotEmpty();
          return true;
      }
      //将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),
       在成功时返回 true,如果此队列已满,则返回 false。 

      public boolean offer(E e) {
          if (e == null) throw new NullPointerException();
          final AtomicInteger count = this.count;
          if (count.get() == capacity)
              return false;
          int c = -1;
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
//由于可能在lock被阻塞时其他线程进行了插入操作,需再次判断count
              if (count.get() < capacity) {
                  insert(e);
                  c = count.getAndIncrement();
    //通知未满
                  if (c + 1 < capacity)
                      notFull.signal();
              }
          } finally {
              putLock.unlock();
          }
  //通知非空
          if (c == 0)
              signalNotEmpty();
          // >0表示已成功插入
          return c >= 0;
      }
      //获取并移除此队列的头部,若队列为空,则阻塞。
      public E take() throws InterruptedException {
          E x;
          int c = -1;
          final AtomicInteger count = this.count;
          final ReentrantLock takeLock = this.takeLock;
   //加锁
          takeLock.lockInterruptibly();
          try {
              try {
   //队列为空时阻塞
                  while (count.get() == 0)
                      notEmpty.await();
              } catch (InterruptedException ie) {
                  notEmpty.signal(); // propagate to a non-interrupted thread
                  throw ie;
              }
//获取数据
              x = extract();
              c = count.getAndDecrement();
//通知非空
              if (c > 1)
                  notEmpty.signal();
          } finally {
              takeLock.unlock();
          }
    //通知未满
          if (c == capacity)
              signalNotFull();
          return x;
      }  

      //与offer方法结构基本一致,若队列为空,则阻塞一段时间,一段时间后仍为空,则返回null
      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
          E x = null;
          int c = -1;
          long nanos = unit.toNanos(timeout);
          final AtomicInteger count = this.count;
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lockInterruptibly();
          try {
              for (;;) {
                  if (count.get() > 0) {
                      x = extract();
                      c = count.getAndDecrement();
                      if (c > 1)
                          notEmpty.signal();
                      break;
                  }
                  if (nanos <= 0)
                      return null;
                  try {
                      nanos = notEmpty.awaitNanos(nanos);
                  } catch (InterruptedException ie) {
                      notEmpty.signal(); // propagate to a non-interrupted thread
                      throw ie;
                  }
              }
          } finally {
              takeLock.unlock();
          }
          if (c == capacity)
              signalNotFull();
          return x;
      }  

      ////与offer方法结构基本一致 队列为空,不阻塞,直接返回null
      public E poll() {
          final AtomicInteger count = this.count;
          if (count.get() == 0)
              return null;
          E x = null;
          int c = -1;
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lock();
          try {
              if (count.get() > 0) {
                  x = extract();
                  c = count.getAndDecrement();
                  if (c > 1)
                      notEmpty.signal();
              }
          } finally {
              takeLock.unlock();
          }
          if (c == capacity)
              signalNotFull();
          return x;
      }
      //获取但不移除此队列的头;如果此队列为空,则返回 null。
      public E peek() {
          if (count.get() == 0)
              return null;
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lock();
          try {
              Node<E> first = head.next;
              if (first == null)
                  return null;
              else
                  return first.item;
          } finally {
              takeLock.unlock();
          }
      }
      /**
       * 从此队列移除指定元素的单个实例(如果存在)。
       */
      public boolean remove(Object o) {
          if (o == null) return false;
          boolean removed = false;
   //同时加锁,此时其他线程不能插入,不能移除
          fullyLock();
          try {
              Node<E> trail = head;
              Node<E> p = head.next;
//遍历,获取到该元素
              while (p != null) {
                  if (o.equals(p.item)) {
                      removed = true;
                      break;
                  }
                  trail = p;
                  p = p.next;
              }
//删除该元素
              if (removed) {
                  p.item = null;
                  trail.next = p.next;
                  if (last == p)
                      last = trail;
                  if (count.getAndDecrement() == capacity)
                      notFull.signalAll();
              }
          } finally {
              fullyUnlock();
          }
          return removed;
      }
      ……
  }

http://coderrobin.com/2015/02/12/java%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E5%9F%BA%E7%A1%80-ReentrantLock/

时间: 2024-11-03 21:49:56

java并发编程基础-ReentrantLock及LinkedBlockingQueue源码分析的相关文章

【Java并发编程】19、DelayQueue源码分析

DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间).DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间.DelayQueue不允许包含null元素. 领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听.

并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析

BlockingQueue 实现之 SynchronousQueue SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样. 不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行

【Java并发编程】22、Exchanger源码解析

Exchanger是双向的数据传输,2个线程在一个同步点,交换数据.先到的线程会等待第二个线程执行exchangeSynchronousQueue,是2个线程之间单向的数据传输,一个put,一个take. 先举个例子说明一下如何使用 public class ExchangerDemo { public static void main(String[] args) { Exchanger<List<Integer>> exchanger = new Exchanger<&g

关于java中ReentrantLock类的源码分析以及总结与例子

一,官方描述 关于ReentrantLock的官方描述,英文的就不贴出来了,这里我只贴出我自己翻译的描述: reentrant是一个跟synchronized具有相同行为和语义的持有锁来访问方法和语句的互斥锁,但是reentrant还拥有被扩展的能力. ReentrantLock会被线程拥有并且持续锁定,不会解锁.线程调用lock()方法返回后,则成功持有锁,否则这个锁正在被另一个线程所持有,只能等待另一个线程释放锁,如果当前线程拥有了锁,则调用lock()方法会立即返回,这个状态可以通过isH

ReentrantLock 与 AQS 源码分析

ReentrantLock 与 AQS 源码分析 1. 基本结构 ?? 重入锁 ReetrantLock,JDK 1.5新增的类,作用与synchronized关键字相当,但比synchronized更加灵活.ReetrantLock本身也是一种支持重进入的锁,即该锁可以支持一个线程对资源重复加锁,但是加锁多少次,就必须解锁多少次,这样才可以成功释放锁. 1. 继承 没有继承任何类,因为很多操作都使用了组合完成. 2. 实现 Lock, java.io.Serializable ??这里着重介绍

java中的==、equals()、hashCode()源码分析(转载)

在java编程或者面试中经常会遇到 == .equals()的比较.自己看了看源码,结合实际的编程总结一下. 1. ==  java中的==是比较两个对象在JVM中的地址.比较好理解.看下面的代码: 1 public class ComAddr{ 2 public static void main(String[] args) throws Exception { 3 String s1 = "nihao"; 4 String s2 = "nihao"; 5 Str

【小白的java成长系列】——顶级类Object源码分析

首先来说一下api文档使用,api这个词对有一定开发经验的java编程人员来说是很喜爱的~ java当然也提供了api开发文档,下载地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html 找到下面的: 下载自己喜爱的版本即可,解压,点击~/jdk-7u60-apidocs/api/index.html就可以查看其api了: 跟上网一样一样的,点击相应链接就可以查看其信息了. 进入正题,说说Object这个类: 先

LinkedBlockingQueue源码分析

1. LinkedBlockingQueue源码分析(JDK8) 2. LinkedBlockingQueue源码分析 啦啦啦

4.java并发编程艺术-java并发编程基础

java从诞生开始就明智的选择了内置对多线程的支持,这使得java语言相比同一时期的其他语言具有明显的优势.线程作为操作系统调度的最小单元,多个线程能够同时执行,这将显著提升程序的性能,在多核环境中表现的更加明显.但是,过多的创建线程和对线程的不当管理也容易造成问题.本章将着重介绍java并发编程的基础知识,从启动一个线程到线程间不同的通信方式,最后通过简单的线程池示例以及应用(简单的Web服务器)来串联本章所介绍的内容. 1.线程简介 1.1 什么是线程 现代操作系统中在运行一个程序时,会为其