JDK 源码解析 —— ConcurrentHashMap

零. 概述

ConcurrentHashMap 是将锁的范围细化来实现高效并发的。 基本策略是将数据结构分为一个一个 Segment(每一个都是一个并发可读的 hash table, 即分段锁)作为一个并发单元。 为了减少开销, 除了一处 Segment 是在构造器初始化的, 其他都延迟初始化(详见 ensureSegment)。 并使用 volatile 关键字来保证 Segment 延迟初始化的可见性问题。

HashMap 不是线程安全的, 故多线程情况下会出现 infinit loop。 HashTable 是线程安全的, 但是是用全局锁来保障, 效率很低。 所以 Doug Lea 并发专家研发了高效并发的 ConcurrentHashMap 来应对并发情况下的情景。 阅读本文前最好先看: Java
内存模型
 和 AtomicInteger 分析

一. 术语定义

术语 英文 解释
哈希算法 hash algorithm 是一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为哈希值。
哈希表 hash table 根据设定的哈希函数H(key)和处理冲突方法将一组关键字映象到一个有限的地址区间上,并以关键字在地址区间中的象作为记录在表中的存储位置,这种表称为哈希表或散列,所得存储位置称为哈希地址或散列地址。

二. 数据结构

类图:

抽象结构图:

从上述类图和抽象结构图可以看出 ConcurrentHashMap 是由 Segment
数组结构和 HashEntry 数组结构组成。Segment 继承(Generalization)了可重入锁ReentrantLock,HashEntry 用于存储键值对数据。一个 ConcurrentHashMap 里 contains-a (Composition)

一个 Segment [],Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里 has-a (Aggregation)一个
HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 锁定一个 HashEntry 数组里的元素, 当对 HashEntry 数组的数据进行 put 等修改操作时,必须先获得它对应的 Segment 锁。

三.  源码解析

内部类 Segment 类:

Segment 维护着条目列表状态一致性, 所以可以实现无锁读。 在表超出 threshold 进行 resize 的时候, 复制节点, 所以在做 resize 修改操作的时候, 还可以进行读操作(在旧 list 读)。 本类里只有变化操作的方法才需要加锁, 变化的方法利用一系列忙等控制来处理资源争用, 例如 scanAndLock 和 scanAndLockForPut
。 那些遍历去查找节点的 tryLocks() 方法, 主要是用来吸收 cached 不命中(在 hash tables 经常出现), 这样后续获取锁的遍历操作效率将会有不小提升。 我们可能不是真的需要使用找到的数据, 因为重新获得数据还需要加锁来保证更新操作的一致性, 但他们会更快地进行重定位。 此外,  scanAndLockForPut 特地创建新数据用于没有数据被找到的 put 方法。

// 在准备锁住 segment 操作前最大的 tryLock() 次数。 多核情况下, 在定位 nodes 时使用 64 次最大值维持缓存

static final int MAX_SCAN_RETRIES =
    Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

/**
 * The per-segment table. 数据访问通过
 * entryAt/setEntryAt 提供的 volatile 语义来保证可见性.
 */
transient volatile HashEntry<K,V>[] table;

/**
 * sengment 内 hash entry 元素个数 , 之所以在每个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是为了避免出现“热点域”而影响 ConcurrentHashMap 的并发性。

 */
transient int count;

/**
 * 在 segment 中可变操作总数, 即更新次数
 */
transient int modCount;

/**
 * 当超过threshold 时候,  table 再哈希
 * (The value of this field is always <tt>(int)(capacity *
 * loadFactor)</tt>.)
 */
transient int threshold;

/**
 *  hash table 负载因子.  Even though this value
 * is same for all segments, it is replicated to avoid needing
 * links to outer object.
 * @serial
 */
final float loadFactor;

put 方法:

插入A, B, C后 Segment 示意图:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 // tryLock 一般缓存作用
HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        // 找到 bucket 位置
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    // 保存旧值, 这样 get 操作就可以无锁访问正在写操作的节点
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖原来的值
                        e.value = value;
                        // 修改计数
                        ++modCount;
                    }
                    break;
                }
                // 每次从头部插入
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 超过 threshold 则, rehash()
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 典型的 ReentrantLock 释放锁
        unlock();
    }
    return oldValue;
}

rehash() 方法:  在新
table 中重新分类节点。 因为使用了 2 的幂指数扩展方式, bucket/bin 中的数据还是在原位, 即旧数据的索引位置不变或者偏移了 2 的幂指数距离。 可以重用旧节点减少不必要的节点生成。

/**
 * table 大小 *2, 重新放置 HashEntry, 加入新节点
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    // 保存旧值以便 get 操作遍历
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  单节点链表
                newTable[idx] = e;
            else { // 重用在同一个 slot 的连续序列
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

内部类
HashEntry:链表结构

static final class HashEntry<K,V> {
    // final 保证不变性, 即为线程安全的字段
    final int hash;
    final K key;
    // 根据 volatile 写永远先于读操作的 happens-before 原则来保证获取到都是最新值
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * 使用 volatile 写语义设置 next
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

ConcurrentHashMap
类:

/* ---------------- Constants -------------- */

