[关闭]
@adamhand 2018-12-25T14:04:49.000000Z 字数 31891 阅读 677

ConcurrentHashMap--从jdk1.7到jdk1.8


jdk1.7

1. 设计思路

  HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。

  • 优点:相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。
  • 缺点:由于不是对整个Map加锁,导致一些需要扫描整个Map的方法(如size(), containsValue())需要使用特殊的实现,另外一些方法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的)。因为size()函数统计的时候,可能刚统计完一个segment,这个segment中的元素马上就被修改了,最终统计出来的结果可能是旧值,所以是弱一致性的。

  ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
  ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性,代码如下:

  1. static final class HashEntry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V value;
  5. volatile HashEntry<K,V> next;
  6. ...
  7. }

2. ConcurrentHashMap的初始化

  ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组。

  • initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
  • loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。
  • concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16(具体参见2.1的并发度概念)。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。

  ConcurrentHashMap的示意图如下:

2.1 初始化segments数组

  首先介绍一个并发度的概念。并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。初始化ConcurrentHashMap的源代码如下。

  1. public ConcurrentHashMap(int initialCapacity,
  2. float loadFactor, int concurrencyLevel) {
  3. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
  4. throw new IllegalArgumentException();
  5. if (concurrencyLevel > MAX_SEGMENTS)
  6. concurrencyLevel = MAX_SEGMENTS;
  7. // Find power-of-two sizes best matching arguments
  8. int sshift = 0;
  9. int ssize = 1;
  10. // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
  11. while (ssize < concurrencyLevel) {
  12. ++sshift;
  13. ssize <<= 1;
  14. }
  15. // 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4
  16. // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
  17. this.segmentShift = 32 - sshift;
  18. this.segmentMask = ssize - 1;
  19. if (initialCapacity > MAXIMUM_CAPACITY)
  20. initialCapacity = MAXIMUM_CAPACITY;
  21. // initialCapacity 是设置整个 map 初始的大小,
  22. // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
  23. // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
  24. int c = initialCapacity / ssize;
  25. if (c * ssize < initialCapacity)
  26. ++c;
  27. // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
  28. // 插入一个元素不至于扩容,插入第二个的时候才会扩容
  29. int cap = MIN_SEGMENT_TABLE_CAPACITY;
  30. while (cap < c)
  31. cap <<= 1;
  32. // 创建 Segment 数组,
  33. // 并创建数组的第一个元素 segment[0]
  34. Segment<K,V> s0 =
  35. new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
  36. (HashEntry<K,V>[])new HashEntry[cap]);
  37. Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  38. // 往数组写入 segment[0]
  39. UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  40. this.segments = ss;
  41. }

  由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。注意concurrencyLevel的最大大小是65535,意味着segments数组的长度最大为65536,对应的二进制是16位。

2.2 初始化segmentShift和segmentMask

  运行时通过将key的高n位(n = 32 – segmentShift)和并发度减1(segmentMask)做位与运算定位到所在的Segment。segmentShift与segmentMask都是在构造过程中根据concurrency level被相应的计算出来。sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。
  如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

2.3 初始化每个Segment

  和JDK6不同,JDK7中除了第一个Segment之外,剩余的Segments采用的是延迟初始化的机制:每次put之前都需要检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。
  ensureSegment可能在并发环境下被调用,但与想象中不同,ensureSegment并未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。代码段如下:

  1. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  2. == null) { // recheck
  3. Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  4. while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  5. == null) {
  6. if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
  7. break;
  8. }
  9. }

3. put/putIfAbsent/putAll

