java 并发(concurrent)包源码分析

参考连接:

http://www.cnblogs.com/luoxn28/p/6059881.html

http://www.cnblogs.com/java-zhao/p/5140158.html

持续更新中。。。。。

并发是一种能并行运行多个程序或并行运行一个程序中多个部分的能力。如果程序中一个耗时的任务能以异步或并行的方式运行,那么整个程序的吞吐量和可交互性将大大改善。现代的PC都有多个CPU或一个CPU中有多个核,是否能合理运用多核的能力将成为一个大规模应用程序的关键。

  Java多线程相关类的实现都在Java的并发包concurrent,concurrent包主要包含3部分内容,第一个是atomic包,里面主要是一些原子类,比如AtomicInteger、AtomicIntegerArray等;第二个是locks包,里面主要是锁相关的类,比如ReentrantLock、Condition等;第三个就是属于concurrent包的内容,主要包括线程池相关类(Executors)、阻塞集合类(BlockingQueue)、并发Map类(ConcurrentHashMap)等。

/*************************************************************************************************************************************************************/

atomic包

/*************************************************************************************************************************************************************/

JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操作,并且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令。在java.util.concurrent.atomic包下面的所有的原子变量类型中,比如AtomicInteger,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作。

  Unsafe中的操作一般都是基于CAS来实现的,CAS就是Compare and Swap的意思,比较并操作。很多的cpu直接支持CAS指令。CAS是一项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

下面就以AtomicInteger为例。

在没有AtomicInteger之前,对于一个Integer的线程安全操作,是需要使用同步锁来实现的,当然现在也可以通过ReentrantLock来实现,但是最好最方便的实现方式是采用AtomicInteger。

测试代码:

 1 package com.collection.test;
 2
 3 import java.util.concurrent.atomic.AtomicInteger;
 4
 5 /**
 6  * 原子类的测试
 7  */
 8 public class AtomicTest {
 9     private static AtomicInteger atomicInteger = new AtomicInteger();
10
11     //获取当前值
12     public static void getCurrentValue(){
13         System.out.println(atomicInteger.get());//-->0
14     }
15
16     //设置value值
17     public static void setValue(){
18         atomicInteger.set(12);//直接用12覆盖旧值
19         System.out.println(atomicInteger.get());//-->12
20     }
21
22     //根据方法名称getAndSet就知道先get,则最后返回的就是旧值,如果get在后,就是返回新值
23     public static void getAndSet(){
24         System.out.println(atomicInteger.getAndSet(15));//-->12
25     }
26
27     public static void getAndIncrement(){
28         System.out.println(atomicInteger.getAndIncrement());//-->15
29     }
30
31     public static void getAndDecrement(){
32         System.out.println(atomicInteger.getAndDecrement());//-->16
33     }
34
35     public static void getAndAdd(){
36         System.out.println(atomicInteger.getAndAdd(10));//-->15
37     }
38
39     public static void incrementAndGet(){
40         System.out.println(atomicInteger.incrementAndGet());//-->26
41     }
42
43     public static void decrementAndGet(){
44         System.out.println(atomicInteger.decrementAndGet());//-->25
45     }
46
47     public static void addAndGet(){
48         System.out.println(atomicInteger.addAndGet(20));//-->45
49     }
50
51     public static void main(String[] args) {
52         AtomicTest test = new AtomicTest();
53         test.getCurrentValue();
54         test.setValue();
55         //返回旧值系列
56         test.getAndSet();
57         test.getAndIncrement();
58         test.getAndDecrement();
59         test.getAndAdd();
60         //返回新值系列
61         test.incrementAndGet();
62         test.decrementAndGet();
63         test.addAndGet();
64
65     }
66 }
AtomicInteger类的源代码
  1 private volatile int value;// 初始化值
  2
  3     /**
  4      * 创建一个AtomicInteger,初始值value为initialValue
  5      */
  6     public AtomicInteger(int initialValue) {
  7         value = initialValue;
  8     }
  9
 10     /**
 11      * 创建一个AtomicInteger,初始值value为0
 12      */
 13     public AtomicInteger() {
 14     }
 15
 16     /**
 17      * 返回value
 18      */
 19     public final int get() {
 20         return value;
 21     }
 22
 23     /**
 24      * 为value设值(基于value),而其他操作是基于旧值<--get()
 25      */
 26     public final void set(int newValue) {
 27         value = newValue;
 28     }
 29
 30     public final boolean compareAndSet(int expect, int update) {
 31         return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
 32     }
 33
 34     /**
 35      * 基于CAS为旧值设定新值,采用无限循环,直到设置成功为止
 36      *
 37      * @return 返回旧值
 38      */
 39     public final int getAndSet(int newValue) {
 40         for (;;) {
 41             int current = get();// 获取当前值(旧值)
 42             if (compareAndSet(current, newValue))// CAS新值替代旧值
 43                 return current;// 返回旧值
 44         }
 45     }
 46
 47     /**
 48      * 当前值+1,采用无限循环,直到+1成功为止
 49      * @return the previous value 返回旧值
 50      */
 51     public final int getAndIncrement() {
 52         for (;;) {
 53             int current = get();//获取当前值
 54             int next = current + 1;//当前值+1
 55             if (compareAndSet(current, next))//基于CAS赋值
 56                 return current;
 57         }
 58     }
 59
 60     /**
 61      * 当前值-1,采用无限循环,直到-1成功为止
 62      * @return the previous value 返回旧值
 63      */
 64     public final int getAndDecrement() {
 65         for (;;) {
 66             int current = get();
 67             int next = current - 1;
 68             if (compareAndSet(current, next))
 69                 return current;
 70         }
 71     }
 72
 73     /**
 74      * 当前值+delta,采用无限循环,直到+delta成功为止
 75      * @return the previous value  返回旧值
 76      */
 77     public final int getAndAdd(int delta) {
 78         for (;;) {
 79             int current = get();
 80             int next = current + delta;
 81             if (compareAndSet(current, next))
 82                 return current;
 83         }
 84     }
 85
 86     /**
 87      * 当前值+1, 采用无限循环,直到+1成功为止
 88      * @return the updated value 返回新值
 89      */
 90     public final int incrementAndGet() {
 91         for (;;) {
 92             int current = get();
 93             int next = current + 1;
 94             if (compareAndSet(current, next))
 95                 return next;//返回新值
 96         }
 97     }
 98
 99     /**
100      * 当前值-1, 采用无限循环,直到-1成功为止
101      * @return the updated value 返回新值
102      */
103     public final int decrementAndGet() {
104         for (;;) {
105             int current = get();
106             int next = current - 1;
107             if (compareAndSet(current, next))
108                 return next;//返回新值
109         }
110     }
111
112     /**
113      * 当前值+delta,采用无限循环,直到+delta成功为止
114      * @return the updated value 返回新值
115      */
116     public final int addAndGet(int delta) {
117         for (;;) {
118             int current = get();
119             int next = current + delta;
120             if (compareAndSet(current, next))
121                 return next;//返回新值
122         }
123     }
124
125     /**
126      * 获取当前值
127      */
128     public int intValue() {
129         return get();
130     }

