基于 jdk1.8
Java 并发包中提供的一个线程安全且高效的 HashMap 实现,可以完全替代 HashTable,在并发编程的场景中使用频率非常之高。
可能大多人只是知道它使用了多个锁代替 HashTable 中的单个锁,也就是锁分离技术(Lock Stripping)
实现原理
1.8 之前 ConcurrentHashMap 是使用 Segment 段来进行,一个段就相当于一个 HashMap 的数据结构,每个段使用一个锁
1.8 之后 Segment 虽保留,但已经简化属性,仅仅是为了兼容旧版本,使用和 HashMap 一样的数据结构每个数组位置使用一个锁
再学习中始终要考虑多线程的情况
一.类加载
主要知道以下的一些初始化值
private static final int MAXIMUM_CAPACITY = 1 << 30; //数组最大大小 同 HashMap
private static final int DEFAULT_CAPACITY = 16;//数组默认大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //兼容旧版保留的值,默认线程并发度,类似信号量
private static final float LOAD_FACTOR = 0.75f;//默认 map 扩容比例,实际用(n << 1) - (n >>> 1)代替了更高效
static final int TREEIFY_THRESHOLD = 8; // 链表转树阀值,大于 8 时
static final int UNTREEIFY_THRESHOLD = 6; //树转链表阀值,小于等于 6(tranfer 时,lc、hc=0 两个计数器分别 ++ 记录原 bin、新 binTreeNode 数量,<=UNTREEIFY_THRESHOLD 则 untreeify(lo))。【仅在扩容 tranfer 时才可能树转链表】
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;//扩容转移时的最小数组分组大小
private static int RESIZE_STAMP_BITS = 16;//本类中没提供修改的方法 用来根据 n 生成位置一个类似时间搓的功能
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 2^15-1,help resize 的最大线程数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 32-16=16,sizeCtl 中记录 size 大小的偏移量
static final int MOVED = -1; // hash for forwarding nodes(forwarding nodes 的 hash 值)、标示位
static final int TREEBIN = -2; // hash for roots of trees(树根节点的 hash 值)
static final int RESERVED = -3; // 保留
static final int HASH_BITS = 0x7fffffff; // 用在计算 hash 时进行安位与计算消除负 hash
static final int NCPU = Runtime.getRuntime().availableProcessors(); // 可用处理器数量
二、put 数据
这里先看使用空构造方法产生实例的使用,这个没问题了,在学习其它有参的构造方法也没什么大问题了
完全理解了 put 基本 ConcurrentHashMap 就理解一大半了主要思想也理解
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { 1 if (key == null || value == null) throw new NullPointerException(); 2 int hash = spread(key.hashCode()); 3 int binCount = 0; 4 for (Node<K,V>[] tab = table;;) { 5 Node<K,V> f; int n, i, fh; 6 if (tab == null || (n = tab.length) == 0) 7 tab = initTable(); 8 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 9 if (casTabAt(tab, i, null, 10 new Node<K,V>(hash, key, value, null))) 11 break; // no lock when adding to empty bin 12 } 13 else if ((fh = f.hash) == MOVED) 14 tab = helpTransfer(tab, f); 15 else { 16 V oldVal = null; 17 synchronized (f) { 18 if (tabAt(tab, i) == f) { 19 if (fh >= 0) { 20 binCount = 1; 21 for (Node<K,V> e = f;; ++binCount) { 22 K ek; 23 if (e.hash == hash && 24 ((ek = e.key) == key || 25 (ek != null && key.equals(ek)))) { 26 oldVal = e.val; 27 if (!onlyIfAbsent) 28 e.val = value; 29 break; 30 } 31 Node<K,V> pred = e; 32 if ((e = e.next) == null) { 33 pred.next = new Node<K,V>(hash, key, 34 value, null); 35 break; 36 } 37 } 38 } 39 else if (f instanceof TreeBin) { 40 Node<K,V> p; 41 binCount = 2; 42 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 43 value)) != null) { 44 oldVal = p.val; 45 if (!onlyIfAbsent) 46 p.val = value; 47 } 48 } 49 } 50 } 51 if (binCount != 0) { 52 if (binCount >= TREEIFY_THRESHOLD) 53 treeifyBin(tab, i); 54 if (oldVal != null) 55 return oldVal; 56 break; 57 } 58 } 59 } 60 addCount(1L, binCount); 61 return null; } 一句一句学习 1. if (key == null || value == null) throw new NullPointerException();//key value不能null 2. int hash = spread(key.hashCode()); static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } 与hashmap计算hash基本一样,但多了一步& HASH_BITS,HASH_BITS是0x7fffffff,该步是为了消除最高位上的负符号 hash的负在ConcurrentHashMap中有特殊意义表示在扩容或者是树节点 4. for (Node<K,V>[] tab = table;;) { 死循环 table即map的基本Node数组 Node: final int hash; final K key; volatile V val; volatile Node<K,V> next; 基本结构和hashmap的node没区别,区别在val和next是volatile的即保证线程可见性,为后面的多线程服务 7. tab = initTable();//table为null或空map时进行初始化 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) {//死循环以完成初始化 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin -1代表正在初始化或者扩容,则本线程退让,依赖上面的死循环继续初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS判断SIZECTL和sc相同SIZECTL赋值-1表示正在初始化,只有一个线程进行初始化其它线程在上个if卡住 try { if ((tab = table) == null || tab.length == 0) { //第一个线程初始化之后,第二个线程还会进来所以需要再次判断一次 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt;//创建基本Node数组结构 sc = n - (n >>> 2);//扩容阀值 实际就是0.75*n 写法略叼更高端比直接乘高效 } } finally { sizeCtl = sc; } break; } } return tab; } 8. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 判断当前索引位置的数组上是否有值 无值那就是首元素插入 i = (n - 1) & hash计算索引位置 tabAt(tab, i = (n - 1) & hash) //获取数组i索引的Node CAS操作 保持其它线程对table的改变在这里可见 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } 9. if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) 根据8该索引为null则插入值,casTabAt主要用于CAS比较该索引处是否为null防止其它线程已改变该值,null则插入,break结束死循环 13. else if ((fh = f.hash) == MOVED) 首地址处不null 并且Node的hash是-1 表示是ForwardingNode节点正在rehash扩容 14. tab = helpTransfer(tab, f); 帮助扩容 这个和扩容单学习不在这分析了 17. synchronized (f) { //到这也就说明了有hash冲突也不需要帮助扩容 最直接的操作synchronized锁f,即本索引处的首节点,可能是链表或红黑树 有了synchronized那么后续的冲突插入就简单了也不需要考虑多线程问题了 18. if (tabAt(tab, i) == f) {//这里volatile获取首节点与8处获取的首节点对比判断f还是不是首节点 注意因为是多线程的关系虽然remove等操作也会锁首节点但在从第8处执行到锁这里的时候这里的首节点是完全存在被其它线程干掉或者空旋的情况的 19. if (fh >= 0) { //fh即节点的的二次hash值,判断是为了等待扩容完成 39. else if (f instanceof TreeBin) { //fh<0 -2表示红黑树节点 是树直接添加树节点 之后的的数据插入过程有了锁就和hashmap一样了,最后判断是否超过8链表变红黑树 60. addCount(1L, binCount); //这就是最后的插入数据后根据一定条件来进行扩容的方法了 binCount 链表时 记录链表长度
三、addCount 计数并判断是否扩容
private final void addCount(long x, int check) {
1 CounterCell[] as; long b, s;
2 if ((as = counterCells) != null ||
3 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
4 CounterCell a; long v; int m;
5 boolean uncontended = true;
6 if (as == null || (m = as.length - 1) < 0 ||
7 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
8 !(uncontended =
9 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
10 fullAddCount(x, uncontended);
11 return;
12 }
13 if (check <= 1)
14 return;
15 s = sumCount();
16 }
17 if (check >= 0) {
18 Node<K,V>[] tab, nt; int n, sc;
19 while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
20 (n = tab.length) < MAXIMUM_CAPACITY) {
21 int rs = resizeStamp(n);
22 if (sc < 0) {
23 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
24 sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
25 transferIndex <= 0)
26 break;
27 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
28 transfer(tab, nt);
29 }
30 else if (U.compareAndSwapInt(this, SIZECTL, sc,
31 (rs << RESIZE_STAMP_SHIFT) + 2))
32 transfer(tab, null);
33 s = sumCount();
}
}
}
1 CounterCell类只有一个volatile的long类型变量
2 if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
private transient volatile long baseCount; 表示map中的元素个数但不一定对,需要加counterCells数组里存的数量
private transient volatile CounterCell[] counterCells; 用于辅组baseCount存储元素个数
这个if主要干的事
2.1 counterCells初始是null,则运行后面的CAS对baseCount增加,但存在多线程可能会导致CAS增加失败,则运行fullAddCount把数量值存到counterCells数组中
2.2 counterCells不null之前已经有过baseCount CAS失败,这种能失败大多代表并发不低,则在counterCells数组中使用随机数随便取一个索引位置之前记录的数据进行数量累加,
如果在counterCells数组中CAS累加因多线程还是失败这继续fullAddCount
fullAddCount中会触发扩容等操作,因此直接return
13 if (check <= 1)//删除或清理节点时是-1 插入索引首节点0 第二个节点是1
到这个if说明之前竞争大,现在竞争小counterCells更新成功了,那么在上述时不进行扩容的检查
因此ConcurrentHashMap是有可能自己超过0.75的容量阀值而不扩容的
sumCount()正真的计算map元素数量的方法,baseCount和counterCells数组存的总和
17 if (check >= 0) { //删除时不检查
19 while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {
检查s即元素总数是否超过阀值 死循环防止多线程同时扩容在CAS操作sizeCtl时即else if中竞争失败而跳过扩容检查
21 int rs = resizeStamp(n);
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
RESIZE_STAMP_BITS是静态非常量16 按注释是可以改变该值的,但在本类中是没有对该值进行变更的
这个方法可以看出是生成与n有关的标记,且n不变的情况下生成的一定是一样的
注意这里的或后面的计算,最终结果第15位肯定是1
22-30
else if
sc=sizeCtl初始是阀值,sc肯定大于0
rs << (RESIZE_STAMP_SHIFT) + 2 //RESIZE_STAMP_SHIFT=16
由于rs第15位是1因此左移16,那么第32位肯定是1 即结果一定是一个比较大的负数
+2 也是有用的 左移之后 低16位肯定是0 +2只影响低16位的值
这里就把SIZECTL变成了一个比较大的负数,也就去前面提到的是负数的时候代表在扩容
SIZECTL和注释有点冲突,注释不是很准确,注释的-(1+正在扩容线程数)是不对的
if(sc < 0) //在有线程正在扩容时sc就是负数了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)
比如就两个线程同时运行这个方法,第二个线程竞争修改SIZECTL失败,那么第二个线程因外层的死循环会运行到这 注意线程1时 SIZECTL=rs << (RESIZE_STAMP_SHIFT) + 2
线程2 条件成立直接break这就意味着不检查了
(sc >>> RESIZE_STAMP_SHIFT) != rs || (nt = nextTable) == null ||transferIndex <= 0
这3个条件成立一个都代表扩容已经完成不需要再扩容,这里是防止太频繁扩容消耗性能,并发非常高的时候存在超过阀值而没扩容的可能
sc == rs + 1 ||sc == rs + MAX_RESIZERS
这两条件是与RESIZE_STAMP_BITS这个静态变量有关,类中没提供修改的地方,但是注释说可以是6-32,那么比如极端的等于32,那sc等于不移动然后+2,与rs+1相等的判断是判断扩容线程是否减少了一个,即没有扩容的线程再运行说明扩容完成了
sc == rs + MAX_RESIZERS需要猜测变量等于多少的时候有用的判断
现阶段一般也不会修改这个变量 因此不太需要管这两个判断条件
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
在同一个n下其它线程还没扩容完,这里帮助扩容 同时SIZECTL+1 表示增加一个扩容线程
四、扩容
包含了二个主要方法:transfer 扩容、helpTransfer 帮助扩容
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//根据cpu个数找出扩容时的数组跨度大小即最小分组 16 32 64增长
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//普通扩容nextTab为空,竞争帮助扩容时有值,n<<1说明扩容2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//当前转移的位置,说明是逆序迁移
transferIndex = n;
}
int nextn = nextTab.length;
//创建扩容的连接节点,节点hash是-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {//死循环检查
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)//当前分组未转移完||扩容全部完成 --i完成数组逆序迁移
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//TRANSFERINDEX为0表示无下一个分组了
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {//CAS TRANSFERINDEX 多线程时,advance死循环会找到不同的分组,以一个分组一个线程负责来进行扩容
bound = nextBound;//迁移时本分组的下界
i = nextIndex - 1;//上界
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {//全部迁移完或无分组
int sc;
if (finishing) {//扩容完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//0.75
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//减少一个扩容线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)//根据前面addCount的+2这里就有-2 判断是否是最后一个正在扩容的线程
return;
finishing = advance = true;//准备结束
i = n; // recheck before commit 赋值n让其进入本if进行是否结束的检查
}
}
else if ((f = tabAt(tab, i)) == null)//原数组i位置无节点
advance = casTabAt(tab, i, null, fwd);//cas插入扩容节点 多线程插入失败就循环重新检查
else if ((fh = f.hash) == MOVED)//实际是检查上一步为null时CAS是否成功
advance = true; // already processed 之后在上面的while中变更i后继续
else {
synchronized (f) {//首节点上锁
if (tabAt(tab, i) == f) {//节点此时没本remove等干掉
Node<K,V> ln, hn;
if (fh >= 0) {//不是树节点
//下面这段是在拆分本位置的链表 一拆为二(一链表正向一链表反向,0或非0谁在最后连续那它就是正向,另一个反向) map大小n是2的倍数 与计算只会有0和n本身 好想法
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);//拆后的链表1放在新数组i位置
setTabAt(nextTab, i + n, hn);//链表2放i+n位置
setTabAt(tab, i, fwd);//原数组i位置放扩容节点
advance = true;//i位置索引迁移完成
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//扩容后数量太少降为链表 不用树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
helpTransfer帮助扩容就不详细说了 看懂transfer这个就基本不会有问题的
五、get 获取
public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {//确定hash位置的索引处有值 if ((eh = e.hash) == h) {//hash相等 if ((ek = e.key) == key || (ek != null && key.equals(ek)))//首节点 return e.val; } else if (eh < 0)//非正常节点 真在扩容或者是树节点 return (p = e.find(h, key)) != null ? p.val : null;//使用ForwardingNode或TreeNode的find方法查找元素 while ((e = e.next) != null) {//遍历链表 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
六、remove 删除
实际是 replaceNode 方法
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {//死循环删除 防止扩容等的影响
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)//table不存在或者 索引位置无值
break;
else if ((fh = f.hash) == MOVED)//正在扩容
tab = helpTransfer(tab, f);//帮助扩容
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {//锁首节点
if (tabAt(tab, i) == f) {
if (fh >= 0) {//链表
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {//找到需要替换的节点
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {//传递替换的新值是null 或者新值和原值相等
oldVal = ev;
if (value != null)//有原值 替换值
e.val = value;
else if (pred != null)//说明不是链表首节点 删除 即改变前一节点的next
pred.next = e.next;
else//链表首节点 删除
setTabAt(tab, i, e.next);
}
break;
}
//两句循环链表
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {//首节点是红黑树
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {//查找节点
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;//替换
else if (t.removeTreeNode(p))//删除
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {//有删除节点
if (oldVal != null) {//有原值
if (value == null)//无新值 表示是删除
addCount(-1L, -1);//减少元素计数-1
return oldVal;
}
break;
}
}
}
return null;
}
public void clear() {
long delta = 0L; // negative number of deletions
int i = 0;
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
if (f == null)//索引无值
++i;
else if ((fh = f.hash) == MOVED) {//正在扩容 帮助扩容后重置索引删除
tab = helpTransfer(tab, f);
i = 0; // restart
}
else {
synchronized (f) {//上锁
if (tabAt(tab, i) == f) {
Node<K,V> p = (fh >= 0 ? f :
(f instanceof TreeBin) ?
((TreeBin<K,V>)f).first : null);
while (p != null) {//计数删除元素个数
--delta;
p = p.next;
}
setTabAt(tab, i++, null);//直接在table中抛弃整个链表或树
}
}
}
}
if (delta != 0L)
addCount(delta, -1);//减少计数
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于