集合工具类使用线程
1. hashmap源码解析与并发可能遇见的问题
1.HashMap中的几个重要变量 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认初始容量,必须是2的n次方 static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量,当通过构造方法传入的容量比它还大时,就用这个最大容量,必须是2的n次方 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认负载因子 transient Entry<K,V>[] table; //用来存储键值对,可以看到键值对都是存储在Entry中的 transient int size; //存放元素的个数 int threshold; //临界值 当实际大小超过临界值时,会进行扩容threshold = 加载因子*容量 final float loadFactor; //加载因子 transient int modCount; //被修改的次数 2.Entry是一个链表结构,不仅包含key和value,还有可以指向下一个的next static class Entry<K,V> implements Map.Entry<K,V> { final K key; V value; Entry<K,V> next; int hash; /** * Creates new entry. */ Entry(int h, K k, V v, Entry<K,V> n) { value = v; next = n; key = k; hash = h; } ... //3.put方法 public V put(K key, V value) { if (key == null) return putForNullKey(value);//储存空键 int hash = hash(key);//计算hash值 int i = indexFor(hash, table.length);//计算存储位置 for (Entry<K,V> e = table[i]; e != null; e = e.next) {//遍历hashmap的内部数据 Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } //这个for循环,当发生并发,两个线程冲突的时候,这个链表的结构会发生变化:可能两个key互为对方的next元素。此时通过next遍历,会形成死循环。在jdb8中已经不存在了。最好的解决办法是使用concurrenthashmap modCount++; addEntry(hash, key, value, i); return null; } //首先通过hash方法对hashcode进行处理: final int hash(Object k) { int h = 0; h ^= k.hashCode(); h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); } //可以看到只是在key的hashcode值上做了一些处理,通过hash计算出来的值将会使用indexFor方法找到它应该所在的table下标: static int indexFor(int h, int length) { return h & (length-1); } //这个方法其实相当于对table.length取模。 //当需要插入的key为null时,调用putForNullKey方法处理: private V putForNullKey(V value) { for (Entry<K,V> e = table[0]; e != null; e = e.next) { if (e.key == null) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; addEntry(0, null, value, 0); return null; } //putForNullKey方法只从table[0]这个位置开始遍历,因为key为null只放在table中的第一个位置,下标为0,在遍历中如果发现已经有key为null了,则替换新value,返回旧value,结束;如果还没有key为null,调用addEntry方法增加一个Entry: void addEntry(int hash, K key, V value, int bucketIndex) { if ((size >= threshold) && (null != table[bucketIndex])) { resize(2 * table.length); hash = (null != key) ? hash(key) : 0; bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } //可以看到jdk7中resize的条件已经发生改变了,只有当 size>=threshold并且 table中的那个槽中已经有Entry时,才会发生resize。即有可能虽然size>=threshold,但是必须等到每个槽都至少有一个Entry时,才会扩容。还有注意每次resize都会扩大一倍容量 void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e); size++; } //最后看createEntry,它先保存这个桶中的第一个Entry,创建新的Entry放入第一个位置,将原来的Entry接在后面。这里采用的是头插法插入元素。 4.get方法 //其实get方法和put方法如出一辙,怎么放的怎么拿 public V get(Object key) { if (key == null) return getForNullKey(); Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); } //key为null时,还是去table[0]去取: private V getForNullKey() { for (Entry<K,V> e = table[0]; e != null; e = e.next) { if (e.key == null) return e.value; } return null; } //否则调用getEntry方法: final Entry<K,V> getEntry(Object key) { int hash = (key == null) ? 0 : hash(key); for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; } //这个方法也是通过key的hashcode计算出它应该所在的下标,再遍历这个下标的Entry链,如果key的内存地址相等(即同一个引用)或者equals相等,则说明找到了 A、等幂性。不管执行多少次获取Hash值的操作,只要对象不变,那么Hash值是固定的。如果第一次取跟第N次取不一样,那就用起来很麻烦. B、对等性。若两个对象equal方法返回为true,则其hash值也应该是一样的。举例说明:若你将objA作为key存入HashMap中,然后new了一个objB。在你看来objB和objA是一个东西(因为他们equal),但是使用objB到hashMap中却取不出来东西。 C、互异性。若两个对象equal方法返回为false,hash值有可能相同,但最好是不同的,这个不是必须的,只是这样做会提高hash类操作的性能(碰撞几率低)。 解决hash碰撞的方法: hashmap采用的就是链地址法,这种方法好处是无堆积现象,但是next指针会占用额外空间 在jdk8中,仍然会根据key.hashCode()计算出hash值,再通过这个hash值去定位这个key,但是不同的是,当发生冲突时,会采用链表和红黑树两种方法去处理,当结点个数较少时用链表(用Node存储),个数较多时用红黑树(用TreeNode存储),同时结点也不叫Entry了,而是分成了Node和TreeNode。再最坏的情况下,链表查找的时间复杂度为O(n),而红黑树一直是O(logn),这样会提高HashMap的效率。 Put方法也变了,为了防止并发问题。 扩展:为何数组的长度是 2 的 n 次方呢? 1.这个方法非常巧妙,它通过 h & (table.length -1) 来得到该对象的保存位,而HashMap 底层数组的长度总是 2 的 n 次方,2n-1 得到的二进制数的每个位上的值都为 1,那么与全部为 1 的一个数进行与操作,速度会大大提升。 2.当 length 总是 2 的 n 次方时,h& (length-1)运算等价于对 length 取模,也就是h%length,但是&比%具有更高的效率。 3.当数组长度为 2 的 n 次幂的时候,不同的 key 算得的 index 相同的几率较小,那么数据在数组上分布就比较均匀,也就是说碰撞的几率小,相对的,查询的时候就不用遍历某个位置上的链表,这样查询效率也就较高了。 HashMap 的扩容机制: 当 HashMap 中的结点个数超过数组大小*loadFactor(加载因子)时,就会进行数组扩容,loadFactor 的默认值为 0.75。也就是说,默认情况下,数组大小为 16,那么当 HashMap中结点个数超过 16*0.75=12 的时候,就把数组的大小扩展为 2*16=32,即扩大一倍,然后重新计算每个元素在数组中的位置,并放进去,而这是一个非常消耗性能的操作。 |
Hashmap和HashTable的异同:
01.两者的默认容量与负载因子有变化
02.hashtable的 容量可以是任意值,而hashmap必须是2的次幂
03.hashtable中在put方法里面不允许值与键为空
04.计算索引的方式不同(indexof函数不同)
05.hashtable大部分方法都加上了sychronied关键字
06.hashtable每次扩容,容量为原来的两倍加2.
2. concurrenthashmap源码解析与并发编程
使用与获取全局信息的方法并不频繁的时候
01.在 ConcurrentHashMap 中,不允许用 null 作为键和值。
02.ConcurrentHashMap 使用分段锁(减少锁粒度)技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
默认情况下分为16个段。
03.当增加一个新的表项,不是全部加锁,会先计算在哪个段,对指定的段加锁。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; //上面两行用于获取段号 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);//得到段,将数据插入到段中 return s.put(key, hash, value, false); }
04.当系统需要取得全局锁,消耗资源就会比较多。比如size()方法:事实上会先使用无锁的方式求和,如果失败,会先获得所有段的锁再去求和。
3. BlockingQueue
A线程可以知道b线程的存在
是一个接口并非一个具体实现:
ArrayBlockingQueue
ArrayBlockingQueue的内部元素都放置在一个对象数组中:final Object[] items;
Offer():当队列已经满了,会立即返回false
Put():如果队列满了会一直等待
Pool():弹出元素,如果为空返回null
Take():弹出元素,如果为空等待到有元素即可。
Take方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
Put方法:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
LinkedBlockingQueue(锁分离)
两把不同的锁 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
Take函数 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//不能有两个线程同时取数据 try { while (count.get() == 0) {//如果没有数据,一直等待(因为是lockInterruptibly,可中断) notEmpty.await(); } x = dequeue();//取得第一个数据 c = count.getAndDecrement();//数量-1,原子操作,因为会和put同时访问count。 if (c > 1) notEmpty.signal();//通知其他take操作 } finally { takeLock.unlock();//释放锁 } if (c == capacity) signalNotFull();//通知put操作,已有空余空间 return x; }
Put函数 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly();//上锁不能有两个线程同时进行put函数 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) {//当队列已经满了以后,等待 notFull.await(); } enqueue(node);//插入数据 c = count.getAndIncrement();//更新总数 if (c + 1 < capacity) notFull.signal();//有足够的空间,通知其他线程 } finally { putLock.unlock();//释放锁 } if (c == 0) signalNotEmpty();//释放成功后,通知take函数取数据 }
4. 并发下的ArrayList
当ArrayList在扩容的时候,内部一致性被破坏,由于没有锁的保护,另外一个线程访问不到不一致的内部状态,导致出现越界问题。
还会出现多个线程同时对同一位置进行赋值。
5. concurrentlinkedqueue:
高并发环境中可以说是最好的队列,也可以看做是一个线程安全的linkedList。
6. CopyOnWriteArrayList
性能很好的读写list,在读写的时候任何时候都不加锁;只有在写写的时候需要同步等待。
当写操作的时候,进行一次自我复制。对原有的数据进行一次复制,将修改的内容写入副本修改完之后,将副本替换原来的数据。
原文地址:http://blog.51cto.com/qinbin/2055554