注意:

  • value是volatile的,关于volatile的相关内容见:http://www.cnblogs.com/java-zhao/p/5125698.html
  • 单步操作:例如set()是直接对value进行操作的,不需要CAS,因为单步操作就是原子操作。
  • 多步操作:例如getAndSet(int newValue)是两步操作-->先获取值,在设置值,所以需要原子化,这里采用CAS实现。
  • 对于方法是返回旧值还是新值,直接看方法是以get开头(返回旧值)还是get结尾(返回新值)就好
  • CAS:比较CPU内存上的值是不是当前值current,如果是就换成新值update,如果不是,说明获取值之后到设置值之前,该值已经被别人先一步设置过了,此时如果自己再设置值的话,需要在别人修改后的值的基础上去操作,否则就会覆盖别人的修改,所以这个时候会直接返回false,再进行无限循环,重新获取当前值,然后再基于CAS进行加减操作。
  • 如果还是不懂CAS,类比数据库的乐观锁
 1 // setup to use Unsafe.compareAndSwapInt for updates
 2     private static final Unsafe unsafe = Unsafe.getUnsafe();
 3     private static final long valueOffset;
 4
 5     static {
 6         try {
 7             valueOffset = unsafe.objectFieldOffset
 8                 (AtomicInteger.class.getDeclaredField("value"));
 9         } catch (Exception ex) { throw new Error(ex); }
10     }
11
12     private volatile int value;

这是AtomicInteger的所有属性,其中value存的是当前值,而当前值存放的内存地址可以通过valueOffset来确定。实际上是“value字段相对Java对象的起始地址的偏移量”

1 public final boolean compareAndSet(int expect, int update) {
2          return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
3      }

CAS方法:通过对比“valueOffset上的value”与expect是否相同,来决定是否修改value值为update值。

/*************************************************************************************************************************************************************/

lock包

/*************************************************************************************************************************************************************/

/*************************************************************************************************************************************************************/

concurrent源生包

/*************************************************************************************************************************************************************/

时间: 2024-08-08 22:05:44

java 并发(concurrent)包源码分析的相关文章

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

《java.util.concurrent 包源码阅读》04 ConcurrentMap

Java集合框架中的Map类型的数据结构是非线程安全,在多线程环境中使用时需要手动进行线程同步.因此在java.util.concurrent包中提供了一个线程安全版本的Map类型数据结构:ConcurrentMap.本篇文章主要关注ConcurrentMa接口以及它的Hash版本的实现ConcurrentHashMap. ConcurrentMap是Map接口的子接口 public interface ConcurrentMap<K, V> extends Map<K, V> 与

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

Java并发编程 ReentrantLock 源码分析

ReentrantLock 一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大. 这个类主要基于AQS(AbstractOwnableSynchronizer)封装的 公平与非公平锁. 所谓公平锁就是指 在多个线程的争用下,这些锁倾向于将访问权授予等待时间最长的线程,换句话说也就是先被锁定的线程首先获得锁. 非公平锁正好相反,解锁时没有固定顺序. 让我们边分析源代码边学习如何使用该类 先来看一下构造参数,默认