put过程大概可以总结为如下步骤:

  既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。

  1. private static int hash(int h) {
  2. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10);
  3. h += (h << 3); h ^= (h >>> 6);
  4. h += (h << 2) + (h << 14); return h ^ (h >>> 16);
  5. }
  6. public V put(K key, V value) {
  7. Segment<K,V> s;
  8. if (value == null)
  9. throw new NullPointerException();
  10. // 1. 计算 key 的 hash 值
  11. int hash = hash(key);
  12. // 2. 根据 hash 值找到 Segment 数组中的位置 j
  13. // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
  14. // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
  15. int j = (hash >>> segmentShift) & segmentMask;
  16. // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
  17. // ensureSegment(j) 对 segment[j] 进行初始化
  18. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  19. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  20. s = ensureSegment(j);
  21. // 3. 插入新值到 槽 s 中
  22. return s.put(key, hash, value, false);
  23. }

  再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。
  找到相应的 Segment之后就是 Segment 内部的 put 操作了。

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. // 在往该 segment 写入前,需要先获取该 segment 的独占锁
  3. // 先看主流程,后面还会具体介绍这部分内容
  4. HashEntry<K,V> node = tryLock() ? null :
  5. scanAndLockForPut(key, hash, value);
  6. V oldValue;
  7. try {
  8. // 这个是 segment 内部的数组
  9. HashEntry<K,V>[] tab = table;
  10. // 再利用 hash 值,求应该放置的数组下标
  11. int index = (tab.length - 1) & hash;
  12. // first 是数组该位置处的链表的表头
  13. HashEntry<K,V> first = entryAt(tab, index);
  14. // 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况
  15. for (HashEntry<K,V> e = first;;) {
  16. if (e != null) {
  17. K k;
  18. if ((k = e.key) == key ||
  19. (e.hash == hash && key.equals(k))) {
  20. oldValue = e.value;
  21. if (!onlyIfAbsent) {
  22. // 覆盖旧值
  23. e.value = value;
  24. ++modCount;
  25. }
  26. break;
  27. }
  28. // 继续顺着链表走
  29. e = e.next;
  30. }
  31. else {
  32. // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
  33. // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
  34. if (node != null)
  35. node.setNext(first);
  36. else
  37. node = new HashEntry<K,V>(hash, key, value, first);
  38. int c = count + 1;
  39. // 如果超过了该 segment 的阈值,这个 segment 需要扩容
  40. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  41. rehash(node); // 扩容后面也会具体分析
  42. else
  43. // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
  44. // 其实就是将新的节点设置成原链表的表头
  45. setEntryAt(tab, index, node);
  46. ++modCount;
  47. count = c;
  48. oldValue = null;
  49. break;
  50. }
  51. }
  52. } finally {
  53. // 解锁
  54. unlock();
  55. }
  56. return oldValue;
  57. }

  到这里 put 操作就结束了,接下来,我们说一说其中几步关键的操作。

初始化槽: ensureSegment

  ConcurrentHashMap 初始化的时候会初始化第一个槽segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
  这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽segment[k],不过只要有一个成功了就可以。

  1. private Segment<K,V> ensureSegment(int k) {
  2. final Segment<K,V>[] ss = this.segments;
  3. long u = (k << SSHIFT) + SBASE; // raw offset
  4. Segment<K,V> seg;
  5. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  6. // 这里看到为什么之前要初始化 segment[0] 了,
  7. // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
  8. // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
  9. Segment<K,V> proto = ss[0];
  10. int cap = proto.table.length;
  11. float lf = proto.loadFactor;
  12. int threshold = (int)(cap * lf);
  13. // 初始化 segment[k] 内部的数组
  14. HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  15. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  16. == null) { // 再次检查一遍该槽是否被其他线程初始化了。
  17. Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  18. // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
  19. while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  20. == null) {
  21. if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
  22. break;
  23. }
  24. }
  25. }
  26. return seg;
  27. }

  总的来说,ensureSegment(int k) 比较简单,对于并发操作使用 CAS 进行控制。

我没搞懂这里为什么要搞一个 while 循环,CAS 失败不就代表有其他线程成功了吗,为什么要再进行判断?

获取写入锁: scanAndLockForPut

  前面我们看到,在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。
  下面我们来具体分析这个方法中是怎么控制加锁的。

  1. private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
  2. HashEntry<K,V> first = entryForHash(this, hash);
  3. HashEntry<K,V> e = first;
  4. HashEntry<K,V> node = null;
  5. int retries = -1; // negative while locating node
  6. // 循环获取锁
  7. while (!tryLock()) {
  8. HashEntry<K,V> f; // to recheck first below
  9. if (retries < 0) {
  10. if (e == null) {
  11. if (node == null) // speculatively create node
  12. // 进到这里说明数组该位置的链表是空的,没有任何元素
  13. // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
  14. node = new HashEntry<K,V>(hash, key, value, null);
  15. retries = 0;
  16. }
  17. else if (key.equals(e.key))
  18. retries = 0;
  19. else
  20. // 顺着链表往下走
  21. e = e.next;
  22. }
  23. // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
  24. // lock() 是阻塞方法,直到获取锁后返回
  25. else if (++retries > MAX_SCAN_RETRIES) {
  26. lock();
  27. break;
  28. }
  29. else if ((retries & 1) == 0 &&
  30. // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
  31. // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
  32. (f = entryForHash(this, hash)) != first) {
  33. e = first = f; // re-traverse if entry changed
  34. retries = -1;
  35. }
  36. }
  37. return node;
  38. }

  这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
  这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。

