原文地址:https://my.oschina.net/hosee/blog/675884
并发编程实践中,ConcurrentHashMap 是一个经常被使用的数据结构,相比于 Hashtable 以及 Collections.synchronizedMap(),ConcurrentHashMap 在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求(这点好像 CAP 理论啊 O(∩_∩)O)。ConcurrentHashMap 的设计与实现非常精巧,大量的利用了 volatile,final,CAS 等 lock-free 技术来减少锁竞争对于性能的影响,无论对于 Java 并发编程的学习还是 Java 内存模型的理解,ConcurrentHashMap 的设计以及源码都值得非常仔细的阅读与揣摩。
这篇日志记录了自己对 ConcurrentHashMap 的一些总结,由于 JDK6,7,8 中实现都不同,需要分开阐述在不同版本中的 ConcurrentHashMap。
之前已经在 ConcurrentHashMap 原理分析中解释了 ConcurrentHashMap 的原理,主要是从代码的角度来阐述是源码是如何写的,本文仍然从源码出发,挑选个人觉得重要的点(会用红色标注)再次进行回顾,以及阐述 ConcurrentHashMap 的一些注意点。
1. JDK6 与 JDK7 中的实现
1.1 设计思路
ConcurrentHashMap 采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个 Map 加锁的设计,分段锁大大的提高了高并发环境下的处理能力。但同时,由于不是对整个 Map 加锁,导致一些需要扫描整个 Map 的方法(如 size(), containsValue())需要使用特殊的实现,另外一些方法(如 clear())甚至放弃了对一致性的要求(ConcurrentHashMap 是弱一致性的,具体请查看 ConcurrentHashMap 能完全替代 HashTable 吗?)。
ConcurrentHashMap 中的分段锁称为 Segment,它即类似于 HashMap(JDK7 与 JDK8 中 HashMap 的实现)的结构,即内部拥有一个 Entry 数组,数组中的每个元素又是一个链表;同时又是一个 ReentrantLock(Segment 继承了 ReentrantLock)。ConcurrentHashMap 中的 HashEntry 相对于 HashMap 中的 Entry 有一定的差异性:HashEntry 中的 value 以及 next 都被 volatile 修饰,这样在多线程读写过程中能够保持它们的可见性,代码如下:
static final class HashEntry {
final int hash;
final K key;
volatile V value;
volatile HashEntry next;
1.2 并发度(Concurrency Level)
并发度可以理解为程序运行时能够同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数,实际上就是 ConcurrentHashMap 中的分段锁个数,即 Segment[]的数组长度。ConcurrentHashMap 默认的并发度为 16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap 会使用大于等于该值的最小 2 幂指数作为实际并发度(假如用户设置并发度为 17,实际并发度则为 32)。运行时通过将 key 的高 n 位(n = 32 – segmentShift)和并发度减 1(segmentMask)做位与运算定位到所在的 Segment。segmentShift 与 segmentMask 都是在构造过程中根据 concurrency level 被相应的计算出来。
如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个 Segment 内的访问会扩散到不同的 Segment 中,CPU cache 命中率会下降,从而引起程序性能下降。(文档的说法是根据你并发的线程数量决定,太多会导性能降低)
1.3 创建分段锁
和 JDK6 不同,JDK7 中除了第一个 Segment 之外,剩余的 Segments 采用的是延迟初始化的机制:每次 put 之前都需要检查 key 对应的 Segment 是否为 null,如果是则调用 ensureSegment()以确保对应的 Segment 被创建。
ensureSegment 可能在并发环境下被调用,但与想象中不同,ensureSegment 并未使用锁来控制竞争,而是使用了 Unsafe 对象的 getObjectVolatile()提供的原子读语义结合 CAS 来确保 Segment 创建的原子性。代码段如下:
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment s = new Segment(lf, threshold, tab);
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
1.4 put/putIfAbsent/putAll
和 JDK6 一样,ConcurrentHashMap 的 put 方法被代理到了对应的 Segment(定位 Segment 的原理之前已经描述过)中。与 JDK6 不同的是,JDK7 版本的 ConcurrentHashMap 在获得 Segment 锁的过程中,做了一定的优化 - 在真正申请锁之前,put 方法会通过 tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应 hashcode 的链表进行遍历,如果遍历完毕仍然找不到与 key 相同的 HashEntry 节点,则为后续的 put 操作提前创建一个 HashEntry。当 tryLock 一定次数后仍无法获得锁,则通过 lock 申请锁。
需要注意的是,由于在并发环境下,其他线程的 put,rehash 或者 remove 操作可能会导致链表头结点的变化,因此在过程中需要进行检查,如果头结点发生变化则重新对表进行遍历。而如果其他线程引起了链表中的某个节点被删除,即使该变化因为是非原子写操作(删除节点后链接后续节点调用的是 Unsafe.putOrderedObject(),该方法不提供原子写语义)可能导致当前线程无法观察到,但因为不影响遍历的正确性所以忽略不计。
之所以在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被 CPU cache 所缓存,为后续实际 put 过程中的链表遍历操作提升性能。
在获得锁之后,Segment 对链表进行遍历,如果某个 HashEntry 节点具有相同的 key,则更新该 HashEntry 的 value 值,否则新建一个 HashEntry 节点,将它设置为链表的新 head 节点并将原头节点设为新 head 的下一个节点。新建过程中如果节点总数(含新建的 HashEntry)超过 threshold,则调用 rehash()方法对 Segment 进行扩容,最后将新建 HashEntry 写入到数组中。
put 方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过 Unsafe 的 putOrderedObject()方法来实现,这里并未使用具有原子写语义的 putObjectVolatile()的原因是:JMM 会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。
1.5 rehash
相对于 HashMap 的 resize,ConcurrentHashMap 的 rehash 原理类似,但是 Doug Lea 为 rehash 做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于 2 的幂指来操作,假设扩容前某 HashEntry 对应到 Segment 中数组的 index 为 i,数组的容量为 capacity,那么扩容后该 HashEntry 对应到新数组中的 index 只可能为 i 或者 i+capacity,因此大多数 HashEntry 节点在扩容前后 index 可以保持不变。基于此,rehash 方法中会定位第一个后续所有节点在扩容后 index 都保持不变的节点,然后将这个节点之前的所有节点重排即可。这部分代码如下:
private void rehash(HashEntry node) {
HashEntry[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry[] newTable =
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry e = oldTable[i];
if (e != null) {
HashEntry next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry lastRun = e;
int lastIdx = idx;
for (HashEntry last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k];
newTable[k] = new HashEntry(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
1.6 remove
和 put 类似,remove 在真正获得锁之前,也会对链表进行遍历以提高缓存命中率。
1.7 get 与 containsKey
get 与 containsKey 两个方法几乎完全一致:他们都没有使用锁,而是通过 Unsafe 对象的 getObjectVolatile()方法提供的原子读语义,来获得 Segment 以及对应的链表,然后对链表遍历判断是否存在 key 相同的节点以及获得该节点的 value。但由于遍历过程中其他线程可能对链表结构做了调整,因此 get 和 containsKey 返回的可能是过时的数据,这一点是 ConcurrentHashMap 在弱一致性上的体现。如果要求强一致性,那么必须使用 Collections.synchronizedMap()方法。
1.8 size、containsValue
这些方法都是基于整个 ConcurrentHashMap 来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的 Segment(通过 Unsafe 的 getObjectVolatile()以保证原子读语义),获得对应的值以及所有 Segment 的 modcount 之和。如果连续两次所有 Segment 的 modcount 和相等,则过程中没有发生其他线程修改 ConcurrentHashMap 的情况,返回获得的值。
当循环次数超过预定义的值时,这时需要对所有的 Segment 依次进行加锁,获取返回值后再依次解锁。值得注意的是,加锁过程中要强制创建所有的 Segment,否则容易出现其他线程创建 Segment 并进行 put,remove 等操作。代码如下:
for(int j =0; j < segments.length; ++j)
ensureSegment(j).lock();// force creation
一般来说,应该避免在多线程环境下使用 size 和 containsValue 方法。
注 1:modcount 在 put, replace, remove 以及 clear 等方法中都会被修改。
注 2:对于 containsValue 方法来说,如果在循环过程中发现匹配 value 的 HashEntry,则直接返回 true。
最后,与 HashMap 不同的是,ConcurrentHashMap 并不允许 key 或者 value 为 null,按照 Doug Lea 的说法,这么设计的原因是在 ConcurrentHashMap 中,一旦 value 出现 null,则代表 HashEntry 的 key/value 没有映射完成就被其他线程所见,需要特殊处理。在 JDK6 中,get 方法的实现中就有一段对 HashEntry.value == null 的防御性判断。但 Doug Lea 也承认实际运行过程中,这种情况似乎不可能发生(参考:http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html)。
2. JDK8 中的实现
ConcurrentHashMap 在 JDK8 中进行了巨大改动,很需要通过源码来再次学习下 Doug Lea 的实现方法。
它摒弃了 Segment(锁段)的概念,而是启用了一种全新的方式实现,利用 CAS 算法。它沿用了与它同时期的 HashMap 版本的思想,底层依然由“数组”+ 链表 + 红黑树的方式思想(JDK7 与 JDK8 中 HashMap 的实现),但是为了做到并发,又增加了很多辅助的类,例如 TreeBin,Traverser 等对象内部类。
2.1 重要的属性
首先来看几个重要的属性,与 HashMap 相同的就不再介绍了,这里重点解释一下 sizeCtl 这个属性。可以说它是 ConcurrentHashMap 中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。
-
负数代表正在进行初始化或扩容操作
-
-1 代表正在初始化
-
-N 表示有 N-1 个线程正在进行扩容操作
-
正数或 0 代表 hash 表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前 ConcurrentHashMap 容量的 0.75 倍,这与 loadfactor 是对应的。
/**
* 盛装Node元素的数组 它的大小是2的整数次幂
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node[] table;
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
hash表初始化或扩容时的一个控制位标识量。
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
// 以下两个是用来控制扩容的时候 单线程进入的变量
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点
static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点
2.2 重要的类
2.2.1 Node
Node 是最核心的内部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这里面。它与 HashMap 中的定义很相似,但是但是有一些差别它对 value 和 next 属性设置了 volatile 同步锁(与 JDK7 的 Segment 相同),它不允许调用 setValue 方法直接改变 Node 的 value 域,它增加了 find 方法辅助 map.get()方法。
2.2.2 TreeNode
树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为 TreeNode。但是与 HashMap 不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成 TreeNode 放在 TreeBin 对象中,由 TreeBin 完成对红黑树的包装。而且 TreeNode 在 ConcurrentHashMap 集成自 Node 类,而并非 HashMap 中的集成自 LinkedHashMap.Entry 类,也就是说 TreeNode 带有 next 指针,这样做的目的是方便基于 TreeBin 的访问。
2.2.3 TreeBin
这个类并不负责包装用户的 key、value 信息,而是包装的很多 TreeNode 节点。它代替了 TreeNode 的根节点,也就是说在实际的 ConcurrentHashMap“数组”中,存放的是 TreeBin 对象,而不是 TreeNode 对象,这是与 HashMap 的区别。另外这个类还带有了读写锁。
这里仅贴出它的构造方法。可以看到在构造 TreeBin 节点时,仅仅指定了它的 hash 值为 TREEBIN 常量,这也就是个标识为。同时也看到我们熟悉的红黑树构造方法
2.2.4 ForwardingNode
一个用于连接两个 table 的节点类。它包含一个 nextTable 指针,用于指向下一张表。而且这个节点的 key value next 指针全部为 null,它的 hash 值为-1. 这里面定义的 find 的方法是从 nextTable 里进行查询节点,而不是以自身为头节点进行查找。
/**
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode extends Node {
final Node[] nextTable;
ForwardingNode(Node[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node[] tab = nextTable;;) {
Node e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
2.3 Unsafe 与 CAS
在 ConcurrentHashMap 中,随处可以看到 U, 大量使用了 U.compareAndSwapXXX 的方法,这个方法是利用一个 CAS 算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN 的思想是比较类似的。
2.3.1 unsafe 静态块
unsafe 代码块控制了一些属性的修改工作,比如最常用的 SIZECTL 。在这一版本的 concurrentHashMap 中,大量应用来的 CAS 方法进行变量、属性的修改工作。利用 CAS 进行无锁操作,可以大大提高性能。
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
2.3.2 三个核心方法
ConcurrentHashMap 定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了 ConcurrentHashMap 的线程安全。
//获得在i位置上的Node节点
static final Node tabAt(Node[] tab, int i) {
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
//在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
//因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVN
static final boolean casTabAt(Node[] tab, int i,
Node c, Node v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile方法设置节点位置的值
static final void setTabAt(Node[] tab, int i, Node v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
2.4 初始化方法 initTable
对于 ConcurrentHashMap 来说,调用它的构造方法仅仅是设置了一些参数而已。而整个 table 的初始化是在向 ConcurrentHashMap 中插入元素的时候发生的。如调用 put、computeIfAbsent、compute、merge 等方法的时候,调用时机是检查 table==null。
初始化方法主要应用了关键属性 sizeCtl 如果这个值〈0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出 ConcurrentHashMap 的初始化只能由一个线程完成。如果获得了初始化权限,就用 CAS 方法将 sizeCtl 置为-1,防止其他线程进入。初始化数组后,将 sizeCtl 的值改为 0.75*n。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node[n];
table = tab = nt;
sc = n - (n >>> 2);//相当于0.75*n 设置一个扩容的阈值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
2.5 扩容方法 transfer
当 ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。这个方法的基本思想跟 HashMap 是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足 concurrent 的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。
整个扩容操作分为两个部分
-
第一部分是构建一个 nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过 RESIZE_STAMP_SHIFT 这个常量经过一次运算来保证的,这个地方在后面会有提到;
-
第二个部分就是将原来 table 中的元素复制到 nextTable 中,这里允许多线程进行操作。
先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数 i,然后利用 tabAt 方法获得 i 位置的元素:
-
如果这个位置为空,就在原 table 中的 i 位置放入 forwardNode 节点,这个也是触发并发扩容的关键点;
-
如果这个位置是 Node 节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上
-
如果这个位置是 TreeBin 节点(fh<0),也做一个反序处理,并且判断是否需要 untreefi,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上
-
遍历过所有的节点以后就完成了复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。
再看一下多线程是如何完成的:
在代码的 69 行有一个判断,如果遍历到的节点是 forward 节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值 set 为 forward,另一个线程看到 forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。
/**
* 一个过渡的table表 只有在扩容的时候才会使用
*/
private transient volatile Node[] nextTable;
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node[n << 1];//构造一个nextTable对象 它的容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode fwd = new ForwardingNode(nextTab);//构造一个连节点指针 用于标志位
boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node f; int fh;
//这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node ln, hn;
//如果fh>=0 证明这是一个Node节点
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列
Node lastRun = f;
for (Node p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//对TreeBin对象进行处理 与上面的过程类似
else if (f instanceof TreeBin) {
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
//构造正序和反序两个链表
for (Node e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
}
}
}
}
}
2.6 Put 方法
前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap 最常用的就是 put 和 get 两个方法。现在来介绍 put 方法,这个 put 方法依然沿用 HashMap 的 put 方法的思想,根据 hash 值计算这个新插入的点在 table 中的位置 i,如果 i 位置是空的,直接放进去,否则进行判断,如果 i 位置是树节点,按照树的方式插入新的节点,否则把 i 插入到链表的末尾。ConcurrentHashMap 中依然沿用这个思想,有一个最重要的不同点就是 ConcurrentHashMap 不允许 key 或 value 为 null 值。另外由于涉及到多线程,put 方法就要复杂一点。在多线程中可能有以下两个情况
-
如果一个或多个线程正在对 ConcurrentHashMap 进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为 transfer 方法中在空结点上插入 forward 节点,如果检测到需要插入的位置被 forward 节点占有,就帮助进行扩容;
-
如果检测到要插入的节点是非空且不是 forward 节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比 hashTable 的 synchronized 要好得多。
整体流程就是首先定义不允许 key 或 value 为 null 的情况放入 对于每一个放入的值,首先利用 spread 方法对 key 的 hashcode 进行一次 hash 计算,由此来确定这个值在 table 中的位置。
如果这个位置是空的,那么直接放入,而且不需要加锁操作。
如果这个位置存在结点,说明发生了 hash 碰撞,首先判断这个节点的类型。如果是链表节点(fh>0),则得到的结点就是 hash 值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到 hash 值与 key 值都与新加入节点是一致的情况,则只需要更新 value 值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于 8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许 key或value为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//死循环 何时插入成功 何时跳出
for (Node[] tab = table;;) {
Node f; int n, i, fh;
//如果table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根据hash值计算出在table里面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果这个位置没有值 ,直接放进去,不需要加锁
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当遇到表连接点时,需要进行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh〉0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
//在这里遍历链表所有的结点
for (Node e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 则修改对应结点的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
//如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
//如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1
addCount(1L, binCount);
return null;
}
我们可以发现 JDK8 中的实现也是锁分离的思想,只是锁住的是一个 Node,而不是 JDK7 中的 Segment,而锁住 Node 之前的操作是无锁的并且也是线程安全的,建立在之前提到的 3 个原子操作上。
2.6.1 helpTransfer 方法
这是一个协助扩容的方法。这个方法被调用的时候,当前 ConcurrentHashMap 一定已经有了 nextTable 对象,首先拿到这个 nextTable 对象,调用 transfer 方法。回看上面的 transfer 方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。
2.6.2 treeifyBin 方法
这个方法用于将过长的链表转换为 TreeBin 对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为 TreeBin ,这与 HashMap 不同的是,它并没有把 TreeNode 直接放入红黑树,而是利用了 TreeBin 这个小容器来封装所有的 TreeNode.
2.7 get 方法
get 方法比较简单,给定一个 key 来确定 value 的时候,必须满足两个条件 key 相同 hash 值相同,对于节点可能在链表或树上的情况,需要分别去查找。
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据hash值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0 说明这个节点在树上 直接寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
2.8 Size 相关的方法
对于 ConcurrentHashMap 来说,这个 table 里到底装了多少东西其实是个不确定的数量,因为不可能在调用 size()方法的时候像 GC 的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap 也是大费周章才计算出来的。
2.8.1 辅助定义
为了统计元素个数,ConcurrentHashMap 定义了一些变量和一个内部类
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64\. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/******************************************/
/**
* 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
但它并不用返回当前hashmap的元素个数
*/
private transient volatile long baseCount;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
2.8.2 mappingCount 与 Size 方法
mappingCount 与 size 方法的类似 从 Java 工程师给出的注释来看,应该使用 mappingCount 代替 size 方法 两个方法都没有直接返回 basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
}
}
return sum;
}
2.8.3 addCount 方法
在 put 方法结尾处调用了 addCount 方法,把当前 ConcurrentHashMap 的元素个数 +1 这个方法一共做了两件事,更新 baseCount 的值,检测是否进行扩容。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
if (check >= 0) {
Node[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
总结
JDK6,7 中的 ConcurrentHashmap 主要使用 Segment 来实现减小锁粒度,把 HashMap 分割成若干个 Segment,在 put 的时候需要锁住 Segment,get 时候不加锁,使用 volatile 来保证可见性,当要统计全局时(比如 size),首先会尝试多次计算 modcount 来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回 size。如果有,则需要依次锁住所有的 Segment 来计算。
jdk7 中 ConcurrentHashmap 中,当长度过长碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能,所以 jdk8 中完全重写了 concurrentHashmap,代码量从原来的 1000 多行变成了 6000 多 行,实现上也和原来的分段式存储有很大的区别。
主要设计上的变化有以下几点:
- 不采用 segment 而采用 node,锁住 node 来实现减小锁粒度。
- 设计了 MOVED 状态 当 resize 的中过程中 线程 2 还在 put 数据,线程 2 会帮助 resize。
- 使用 3 个 CAS 操作来确保 node 的一些操作的原子性,这种方式代替了锁。
- sizeCtl 的不同值来代表不同含义,起到了控制的作用。
至于为什么 JDK8 中使用 synchronized 而不是 ReentrantLock,我猜是因为 JDK8 中对 synchronized 有了足够的优化吧。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于