/**
 * The default initial capacity for this table,
 * used when not otherwise specified in a constructor.
 */
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
 * 本值是 HashEntry 个数与 table 数组长度的比值
 *
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
 *
 * 当前并发线程的使用数
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * The maximum capacity, used if a higher value is implicitly
 * specified by either of the constructors with arguments.  MUST
 * be a power of two <= 1<<30 to ensure that entries are indexable
 * using ints.
 */
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * The minimum capacity for per-segment tables.  Must be a power
 * of two, at least two to avoid immediate resizing on next use
 * after lazy construction.
 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**
 * The maximum number of segments to allow; used to bound
 * constructor arguments. Must be power of two less than 1 << 24.
 */
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

/**
 * Number of unsynchronized retries in size and containsValue
 * methods before resorting to locking. This is used to avoid
 * unbounded retries if tables undergo continuous modification
 * which would make it impossible to obtain an accurate result.
 */
static final int RETRIES_BEFORE_LOCK = 2;
/**
 * 索引 segments 时使用: 使用高比特位的 hash 值去选择 segment
 */
final int segmentMask;

/**
 * Shift value for indexing within segments.
 */
final int segmentShift;

初始化 ConcurrentHashMap:

ConcurrentHashMap
结构图

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。注意concurrencyLevel的最大大小是65535,意味着segments数组的长度最大为65536,对应的二进制是16位。

初始化segmentShift和segmentMask。这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。

变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。

先写到这, 今天有点累了, ConcurrentHashMap 的 get, put 等方法下次更新...

四.
参考资料

JDK 1.6、 1.7 源码

《并发编程实战》

https://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/

http://www.infoq.com/cn/articles/ConcurrentHashMap

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-08 17:16:36

JDK 源码解析 —— ConcurrentHashMap的相关文章

设计模式-简单工厂Coding+jdk源码解析

前面的软件设计七大原则,目前只有理论这块,因为最近参与项目重构,暂时没有时间把Coding的代码按照设计思路一点点写出来.本周周末会花时间整理出来,然后现在想的是白天更新一点并发编程,晚上回家学习设计模式.非科班出身,脑子也比较笨.博文都是自己根据学习的时候所想的思路,如果能有帮到各位的地方,那十分荣幸.如果有欠缺之处,希望能在评论中指出一起进步.好啦,开始正文了. 本套设计模式的博文,包含各种设计模式的定义.类型.适用场景及优缺点分析.并通过Coding去实际加深理论理解. 简单工厂: 该模式

JDK 源码解析 —— HashSet

零. 简介 这个类实现了 Set 接口,内部是由 HashMap 实例辅助实现的.它不保证元素的顺序,数据允许为 null. 假如 hash 方法将数据分散地比较合理,比如一个 bucket 一个数据,那么 add.remove.contains.size 性能开销是常数时间. 这个类非线程安全的,如果多线程并发访问,并且至少一个线程在做修改操作,那么必须在外部做好同步处理.例如使用:Set s = Collections.synchronizedSet(new HashSet(...)); 一

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec

JDK 1.8 源码解析 ConcurrentHashMap

JDK 1.7中ConcurrentHashMap 基本结构: 每一个segment都是一个HashEntry<K,V>[] table, table中的每一个元素本质上都是一个HashEntry的单向队列.比如table[3]为首结点,table[3]->next为结点1,之后为结点2,依次类推. 1 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> 2 implements Concu

JDK 源码解析 —— CyclicBarrier

一. 简介 CyclicBarrier 是一个让一系列线程集合互相等待直到一个公共屏障点(barrier point)的同步辅助工具.这个屏障被称为循环屏障,是因为它可以在等待线程释放后被重用. CyclicBarrier 支持一个可选的 Runnable 命令,在最后一个线程到达后执行一次 Runnable 命令. 二. 简单使用示例 CyclicBarrier(3) 等到 3 个线程都到了,这个对象还可以重用,而 CountDownLatch 则不能重用,从 Cyclic 名字就可以看出这个

JDK 源码解析 —— Integer

零. 简介 对于 Integer 这个 Java 程序员几乎天天使用的类, 使用上却可以看出普通程序员和优秀程序员区别. 一. 深入代码 在创建数字 1 的对象时, 大多数人会使用 new Integer(1), 而使用 Integer.valueOf(1) 可以使用系统缓存,既减少可能的内存占用,也省去了频繁创建对象的开销. 系统默认只缓存 -128-127 之间的整数.下面我们看一下 Integer.valueOf(int) 方法的代码: public static Integer valu

JDK 源码解析 —— Semaphore

零. 简介 这是一个用来对并发计数的信号量,并发量超过一定数值则只能等待.从概念上来说,semaphore 维持着一组许可证.获取锁的时候,需要先获得 semaphore 的许可才行. 一. 从 Demo 解析源码 package com.wenniuwuren.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concu

Integer.parseInt不同jdk源码解析

执行以下代码: System.out.println(Integer.parseInt("-123")); System.out.println(Integer.parseInt("+123")); 以下仅提供1.6和1.7两个版本的比较  1.6版本执行结果为:    1.7版本执行结果为: 从两方面去查证结果的原因,分别是:查看API文档 和 查看对应的源代码 [查看API文档]  1.6版本对应的API文档:    1.7版本对应的API文档: 可以看出,对第

【jdk源码分析】ArrayList的size()==0和isEmpty()

先看结果 分析源码 [jdk源码解析]jdk8的ArrayList初始化长度为0 java的基本数据类型默认值 无参构造 size()方法 isEmpty()方法 原文地址:https://www.cnblogs.com/xiaostudy/p/10781148.html