扩容: rehash

  相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是Doug Lea为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。
  该方法不需要考虑并发,因为到这里的时候,是持有该 segment 的独占锁的。

  1. // 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
  2. private void rehash(HashEntry<K,V> node) {
  3. HashEntry<K,V>[] oldTable = table;
  4. int oldCapacity = oldTable.length;
  5. // 2 倍
  6. int newCapacity = oldCapacity << 1;
  7. threshold = (int)(newCapacity * loadFactor);
  8. // 创建新数组
  9. HashEntry<K,V>[] newTable =
  10. (HashEntry<K,V>[]) new HashEntry[newCapacity];
  11. // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
  12. int sizeMask = newCapacity - 1;
  13. // 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
  14. for (int i = 0; i < oldCapacity ; i++) {
  15. // e 是链表的第一个元素
  16. HashEntry<K,V> e = oldTable[i];
  17. if (e != null) {
  18. HashEntry<K,V> next = e.next;
  19. // 计算应该放置在新数组中的位置,
  20. // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
  21. int idx = e.hash & sizeMask;
  22. if (next == null) // 该位置处只有一个元素,那比较好办
  23. newTable[idx] = e;
  24. else { // Reuse consecutive sequence at same slot
  25. // e 是链表表头
  26. HashEntry<K,V> lastRun = e;
  27. // idx 是当前链表的头结点 e 的新位置
  28. int lastIdx = idx;
  29. // 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
  30. for (HashEntry<K,V> last = next;
  31. last != null;
  32. last = last.next) {
  33. int k = last.hash & sizeMask;
  34. if (k != lastIdx) {
  35. lastIdx = k;
  36. lastRun = last;
  37. }
  38. }
  39. // 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
  40. newTable[lastIdx] = lastRun;
  41. // 下面的操作是处理 lastRun 之前的节点,
  42. // 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
  43. for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
  44. V v = p.value;
  45. int h = p.hash;
  46. int k = h & sizeMask;
  47. HashEntry<K,V> n = newTable[k];
  48. newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
  49. }
  50. }
  51. }
  52. }
  53. // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
  54. int nodeIndex = node.hash & sizeMask; // add the new node
  55. node.setNext(newTable[nodeIndex]);
  56. newTable[nodeIndex] = node;
  57. table = newTable;
  58. }

  上述put操作可以总结为几个比较重要的点:

  • 和JDK6一样,ConcurrentHashMap的put方法被代理到了对应的Segment(定位Segment的原理之前已经描述过)中。与JDK6不同的是,JDK7版本的ConcurrentHashMap在获得Segment锁的过程中,做了一定的优化 - 在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。
  • 需要注意的是,由于在并发环境下,其他线程的put,rehash或者remove操作可能会导致链表头结点的变化,因此在过程中需要进行检查,如果头结点发生变化则重新对表进行遍历。而如果其他线程引起了链表中的某个节点被删除,即使该变化因为是非原子写操作(删除节点后链接后续节点调用的是Unsafe.putOrderedObject(),该方法不提供原子写语义)可能导致当前线程无法观察到,但因为不影响遍历的正确性所以忽略不计。
  • 之所以在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。
  • 在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntry的value值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。
  • put方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过Unsafe的putOrderedObject()方法来实现,这里并未使用具有原子写语义的putObjectVolatile()的原因是:JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。

4. get

  get方法需要以下三个步骤:

  • 计算 hash 值,找到 segment 数组中的具体位置,或我们前面用的“槽”
  • 槽中也是一个数组,根据 hash 找到数组中具体的位置
  • 到这里是链表了,顺着链表进行查找即可

  get与containsKey两个方法几乎完全一致:他们都没有使用锁,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。

  1. public V get(Object key) {
  2. Segment<K,V> s; // manually integrate access methods to reduce overhead
  3. HashEntry<K,V>[] tab;
  4. // 1. hash 值
  5. int h = hash(key);
  6. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  7. // 2. 根据 hash 找到对应的 segment
  8. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
  9. (tab = s.table) != null) {
  10. // 3. 找到segment 内部数组相应位置的链表,遍历
  11. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  12. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
  13. e != null; e = e.next) {
  14. K k;
  15. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
  16. return e.value;
  17. }
  18. }
  19. return null;
  20. }

5. remover

  和put类似,remove在真正获得锁之前,也会对链表进行遍历以提高缓存命中率。

6. size、containsValue

  这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment(通过Unsafe的getObjectVolatile()以保证原子读语义),获得对应的值以及所有Segment的modcount之和。如果连续两次所有Segment的modcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。
  当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。值得注意的是,加锁过程中要强制创建所有的Segment,否则容易出现其他线程创建Segment并进行put,remove等操作。代码如下:

  1. for(int j =0; j < segments.length; ++j)
  2. ensureSegment(j).lock();// force creation

  一般来说,应该避免在多线程环境下使用size和containsValue方法。

  • 注1:modcount在put, replace, remove以及clear等方法中都会被修改。
  • 注2:对于containsValue方法来说,如果在循环过程中发现匹配value的HashEntry,则直接返回true。

  最后,与HashMap不同的是,ConcurrentHashMap并不允许key或者value为null,按照Doug Lea的说法,这么设计的原因是在ConcurrentHashMap中,一旦value出现null,则代表HashEntry的key/value没有映射完成就被其他线程所见,需要特殊处理。在JDK6中,get方法的实现中就有一段对HashEntry.value == null的防御性判断。但Doug Lea也承认实际运行过程中,这种情况似乎不可能发生(参考:http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html)。

参考资料

  1. http://www.importnew.com/28263.html
  2. http://www.importnew.com/22007.html
  3. http://ifeve.com/concurrenthashmap/
  4. https://blog.csdn.net/seapeak007/article/details/53409618

jdk1.8

  ConcurrentHashMap在JDK8中进行了巨大改动,它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想(JDK7与JDK8中HashMap的实现),但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。
  下面是结构示意图:

1. 重要的属性

  首先来看几个重要的属性,与HashMap相同的就不再介绍了,这里重点解释一下sizeCtl这个属性。可以说它是ConcurrentHashMap中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。

  • 负数代表正在进行初始化或扩容操作
    • -1代表正在初始化
    • -N 表示有N-1个线程正在进行扩容操作
  • 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
  1. /**
  2. * 盛装Node元素的数组 它的大小是2的整数次幂
  3. * Size is always a power of two. Accessed directly by iterators.
  4. */
  5. transient volatile Node<K,V>[] table;
  6. /**
  7. * Table initialization and resizing control. When negative, the
  8. * table is being initialized or resized: -1 for initialization,
  9. * else -(1 + the number of active resizing threads). Otherwise,
  10. * when table is null, holds the initial table size to use upon
  11. * creation, or 0 for default. After initialization, holds the
  12. * next element count value upon which to resize the table.
  13. hash表初始化或扩容时的一个控制位标识量。
  14. 负数代表正在进行初始化或扩容操作
  15. -1代表正在初始化
  16. -N 表示有N-1个线程正在进行扩容操作
  17. 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
  18. */
  19. private transient volatile int sizeCtl;
  20. // 以下两个是用来控制扩容的时候 单线程进入的变量
  21. /**
  22. * The number of bits used for generation stamp in sizeCtl.
  23. * Must be at least 6 for 32bit arrays.
  24. */
  25. private static int RESIZE_STAMP_BITS = 16;
  26. /**
  27. * The bit shift for recording size stamp in sizeCtl.
  28. */
  29. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
  30. /*
  31. * Encodings for Node hash fields. See above for explanation.
  32. */
  33. static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点
  34. static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点

2. 重要的类

2.1 Node

  Node是最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别它对value和next属性设置了volatile同步锁(与JDK7的Segment相同),它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。

2.2 TreeNode

  树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

2.3 TreeBin

  这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

2.4 ForwardingNode

  一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1。 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。

  1. /**
  2. * A node inserted at head of bins during transfer operations.
  3. */
  4. static final class ForwardingNode<K,V> extends Node<K,V> {
  5. final Node<K,V>[] nextTable;
  6. ForwardingNode(Node<K,V>[] tab) {
  7. super(MOVED, null, null, null);
  8. this.nextTable = tab;
  9. }
  10. Node<K,V> find(int h, Object k) {
  11. // loop to avoid arbitrarily deep recursion on forwarding nodes
  12. outer: for (Node<K,V>[] tab = nextTable;;) {
  13. Node<K,V> e; int n;
  14. if (k == null || tab == null || (n = tab.length) == 0 ||
  15. (e = tabAt(tab, (n - 1) & h)) == null)
  16. return null;
  17. for (;;) {
  18. int eh; K ek;
  19. if ((eh = e.hash) == h &&
  20. ((ek = e.key) == k || (ek != null && k.equals(ek))))
  21. return e;
  22. if (eh < 0) {
  23. if (e instanceof ForwardingNode) {
  24. tab = ((ForwardingNode<K,V>)e).nextTable;
  25. continue outer;
  26. }
  27. else
  28. return e.find(h, k);
  29. }
  30. if ((e = e.next) == null)
  31. return null;
  32. }
  33. }
  34. }
  35. }

3. Unsafe与CAS

  在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。

3.1 unsafe静态块

  unsafe代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。利用CAS进行无锁操作,可以大大提高性能。

  1. private static final sun.misc.Unsafe U;
  2. private static final long SIZECTL;
  3. private static final long TRANSFERINDEX;
  4. private static final long BASECOUNT;
  5. private static final long CELLSBUSY;
  6. private static final long CELLVALUE;
  7. private static final long ABASE;
  8. private static final int ASHIFT;
  9. static {
  10. try {
  11. U = sun.misc.Unsafe.getUnsafe();
  12. Class<?> k = ConcurrentHashMap.class;
  13. SIZECTL = U.objectFieldOffset
  14. (k.getDeclaredField("sizeCtl"));
  15. TRANSFERINDEX = U.objectFieldOffset
  16. (k.getDeclaredField("transferIndex"));
  17. BASECOUNT = U.objectFieldOffset
  18. (k.getDeclaredField("baseCount"));
  19. CELLSBUSY = U.objectFieldOffset
  20. (k.getDeclaredField("cellsBusy"));
  21. Class<?> ck = CounterCell.class;
  22. CELLVALUE = U.objectFieldOffset
  23. (ck.getDeclaredField("value"));
  24. Class<?> ak = Node[].class;
  25. ABASE = U.arrayBaseOffset(ak);
  26. int scale = U.arrayIndexScale(ak);
  27. if ((scale & (scale - 1)) != 0)
  28. throw new Error("data type scale not a power of two");
  29. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  30. } catch (Exception e) {
  31. throw new Error(e);
  32. }
  33. }

