《java.util.concurrent 包源码阅读》04 ConcurrentMap

Java集合框架中的Map类型的数据结构是非线程安全,在多线程环境中使用时需要手动进行线程同步。因此在java.util.concurrent包中提供了一个线程安全版本的Map类型数据结构:ConcurrentMap。本篇文章主要关注ConcurrentMa接口以及它的Hash版本的实现ConcurrentHashMap。

ConcurrentMap是Map接口的子接口

public interface ConcurrentMap<K, V> extends Map<K, V>

与Map接口相比,ConcurrentMap多了4个方法:

1)putIfAbsent方法:如果key不存在,添加key-value。方法会返回与key关联的value。

V putIfAbsent(K key, V value);

2)remove方法

boolean remove(Object key, Object value);

Map接口中也有一个remove方法:

V remove(Object key);

ConcurrentMap中的remove方法需要比较原有的value和参数中的value是否一致,只有一致才会删除。

3)Replace方法:有2个重载

boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);

两个重载的区别和2)中的两个remove方法的区别很类似,多了一个检查value一致。

通过ConcurrentMap多出来的方法可以看到多线程中一个很重要的概念:compare。compare的作用就是为了保证value的一致性。

重头戏来了:ConcurrentHashMap。

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable

ConcurrentHashMap和HashMap类似,这里重点关注的是如何实现线程安全,也就是如何加锁。

对于HashMap来说,有一个Entry数组,根据Key的hash值对数组长度取模得到数组下标,找到Entry,遍历整个Entry链表,用equals比较来确定key所在的Entry。

ConcurrentHashMap的基本思想是采取分块的方式加锁,分块数由参数“concurrencyLevel”来决定(和HashMap中的“initialCapacity”类似,实际块数是第一个大于concurrencyLevel的2的n次方)。每个分块被称为Segment,Segment的索引方式和HashMap中的Entry索引方式一致(hash值对数组长度取模)。

对Segment加锁的方式很简单,直接把Segment定义为ReentrantLock的子类。同时Segment又是一个特定实现的hash table。

static final class Segment<K,V> extends ReentrantLock implements Serializable

下面分析ConcurrentHashMap读写时如何加锁。

首先是读操作类的方法,来看get方法:

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

可以看到,读取的时候没有调用的Segment的获取锁的方法,而是通过hash值定位到Entry,然后遍历Entry的链表。为什么这里不用加锁呢?看看HashEntry的代码就会明白了。

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

value和next属性是带有volatile修饰符的,可以大胆放心的遍历和比较。

接着是写操作,写操作是肯定要加锁的。因为Segment可以看成是一个hash table,因此ConcurrentHashMap直接调用Segment的对应的写入方法如put,replace等。

比如put方法

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

因此这里直接关注Segment的对应写操作方法即可。在每个写操作的方法开头都这样的类似代码:

        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);

也就是,首先尝试获取锁,如果成功则会带锁继续操作,失败则要通过scanAndLock或scanAndLockForPut获取锁,因此这里关注的重点也就转移到这两个方法了。

按照多线程环境的规则,如果尝试获取锁失败的话就会进入阻塞等待状态,那么这两个方法的作用应该是类似的。

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

这两个方法的逻辑:在等待的时候闲着没事儿干把该做好的准备做好,查找一下目标entry,如果是新建entry就把entry创建好,然后如果一切没问题就用lock()方法把自己给阻塞了,也就是做好准备然后去等着了。

因为Segment本身就可以看成一个hash table,因此必然涉及rehash的问题,因为和HashMap中的rehash类似,在这里就省略了。

《java.util.concurrent 包源码阅读》04 ConcurrentMap

时间: 2024-08-03 08:06:59

《java.util.concurrent 包源码阅读》04 ConcurrentMap的相关文章

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包

Aomic数据类型有四种类型:AomicBoolean, AomicInteger, AomicLong, 和AomicReferrence(针对Object的)以及它们的数组类型, 还有一个特殊的AomicStampedReferrence,它不是AomicReferrence的子类,而是利用AomicReferrence实现的一个储存引用和Integer组的扩展类 首先,所有原子操作都是依赖于sun.misc.Unsafe这个类,这个类底层是由C++实现的,利用指针来实现数据操作 关于CAS

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker