现在的位置: 首页 > 综合 > 正文

Java多线程 — ConcurrentHashMap

2018年05月11日 ⁄ 综合 ⁄ 共 15757字 ⁄ 字号 评论关闭

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

弱一致的迭代器

由 ConcurrentHashMap 返回的迭代器的语义又不同于 ava.util 集合中的迭代器;而且它又是 弱一致的(weakly consistent) 而非 fail-fast的(所谓 fail-fast 是指,当正在使用一个迭代器的时候,如何底层的集合被修改,就会抛出一个异常)。当一个用户调用keySet().iterator() 去迭代器中检索一组 hash 键的时候,实现就简单地使用同步来保证每个链的头指针是当前值。next()和hasNext()
操作以一种明显的方式定义,即遍历每个链然后转到下一个链直到所有的链都被遍历。弱一致迭代器可能会也可能不会反映迭代器迭代过程中的插入操作,但是一定会反映迭代器还没有到达的键的更新或删除操作,并且对任何值最多返回一次。ConcurrentHashMap 返回的迭代器不会抛出 ConcurrentModificationException 异常。

我们通过ConcurrentHashMap的类图来分析ConcurrentHashMap的结构。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

对于哈希表,Java中采用链表的方式来解决hash冲突的。
一个HashMap的数据结构看起来类似下图:

实现了同步的HashTable也是这样的结构,它的同步使用锁来保证的,并且所有同步操作使用的是同一个锁对象。这样若有n个线程同时在get时,这n个线程要串行的等待来获取锁。

ConcurrentHashMap中对这个数据结构,针对并发稍微做了一点调整。
它把区间按照并发级别(concurrentLevel),分成了若干个segment。默认情况下内部按并发级别为16来创建。对于每个segment的容量,默认情况也是16。当然并发级别(concurrentLevel)和每个段(segment)的初始容量都是可以通过构造函数设定的。

创建好ConcurrentHashMap之后,它的结构大致如下图:

看起来只是把以前HashTable的一个hash bucket创建了16份而已。有什么特别的吗?没啥特别的。
继续看每个segment是怎么定义的:

static final class Segment<K,V> extends ReentrantLock implements Serializable 

Segment继承了ReentrantLock,表明每个segment都可以当做一个锁。(ReentrantLock前文已经提到,不了解的话就把当做synchronized的替代者吧)这样对每个segment中的数据需要同步操作的话都是使用每个segment容器对象自身的锁来实现。只有对全局需要改变时锁定的是所有的segment。

上面的这种做法,就称之为“分离锁(lock striping)”。有必要对“分拆锁”“分离锁”的概念描述一下:

分拆锁(lock spliting)就是若原先的程序中多处逻辑都采用同一个锁,但各个逻辑之间又相互独立,就可以拆(Spliting)为使用多个锁,每个锁守护不同的逻辑。
分拆锁有时候可以被扩展,分成可大可小加锁块的集合,并且它们归属于相互独立的对象,这样的情况就是分离锁(lock striping)。(摘自《Java并发编程实践》)

ConcurrentHashMap之实现细节(JDK1.7)

锁分离 (Lock Stripping)

ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。

有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁。
ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。

HashEntry定义
类似HashMap,Segment中内部数组的每一项都是一个单项链节点,它包含了key、hash、value等信息:

[java] view
plain
copy

  1. /** 
  2.      * ConcurrentHashMap list entry. Note that this is never exported 
  3.      * out as a user-visible Map.Entry. 
  4.      */  
  5.     static final class HashEntry<K,V> {  
  6.         final int hash;  
  7.         final K key;  
  8.         volatile V value;  
  9.         volatile HashEntry<K,V> next;  
  10.   
  11.         HashEntry(int hash, K key, V value, HashEntry<K,V> next) {  
  12.             this.hash = hash;  
  13.             this.key = key;  
  14.             this.value = value;  
  15.             this.next = next;  
  16.         }  
  17.   
  18.         /** 
  19.          * Sets next field with volatile write semantics.  (See above 
  20.          * about use of putOrderedObject.) 
  21.          */  
  22.         final void setNext(HashEntry<K,V> n) {  
  23.             UNSAFE.putOrderedObject(this, nextOffset, n);  
  24.         }  

这意味着不能从hash链的中间或尾部添加或删除节点,因为构造函数只有一个next参数。所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。
在JDK 1.6中,HashEntry中的next指针也定义为final,并且每次插入将新添加节点作为链的头节点(同HashMap实现),而且每次删除一个节点时,会将删除节点之前的所有节点拷贝一份组成一个新的链,而将当前节点的上一个节点的next指向当前节点的下一个节点,从而在删除以后有两条链存在,因而可以保证即使在同一条链中,有一个线程在删除,而另一个线程在遍历,它们都能工作良好,因为遍历的线程能继续使用原有的链。因而这种实现是一种更加细粒度的happens-before关系,即如果遍历线程在删除线程结束后开始,则它能看到删除后的变化,如果它发生在删除线程正在执行中间,则它会使用原有的链,而不会等到删除线程结束后再执行,即看不到删除线程的影响。如果这不符合你的需求,还是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()做的包装。
另一个不同于1.6版本中的实现是它提供setNext()方法,而且这个方法调用了Unsafe类中的putOrderedObject()方法,该方法只对volatile字段有用,关于这个方法的解释如下:

Sets the value of the object field at the specified offset in the supplied object to the given value. This is an ordered or lazy version of putObjectVolatile(Object,long,Object), which doesn't guarantee the immediate visibility of the change to other threads. It is only really useful where the object field is volatile, and is thus expected to change unexpectedly.

我对这个函数的理解:对volatile字段,按规范,在每次向它写入值后,它更新后的值立即对其他线程可见(可以简单的认为对volatile字段,每次读取它的值时都直接从内存中读取,而不会读缓存中的数据,如CPU的缓存;对写入操作也是直接写入内存),而这个函数可以提供一种选择,即使对volatile字段的写操作,我们也可以使用该方法将它作为一种普通字段来对待。这里setNext()方法的存在是为了在remove时不需要做拷贝额外链进行的优化,具体可以参看remove操作。

Segment中的put操作
在之前的JDK版本中,Segment的put操作开始时就会先加锁,直到put完成才解锁。在JDK 1.7中采用了自旋的机制,进一步减少了加锁的可能性。

[java] view
plain
copy

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {  
  2.             HashEntry<K,V> node = tryLock() ? null :  
  3.                 scanAndLockForPut(key, hash, value);  
  4.             V oldValue;  
  5.             try {  
  6.                 HashEntry<K,V>[] tab = table;  
  7.                 int index = (tab.length - 1) & hash;  
  8.                 HashEntry<K,V> first = entryAt(tab, index);  
  9.                 for (HashEntry<K,V> e = first;;) {  
  10.                     if (e != null) {  
  11.                         K k;  
  12.                         if ((k = e.key) == key ||  
  13.                             (e.hash == hash && key.equals(k))) {  
  14.                             oldValue = e.value;  
  15.                             if (!onlyIfAbsent) {  
  16.                                 e.value = value;  
  17.                                 ++modCount;  
  18.                             }  
  19.                             break;  
  20.                         }  
  21.                         e = e.next;  
  22.                     }  
  23.                     else {  
  24.                         if (node != null)  
  25.                             node.setNext(first);  
  26.                         else  
  27.                             node = new HashEntry<K,V>(hash, key, value, first);  
  28.                         int c = count + 1;  
  29.                         if (c > threshold && tab.length < MAXIMUM_CAPACITY)  
  30.                             rehash(node);  
  31.                         else  
  32.                             setEntryAt(tab, index, node);  
  33.                         ++modCount;  
  34.                         count = c;  
  35.                         oldValue = null;  
  36.                         break;  
  37.                     }  
  38.                 }  
  39.             } finally {  
  40.                 unlock();  
  41.             }  
  42.             return oldValue;  
  43.         }  


先不考虑自旋等待的问题,假如put一开始就拿到锁,那么它会执行以下逻辑:
根据之前计算出来的hash值找到数组相应bucket中的第一个链节点。这里需要注意的是:
a. 因为ConcurrentHashMap在计算Segment中数组长度时会保证该值是2的倍数,而且Segment在做rehash时也是每次增长一倍,因而数组索引只做"(tab.length - 1) & hash"计算即可。
b. 因为table字段时一个volatile变量,因而在开始时将该引用赋值给tab变量,可以减少在直接引用table字段时,因为该字段是volatile而不能做优化带来的损失,因为将table引用赋值给局不变量后就可以把它左右普通变量以实现编译、运行时的优化。
c. 因为之前已经将volatile的table字段引用赋值给tab局不变量了,为了保证每次读取的table中的数组项都是最新的值,因而调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取(在put操作更新节点链时,它采用Unsafe.putOrderedObject()操作,此时它对链头的更新只局限与当前线程,为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头)。
遍历数组项中的节点链,如果在节点中能找到key相等的节点,并且当前是put()操作而不是putIfAbsent()操作,纪录原来的值,更新该节点的值,并退出循环,put()操作完成。
如果在节点链中没有找到key相等的节点,创建一个新的节点,并将该节点作为当前链头插入当前链,并将count加1。和读取节点链连头想法,这里使用setEntryAt()操作以实现对链头的延时写,以提升性能,因为此时并不需要将该更新写入到内存,而在锁退出后该更新自然会写入内存[参考Java的内存模型,注1]。然后当节点数操作阀值(capacity*loadFactor),而数组长度没有达到最大数组长度,会做rehash。另外,如果scanAndLockForPut()操作返回了一个非空HashEntry,则表示在scanAndLockForPut()遍历key对应节点链时没有找到相应的节点,此时很多时候需要创建新的节点,因而它预创建HashEntry节点(预创建时因为有些时候它确实不需要再创建),所以不需要再创建,只需要更新它的next指针即可,这里使用setNext()实现延时写也时为了提升性能,因为当前修改并不需要让其他线程知道,在锁退出时修改自然会更新到内存中,如果采用直接赋值给next字段,由于next时volatile字段,会引起更新直接写入内存而增加开销。


Segment中的scanAndLockForPut操作
如put源码所示,当put操作尝试加锁没成功时,它不是直接进入等待状态,而是调用了scanAndLockForPut()操作,该操作持续查找key对应的节点链中是已存在该机节点,如果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入等待状态,计所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64:

[java] view
plain
copy

  1. /** 
  2.          * Scans for a node containing given key while trying to 
  3.          * acquire lock, creating and returning one if not found. Upon 
  4.          * return, guarantees that lock is held. UNlike in most 
  5.          * methods, calls to method equals are not screened: Since 
  6.          * traversal speed doesn't matter, we might as well help warm 
  7.          * up the associated code and accesses as well. 
  8.          * 
  9.          * @return a new node if key not found, else null 
  10.          */  
  11.         private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {  
  12.             HashEntry<K,V> first = entryForHash(this, hash);  
  13.             HashEntry<K,V> e = first;  
  14.             HashEntry<K,V> node = null;  
  15.             int retries = -1// negative while locating node  
  16.             while (!tryLock()) {  
  17.                 HashEntry<K,V> f; // to recheck first below  
  18.                 if (retries < 0) {  
  19.                     if (e == null) {  
  20.                         if (node == null// speculatively create node  
  21.                             node = new HashEntry<K,V>(hash, key, value, null);  
  22.                         retries = 0;  
  23.                     }  
  24.                     else if (key.equals(e.key))  
  25.                         retries = 0;  
  26.                     else  
  27.                         e = e.next;  
  28.                 }  
  29.                 else if (++retries > MAX_SCAN_RETRIES) {  
  30.                     lock();  
  31.                     break;  
  32.                 }  
  33.                 else if ((retries & 1) == 0 &&  
  34.                          (f = entryForHash(this, hash)) != first) {  
  35.                     e = first = f; // re-traverse if entry changed  
  36.                     retries = -1;  
  37.                 }  
  38.             }  
  39.             return node;  
  40.         }  

在这段逻辑中,它先获取key对应的节点链的头,然后持续遍历该链,如果节点链中不存在要插入的节点,则预创建一个节点,否则retries值资增,直到操作最大尝试次数而进入等待状态。这里需要注意最后一个else if中的逻辑:当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。

Segment中的rehash操作
rehash的逻辑比较简单,它创建一个大原来两倍容量的数组,然后遍历原来数组以及数组项中的每条链,对每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组中,这里需要重新创建一个新节点而不是修改原有节点的next指针时为了在做rehash时可以保证其他线程的get遍历操作可以正常在原有的链上正常工作,有点copy-on-write思想。然而Doug Lea继续优化了这段逻辑,为了减少重新创建新节点的开销,这里做了两点优化:1,对只有一个节点的链,直接将该节点赋值给新数组对应项即可(之所以能这么做是因为Segment中数组的长度也永远是2的倍数,而将数组长度扩大成原来的2倍,那么新节点在新数组中的位置只能是相同的索引号或者原来索引号加原来数组的长度,因而可以保证每条链在rehash是不会相互干扰);2,对有多个节点的链,先遍历该链找到第一个后面所有节点的索引值不变的节点p,然后只重新创建节点p以前的节点即可,此时新节点链和旧节点链同时存在,在p节点相遇,这样即使有其他线程在当前链做遍历也能正常工作:

[java] view
plain
copy

  1. /** 
  2.   * Doubles size of table and repacks entries, also adding the 
  3.   * given node to new table 
  4.   */  
  5.  @SuppressWarnings("unchecked")  
  6.  private void rehash(HashEntry<K,V> node) {  
  7.      /* 
  8.       * Reclassify nodes in each list to new table.  Because we 
  9.       * are using power-of-two expansion, the elements from 
  10.       * each bin must either stay at same index, or move with a 
  11.       * power of two offset. We eliminate unnecessary node 
  12.       * creation by catching cases where old nodes can be 
  13.       * reused because their next fields won't change. 
  14.       * Statistically, at the default threshold, only about 
  15.       * one-sixth of them need cloning when a table 
  16.       * doubles. The nodes they replace will be garbage 
  17.       * collectable as soon as they are no longer referenced by 
  18.       * any reader thread that may be in the midst of 
  19.       * concurrently traversing table. Entry accesses use plain 
  20.       * array indexing because they are followed by volatile 
  21.       * table write. 
  22.       */  
  23.      HashEntry<K,V>[] oldTable = table;  
  24.      int oldCapacity = oldTable.length;  
  25.      int newCapacity = oldCapacity << 1;  
  26.      threshold = (int)(newCapacity * loadFactor);  
  27.      HashEntry<K,V>[] newTable =  
  28.          (HashEntry<K,V>[]) new HashEntry[newCapacity];  
  29.      int sizeMask = newCapacity - 1;  
  30.      for (int i = 0; i < oldCapacity ; i++) {  
  31.          HashEntry<K,V> e = oldTable[i];  
  32.          if (e != null) {  
  33.              HashEntry<K,V> next = e.next;  
  34.              int idx = e.hash & sizeMask;  
  35.              if (next == null)   //  Single node on list  
  36.                  newTable[idx] = e;  
  37.              else { // Reuse consecutive sequence at same slot  
  38.                  HashEntry<K,V> lastRun = e;  
  39.                  int lastIdx = idx;  
  40.                  for (HashEntry<K,V> last = next;  
  41.                       last != null;  
  42.                       last = last.next) {  
  43.                      int k = last.hash & sizeMask;  
  44.                      if (k != lastIdx) {  
  45.                          lastIdx = k;  
  46.                          lastRun = last;  
  47.                      }  
  48.                  }  
  49.                  newTable[lastIdx] = lastRun;  
  50.                  // Clone remaining nodes  
  51.                  for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {  
  52.                      V v = p.value;  
  53.                      int h = p.hash;  
  54.                      int k = h & sizeMask;  
  55.                      HashEntry<K,V> n = newTable[k];  
  56.                      newTable[k] = new HashEntry<K,V>(h, p.key, v, n);  
  57.                  }  
  58.              }  
  59.          }  
  60.      }  
  61.      int nodeIndex = node.hash & sizeMask; // add the new node  
  62.      node.setNext(newTable[nodeIndex]);  
  63.      newTable[nodeIndex] = node;  
  64.      table = newTable;  
  65.  }  


Segment中的remove操作
在JDK 1.6版本中,remove操作比较直观,它先找到key对应的节点链的链头(数组中的某个项),然后遍历该节点链,如果在节点链中找到key相等的节点,则为该节点之前的所有节点重新创建节点并组成一条新链,将该新链的链尾指向找到节点的下一个节点。这样如前面rehash提到的,同时有两条链存在,即使有另一个线程正在该链上遍历也不会出问题。然而Doug Lea又挖掘到了新的优化点,在1.7中,他不再重新创建一条新的链,而是只在当起缓存中将链中找到的节点移除。当移除的是链头则更新数组项的值,否则更新找到节点的前一个节点的next指针。这也是HashEntry中next指针没有设置成final的原因。当然remove操作如果第一次尝试获得锁失败也会如put操作一样先进入自旋状态,这里的scanAndLock和scanAndLockForPut类似,只是它不做预创建节点的步骤,不再细说:

[java] view
plain
copy

  1. /** 
  2.          * Remove; match on key only if value null, else match both. 
  3.          */  
  4.         final V remove(Object key, int hash, Object value) {  
  5.             if (!tryLock())  
  6.                 scanAndLock(key, hash);  
  7.             V oldValue = null;  
  8.             try {  
  9.                 HashEntry<K,V>[] tab = table;  
  10.                 int index = (tab.length - 1) & hash;  
  11.                 HashEntry<K,V> e = entryAt(tab, index);  
  12.                 HashEntry<K,V> pred = null;  
  13.                 while (e != null) {  
  14.                     K k;  
  15.                     HashEntry<K,V> next = e.next;  
  16.                     if ((k = e.key) == key ||  
  17.                         (e.hash == hash && key.equals(k))) {  
  18.                         V v = e.value;  
  19.                         if (value == null || value == v || value.equals(v)) {  
  20.                             if (pred == null)  
  21.                                 setEntryAt(tab, index, next);  
  22.                             else  
  23.                                 pred.setNext(next);  
  24.                             ++modCount;  
  25.                             --count;  
  26.                             oldValue = v;  
  27.                         }  
  28.                         break;  
  29.                     }  
  30.                     pred = e;  
  31.                     e = next;  
  32.                 }  
  33.             } finally {  
  34.                 unlock();  
  35.             }  
  36.             return oldValue;  
  37.         }  


Segment中的其他操作
ConcurrentHashMap添加了replace接口,它和put的区别是put操作如果原Map中不存在key会将传入的键值对添加到Map中,而replace不会这么做,它只是简单的返回false。Segment中的replace操作先加锁或自旋等待,然后遍历相应的节点链,如果找到节点,则替换原有的值,返回true,否则返回false,比较简单,不细究。
Segment中的clear操作不同于其他操作,它直接请求加锁而没有自旋等待的步骤,这可能是因为它需要对整个table做操作,因而需要等到所有在table上的操作的线程退出才能执行,而不象其他操作只是对table中的一条链操作,对一条链操作的线程执行的比较快,因而自旋可以后获得锁的可能性比较大,对table操作的等待相对要比较久,因而自旋等待意义不大。clear操作只是将数组的每个项设置为null,它使用setEntryAt的延迟设置,从而保证其他读线程的正常工作。

Segment类的实现是ConcurrentHashMap实现的核心,因而理解了它的实现,要看ConcurrentHashMap的其他代码就感觉很简单和直观了。

ConcurrentHashMap中的get、containsKey、put、putIfAbsent、replace、Remove、clear操作
由于前面提到Segment中对HashEntry数组以及数组项中的节点链遍历操作是线程安全的,因而get、containsKey操作只需要找到相应的Segment实例,通过Segment实例找到节点链,然后遍历节点链即可,不细说。
对put、putIfAbsent、replace、remove、clear操作,它们在Segment中都实现,只需要通过hash值找到Segment实例,然后调用相应方法即可。

ConcurrentHashMap中的size、containsValue、contains、isEmpty操作
因为这些操作需要全局扫瞄整个Map,正常情况下需要先获得所有Segment实例的锁,然后做相应的查找、计算得到结果,再解锁,返回值。然而为了竟可能的减少锁对性能的影响,Doug Lea在这里并没有直接加锁,而是先尝试的遍历查找、计算2遍,如果两遍遍历过程中整个Map没有发生修改(即两次所有Segment实例中modCount值的和一致),则可以认为整个查找、计算过程中Map没有发生改变,我们计算的结果是正确的,否则,在顺序的在所有Segment实例加锁,计算,解锁,然后返回。以containsValue为例:

 转载自

并发编程网 – ifeve.com本文链接地址: 深入剖析ConcurrentHashMap(2)本文链接地址: 聊聊并发(四)深入分析ConcurrentHashMap

http://www.blogjava.net/DLevin/archive/2013/10/18/405030.htmlJava
Core系列之ConcurrentHashMap实现(JDK 1.7)

抱歉!评论已关闭.