3.2 三个核心方法

  ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

  1. //获得在i位置上的Node节点
  2. static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  3. return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
  4. }
  5. //利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
  6. //在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
  7. //因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVN
  8. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
  9. Node<K,V> c, Node<K,V> v) {
  10. return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
  11. }
  12. //利用volatile方法设置节点位置的值
  13. static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
  14. U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
  15. }

4. 初始化方法initTable

  对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
  初始化方法主要应用了关键属性sizeCtl 如果这个值〈0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n。

  1. /**
  2. * Initializes table, using the size recorded in sizeCtl.
  3. */
  4. private final Node<K,V>[] initTable() {
  5. Node<K,V>[] tab; int sc;
  6. while ((tab = table) == null || tab.length == 0) {
  7. //sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。
  8. if ((sc = sizeCtl) < 0)
  9. Thread.yield(); // lost initialization race; just spin
  10. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化
  11. try {
  12. if ((tab = table) == null || tab.length == 0) {
  13. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  14. @SuppressWarnings("unchecked")
  15. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  16. table = tab = nt;
  17. sc = n - (n >>> 2);//相当于0.75*n 设置一个扩容的阈值
  18. }
  19. } finally {
  20. sizeCtl = sc;
  21. }
  22. break;
  23. }
  24. }
  25. return tab;
  26. }

5. 扩容:tryPresize

  这里的扩容也是做翻倍扩容的,扩容后数组容量为原来的 2 倍。

  1. // 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
  2. private final void tryPresize(int size) {
  3. // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
  4. int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
  5. tableSizeFor(size + (size >>> 1) + 1);
  6. int sc;
  7. while ((sc = sizeCtl) >= 0) {
  8. Node<K,V>[] tab = table; int n;
  9. // 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
  10. if (tab == null || (n = tab.length) == 0) {
  11. n = (sc > c) ? sc : c;
  12. if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  13. try {
  14. if (table == tab) {
  15. @SuppressWarnings("unchecked")
  16. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  17. table = nt;
  18. sc = n - (n >>> 2); // 0.75 * n
  19. }
  20. } finally {
  21. sizeCtl = sc;
  22. }
  23. }
  24. }
  25. else if (c <= sc || n >= MAXIMUM_CAPACITY)
  26. break;
  27. else if (tab == table) {
  28. // 我没看懂 rs 的真正含义是什么,不过也关系不大
  29. int rs = resizeStamp(n);
  30. if (sc < 0) {
  31. Node<K,V>[] nt;
  32. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  33. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  34. transferIndex <= 0)
  35. break;
  36. // 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
  37. // 此时 nextTab 不为 null
  38. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  39. transfer(tab, nt);
  40. }
  41. // 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
  42. // 我是没看懂这个值真正的意义是什么?不过可以计算出来的是,结果是一个比较大的负数
  43. // 调用 transfer 方法,此时 nextTab 参数为 null
  44. else if (U.compareAndSwapInt(this, SIZECTL, sc,
  45. (rs << RESIZE_STAMP_SHIFT) + 2))
  46. transfer(tab, null);
  47. }
  48. }
  49. }

  这个方法的核心在于 sizeCtl 值的操作,首先将其设置为一个负数,然后执行 transfer(tab, null),再下一个循环将 sizeCtl 加 1,并执行 transfer(tab, nt),之后可能是继续 sizeCtl 加 1,并执行 transfer(tab, nt)。
  所以,可能的操作就是执行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),这里怎么结束循环的需要看完 transfer 源码才清楚。

5.1 数据迁移:transfer

  transfer将原来的 tab 数组的元素迁移到新的 nextTab 数组中。
  整个操作分为两个部分

  • 第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到;
  • 第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。

  先来看一下单线程是如何完成的:它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:

  • 如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
  • 如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
  • 如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
  • 遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

  再看一下多线程是如何完成的:
  在代码的69行有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。

  1. /**
  2. * 一个过渡的table表 只有在扩容的时候才会使用
  3. */
  4. private transient volatile Node<K,V>[] nextTable;
  5. /**
  6. * Moves and/or copies the nodes in each bin to new table. See
  7. * above for explanation.
  8. */
  9. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  10. int n = tab.length, stride;
  11. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  12. stride = MIN_TRANSFER_STRIDE; // subdivide range
  13. if (nextTab == null) { // initiating
  14. try {
  15. @SuppressWarnings("unchecked")
  16. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍
  17. nextTab = nt;
  18. } catch (Throwable ex) { // try to cope with OOME
  19. sizeCtl = Integer.MAX_VALUE;
  20. return;
  21. }
  22. nextTable = nextTab;
  23. transferIndex = n;
  24. }
  25. int nextn = nextTab.length;
  26. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位
  27. boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
  28. boolean finishing = false; // to ensure sweep before committing nextTab
  29. for (int i = 0, bound = 0;;) {
  30. Node<K,V> f; int fh;
  31. //这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点
  32. while (advance) {
  33. int nextIndex, nextBound;
  34. if (--i >= bound || finishing)
  35. advance = false;
  36. else if ((nextIndex = transferIndex) <= 0) {
  37. i = -1;
  38. advance = false;
  39. }
  40. else if (U.compareAndSwapInt
  41. (this, TRANSFERINDEX, nextIndex,
  42. nextBound = (nextIndex > stride ?
  43. nextIndex - stride : 0))) {
  44. bound = nextBound;
  45. i = nextIndex - 1;
  46. advance = false;
  47. }
  48. }
  49. if (i < 0 || i >= n || i + n >= nextn) {
  50. int sc;
  51. if (finishing) {
  52. //如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
  53. nextTable = null;
  54. table = nextTab;
  55. sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
  56. return;
  57. }
  58. //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
  59. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  60. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  61. return;
  62. finishing = advance = true;
  63. i = n; // recheck before commit
  64. }
  65. }
  66. //如果遍历到的节点为空 则放入ForwardingNode指针
  67. else if ((f = tabAt(tab, i)) == null)
  68. advance = casTabAt(tab, i, null, fwd);
  69. //如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
  70. else if ((fh = f.hash) == MOVED)
  71. advance = true; // already processed
  72. else {
  73. //节点上锁
  74. synchronized (f) {
  75. if (tabAt(tab, i) == f) {
  76. Node<K,V> ln, hn;
  77. //如果fh>=0 证明这是一个Node节点
  78. if (fh >= 0) {
  79. int runBit = fh & n;
  80. //以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列
  81. Node<K,V> lastRun = f;
  82. for (Node<K,V> p = f.next; p != null; p = p.next) {
  83. int b = p.hash & n;
  84. if (b != runBit) {
  85. runBit = b;
  86. lastRun = p;
  87. }
  88. }
  89. if (runBit == 0) {
  90. ln = lastRun;
  91. hn = null;
  92. }
  93. else {
  94. hn = lastRun;
  95. ln = null;
  96. }
  97. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  98. int ph = p.hash; K pk = p.key; V pv = p.val;
  99. if ((ph & n) == 0)
  100. ln = new Node<K,V>(ph, pk, pv, ln);
  101. else
  102. hn = new Node<K,V>(ph, pk, pv, hn);
  103. }
  104. //在nextTable的i位置上插入一个链表
  105. setTabAt(nextTab, i, ln);
  106. //在nextTable的i+n的位置上插入另一个链表
  107. setTabAt(nextTab, i + n, hn);
  108. //在table的i位置上插入forwardNode节点 表示已经处理过该节点
  109. setTabAt(tab, i, fwd);
  110. //设置advance为true 返回到上面的while循环中 就可以执行i--操作
  111. advance = true;
  112. }
  113. //对TreeBin对象进行处理 与上面的过程类似
  114. else if (f instanceof TreeBin) {
  115. TreeBin<K,V> t = (TreeBin<K,V>)f;
  116. TreeNode<K,V> lo = null, loTail = null;
  117. TreeNode<K,V> hi = null, hiTail = null;
  118. int lc = 0, hc = 0;
  119. //构造正序和反序两个链表
  120. for (Node<K,V> e = t.first; e != null; e = e.next) {
  121. int h = e.hash;
  122. TreeNode<K,V> p = new TreeNode<K,V>
  123. (h, e.key, e.val, null, null);
  124. if ((h & n) == 0) {
  125. if ((p.prev = loTail) == null)
  126. lo = p;
  127. else
  128. loTail.next = p;
  129. loTail = p;
  130. ++lc;
  131. }
  132. else {
  133. if ((p.prev = hiTail) == null)
  134. hi = p;
  135. else
  136. hiTail.next = p;
  137. hiTail = p;
  138. ++hc;
  139. }
  140. }
  141. //如果扩容后已经不再需要tree的结构 反向转换为链表结构
  142. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  143. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  144. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  145. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  146. //在nextTable的i位置上插入一个链表
  147. setTabAt(nextTab, i, ln);
  148. //在nextTable的i+n的位置上插入另一个链表
  149. setTabAt(nextTab, i + n, hn);
  150. //在table的i位置上插入forwardNode节点 表示已经处理过该节点
  151. setTabAt(tab, i, fwd);
  152. //设置advance为true 返回到上面的while循环中 就可以执行i--操作
  153. advance = true;
  154. }
  155. }
  156. }
  157. }
  158. }
  159. }

6. put

  前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap最常用的就是put和get两个方法。现在来介绍put方法,这个put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下两个情况

  • 如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
  • 如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。

  整体流程就是首先定义不允许key或value为null的情况放入 对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。
  如果这个位置是空的,那么直接放入,而且不需要加锁操作。
  如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. /** Implementation for put and putIfAbsent */
  5. final V putVal(K key, V value, boolean onlyIfAbsent) {
  6. //不允许 key或value为null
  7. if (key == null || value == null) throw new NullPointerException();
  8. //计算hash值
  9. int hash = spread(key.hashCode());
  10. int binCount = 0;
  11. //死循环 何时插入成功 何时跳出
  12. for (Node<K,V>[] tab = table;;) {
  13. Node<K,V> f; int n, i, fh;
  14. //如果table为空的话,初始化table
  15. if (tab == null || (n = tab.length) == 0)
  16. tab = initTable();
  17. //根据hash值计算出在table里面的位置
  18. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  19. //如果这个位置没有值 ,直接放进去,不需要加锁
  20. if (casTabAt(tab, i, null,
  21. new Node<K,V>(hash, key, value, null)))
  22. break; // no lock when adding to empty bin
  23. }
  24. //当遇到表连接点时,需要进行整合表的操作
  25. else if ((fh = f.hash) == MOVED)
  26. tab = helpTransfer(tab, f);
  27. else {
  28. V oldVal = null;
  29. //结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点
  30. synchronized (f) {
  31. if (tabAt(tab, i) == f) {
  32. //fh〉0 说明这个节点是一个链表的节点 不是树的节点
  33. if (fh >= 0) {
  34. binCount = 1;
  35. //在这里遍历链表所有的结点
  36. for (Node<K,V> e = f;; ++binCount) {
  37. K ek;
  38. //如果hash值和key值相同 则修改对应结点的value值
  39. if (e.hash == hash &&
  40. ((ek = e.key) == key ||
  41. (ek != null && key.equals(ek)))) {
  42. oldVal = e.val;
  43. if (!onlyIfAbsent)
  44. e.val = value;
  45. break;
  46. }
  47. Node<K,V> pred = e;
  48. //如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
  49. if ((e = e.next) == null) {
  50. pred.next = new Node<K,V>(hash, key,
  51. value, null);
  52. break;
  53. }
  54. }
  55. }
  56. //如果这个节点是树节点,就按照树的方式插入值
  57. else if (f instanceof TreeBin) {
  58. Node<K,V> p;
  59. binCount = 2;
  60. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  61. value)) != null) {
  62. oldVal = p.val;
  63. if (!onlyIfAbsent)
  64. p.val = value;
  65. }
  66. }
  67. }
  68. }
  69. if (binCount != 0) {
  70. //如果链表长度已经达到临界值8 就需要把链表转换为树结构
  71. if (binCount >= TREEIFY_THRESHOLD)
  72. treeifyBin(tab, i);
  73. if (oldVal != null)
  74. return oldVal;
  75. break;
  76. }
  77. }
  78. }
  79. //将当前ConcurrentHashMap的元素数量+1
  80. addCount(1L, binCount);
  81. return null;
  82. }

  我们可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment,而锁住Node之前的操作是无锁的并且也是线程安全的,建立在之前提到的3个原子操作上。

6.1 helpTransfer方法

  这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。

  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2. Node<K,V>[] nextTab; int sc;
  3. if (tab != null && (f instanceof ForwardingNode) &&
  4. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  5. int rs = resizeStamp(tab.length);
  6. while (nextTab == nextTable && table == tab &&
  7. (sc = sizeCtl) < 0) {
  8. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  9. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  10. break;
  11. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  12. transfer(tab, nextTab);
  13. break;
  14. }
  15. }
  16. return nextTab;
  17. }
  18. return table;
  19. }

6.2 treeifyBin方法

  这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode。

  1. private final void treeifyBin(Node<K,V>[] tab, int index) {
  2. Node<K,V> b; int n, sc;
  3. if (tab != null) {
  4. // MIN_TREEIFY_CAPACITY 为 64
  5. // 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
  6. if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
  7. // 后面我们再详细分析这个方法
  8. tryPresize(n << 1);
  9. // b 是头结点
  10. else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
  11. // 加锁
  12. synchronized (b) {
  13. if (tabAt(tab, index) == b) {
  14. // 下面就是遍历链表,建立一颗红黑树
  15. TreeNode<K,V> hd = null, tl = null;
  16. for (Node<K,V> e = b; e != null; e = e.next) {
  17. TreeNode<K,V> p =
  18. new TreeNode<K,V>(e.hash, e.key, e.val,
  19. null, null);
  20. if ((p.prev = tl) == null)
  21. hd = p;
  22. else
  23. tl.next = p;
  24. tl = p;
  25. }
  26. // 将红黑树设置到数组相应位置中
  27. setTabAt(tab, index, new TreeBin<K,V>(hd));
  28. }
  29. }
  30. }
  31. }
  32. }

7. get方法

  get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. //计算hash值
  4. int h = spread(key.hashCode());
  5. //根据hash值确定节点位置
  6. if ((tab = table) != null && (n = tab.length) > 0 &&
  7. (e = tabAt(tab, (n - 1) & h)) != null) {
  8. //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
  9. if ((eh = e.hash) == h) {
  10. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  11. return e.val;
  12. }
  13. //如果eh<0 说明这个节点在树上 直接寻找
  14. else if (eh < 0)
  15. return (p = e.find(h, key)) != null ? p.val : null;
  16. //否则遍历链表 找到对应的值并返回
  17. while ((e = e.next) != null) {
  18. if (e.hash == h &&
  19. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  20. return e.val;
  21. }
  22. }
  23. return null;
  24. }

8. size相关方法

  对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。

8.1 辅助定义

  为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类

  1. /**
  2. * A padded cell for distributing counts. Adapted from LongAdder
  3. * and Striped64. See their internal docs for explanation.
  4. */
  5. @sun.misc.Contended static final class CounterCell {
  6. volatile long value;
  7. CounterCell(long x) { value = x; }
  8. }
  9. /******************************************/
  10. /**
  11. * 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
  12. 但它并不用返回当前hashmap的元素个数
  13. */
  14. private transient volatile long baseCount;
  15. /**
  16. * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
  17. */
  18. private transient volatile int cellsBusy;
  19. /**
  20. * Table of counter cells. When non-null, size is a power of 2.
  21. */
  22. private transient volatile CounterCell[] counterCells;

8.2 mappingCount与Size方法

  mappingCount与size方法的类似 从Java工程师给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。

  1. public int size() {
  2. long n = sumCount();
  3. return ((n < 0L) ? 0 :
  4. (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
  5. (int)n);
  6. }
  7. /**
  8. * Returns the number of mappings. This method should be used
  9. * instead of {@link #size} because a ConcurrentHashMap may
  10. * contain more mappings than can be represented as an int. The
  11. * value returned is an estimate; the actual count may differ if
  12. * there are concurrent insertions or removals.
  13. *
  14. * @return the number of mappings
  15. * @since 1.8
  16. */
  17. public long mappingCount() {
  18. long n = sumCount();
  19. return (n < 0L) ? 0L : n; // ignore transient negative values
  20. }
  21. final long sumCount() {
  22. CounterCell[] as = counterCells; CounterCell a;
  23. long sum = baseCount;
  24. if (as != null) {
  25. for (int i = 0; i < as.length; ++i) {
  26. if ((a = as[i]) != null)
  27. sum += a.value;//所有counter的值求和
  28. }
  29. }
  30. return sum;
  31. }

8.3 addCount方法

  在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。

  1. private final void addCount(long x, int check) {
  2. CounterCell[] as; long b, s;
  3. //利用CAS方法更新baseCount的值
  4. if ((as = counterCells) != null ||
  5. !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  6. CounterCell a; long v; int m;
  7. boolean uncontended = true;
  8. if (as == null || (m = as.length - 1) < 0 ||
  9. (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
  10. !(uncontended =
  11. U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
  12. fullAddCount(x, uncontended);
  13. return;
  14. }
  15. if (check <= 1)
  16. return;
  17. s = sumCount();
  18. }
  19. //如果check值大于等于0 则需要检验是否需要进行扩容操作
  20. if (check >= 0) {
  21. Node<K,V>[] tab, nt; int n, sc;
  22. while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
  23. (n = tab.length) < MAXIMUM_CAPACITY) {
  24. int rs = resizeStamp(n);
  25. //
  26. if (sc < 0) {
  27. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  28. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  29. transferIndex <= 0)
  30. break;
  31. //如果已经有其他线程在执行扩容操作
  32. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  33. transfer(tab, nt);
  34. }
  35. //当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
  36. else if (U.compareAndSwapInt(this, SIZECTL, sc,
  37. (rs << RESIZE_STAMP_SHIFT) + 2))
  38. transfer(tab, null);
  39. s = sumCount();
  40. }
  41. }
  42. }

9. 总结

  JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,把HashMap分割成若干个Segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证可见性,当要统计全局时(比如size),首先会尝试多次计算modcount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回size。如果有,则需要依次锁住所有的Segment来计算。
  jdk7中ConcurrentHashmap中,当长度过长碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能,所以jdk8 中完全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行,实现上也和原来的分段式存储有很大的区别。
  主要设计上的变化有以下几点:

  • 不采用segment而采用node,锁住node来实现减小锁粒度。
  • 设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize。
  • 使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。
  • sizeCtl的不同值来代表不同含义,起到了控制的作用。

  至于为什么JDK8中使用synchronized而不是ReentrantLock,我猜是因为JDK8中对synchronized有了足够的优化吧。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注