ConcurrentHashmap源码好好给你说明白
这个ConcurrentHashmap的设计非常精妙,如果有疑问的地方,欢迎大家在评论区进行激烈讨论!
一、静态工具方法
1 private static final int tableSizeFor(int c) { 2 int n = c - 1; 3 n |= n >>> 1; 4 n |= n >>> 2; 5 n |= n >>> 4; 6 n |= n >>> 8; 7 n |= n >>> 16; 8 return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; 9 }
此方法是对给定的int型数据c,返回一个值(比如叫x),则x满足x >=c且x是2的整数次幂。
首先为什么先将c-1,我们等下再说,先解释下从代码第3行到第7行的意思,第三行的意思是先将n与n无符号右移1位后的值做“或”运算,然后将值再赋给n,之后的以此类推。
为什么最后只到了右移16位呢?因为int数据在内存中只有32位。经过这一系列操作,就保证了n的二进制表示中将第一个出现1的位置的后面全部设置为1。然后再返回n+1就保证了
大于c并且是2的整数次幂。然后再解释下为什么开头先将c减去1,因为如果c本来就是2的m次幂的话,我们使用同样的方法最后会得到2的m+1次幂的结果。
二、初始化table:
1 private final Node[] initTable() { 2 Node [] tab; int sc; 3 while ((tab = table) == null || tab.length == 0) { 4 if ((sc = sizeCtl) < 0) 5 Thread.yield(); // lost initialization race; just spin 6 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 7 try { 8 if ((tab = table) == null || tab.length == 0) { 9 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 10 @SuppressWarnings("unchecked") 11 Node [] nt = (Node [])new Node<?,?>[n]; 12 table = tab = nt; 13 sc = n - (n >>> 2); 14 } 15 } finally { 16 sizeCtl = sc; 17 } 18 break; 19 } 20 } 21 return tab; 22 }
当多个线程同事执行第6行时,只会有一个返回true。compareAndSwapInt方法会将堆上的字段sizeCtl改为-1.这样其他线程会继续在while循环中一直处于第4行的判断内。直到线程将使用sizeCtl将table初始化完。在初始化table后,sizeCtl会修改为下次需要扩容的阈值,即table容量乘以负载因子(n*0.75),这里使用位移的方法(如第13行)。从这里可以看出,其实这个负载因子是固定不变的。构造函数中的loadFactor,仅仅影响table初始化的容量:
1 public ConcurrentHashMap(int initialCapacity, 2 float loadFactor, int concurrencyLevel) { 3 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 4 throw new IllegalArgumentException(); 5 if (initialCapacity < concurrencyLevel) // Use at least as many bins 6 initialCapacity = concurrencyLevel; // as estimated threads 7 long size = (long)(1.0 + (long)initialCapacity / loadFactor); 8 int cap = (size >= (long)MAXIMUM_CAPACITY) ? 9 MAXIMUM_CAPACITY : tableSizeFor((int)size); 10 this.sizeCtl = cap; 11 }
三、新增数据(put方法)
1 final V putVal(K key, V value, boolean onlyIfAbsent) { 2 if (key == null || value == null) throw new NullPointerException(); 3 int hash = spread(key.hashCode()); 4 int binCount = 0; 5 for (Node[] tab = table;;) { 6 Node f; int n, i, fh; 7 if (tab == null || (n = tab.length) == 0) 8 tab = initTable(); 9 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 10 if (casTabAt(tab, i, null, 11 new Node (hash, key, value, null))) 12 break; // no lock when adding to empty bin 13 } 14 else if ((fh = f.hash) == MOVED) 15 tab = helpTransfer(tab, f); 16 else { 17 V oldVal = null; 18 synchronized (f) { 19 if (tabAt(tab, i) == f) { 20 if (fh >= 0) { 21 binCount = 1; 22 for (Node e = f;; ++binCount) { 23 K ek; 24 if (e.hash == hash && 25 ((ek = e.key) == key || 26 (ek != null && key.equals(ek)))) { 27 oldVal = e.val; 28 if (!onlyIfAbsent) 29 e.val = value; 30 break; 31 } 32 Node pred = e; 33 if ((e = e.next) == null) { 34 pred.next = new Node (hash, key, 35 value, null); 36 break; 37 } 38 } 39 } 40 else if (f instanceof TreeBin) { 41 Node p; 42 binCount = 2; 43 if ((p = ((TreeBin )f).putTreeVal(hash, key, 44 value)) != null) { 45 oldVal = p.val; 46 if (!onlyIfAbsent) 47 p.val = value; 48 } 49 } 50 } 51 } 52 if (binCount != 0) { 53 if (binCount >= TREEIFY_THRESHOLD) 54 treeifyBin(tab, i); 55 if (oldVal != null) 56 return oldVal; 57 break; 58 } 59 } 60 } 61 addCount(1L, binCount); 62 return null; 63 }
方法的主要逻辑是:在key和数组长度计算出的值来确定在数组中插入的位置,如果此位置原来没有节点则构造一个节点插入该位置,如果以前有节点,则在节点所在的链表添加新的节点,如果此位置的节点是一颗树,则在树上添加新的节点。如果插入后的节点数量大于TREEIFY_THRESHOLD,则将该节点转化为树形结构。(该树为一颗红黑树)。其中第15行方法helpTransfer,第54行treeifyBin和第61行addCount方法,将会在后面进行说明。值得注意的是:在方法treefyBin中,会判断如果table的长度小于MIN_TREEIFY_CAPACITY的话,则不会将节点构造成树,而是将table扩容。
四、树化
1 private final void treeifyBin(Node[] tab, int index) { 2 Node b; int n, sc; 3 if (tab != null) { 4 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) 5 tryPresize(n << 1); 6 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { 7 synchronized (b) { 8 if (tabAt(tab, index) == b) { 9 TreeNode hd = null, tl = null; 10 for (Node e = b; e != null; e = e.next) { 11 TreeNode p = 12 new TreeNode (e.hash, e.key, e.val, 13 null, null); 14 if ((p.prev = tl) == null) 15 hd = p; 16 else 17 tl.next = p; 18 tl = p; 19 } 20 setTabAt(tab, index, new TreeBin (hd)); 21 } 22 } 23 } 24 } 25 }
真正的树化动作是在第20行的new TreeBin
五、扩容方法(transfer)
1 private final void transfer(Node[] tab, Node [] nextTab) { 2 int n = tab.length, stride; 3 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 4 stride = MIN_TRANSFER_STRIDE; // subdivide range 5 if (nextTab == null) { // initiating 6 try { 7 @SuppressWarnings("unchecked") 8 Node [] nt = (Node [])new Node<?,?>[n << 1]; 9 nextTab = nt; 10 } catch (Throwable ex) { // try to cope with OOME 11 sizeCtl = Integer.MAX_VALUE; 12 return; 13 } 14 nextTable = nextTab; 15 transferIndex = n; 16 } 17 int nextn = nextTab.length; 18 ForwardingNode fwd = new ForwardingNode (nextTab); 19 boolean advance = true; 20 boolean finishing = false; // to ensure sweep before committing nextTab 21 for (int i = 0, bound = 0;;) { 22 Node f; int fh; 23 while (advance) { 24 int nextIndex, nextBound; 25 if (--i >= bound || finishing) 26 advance = false; 27 else if ((nextIndex = transferIndex) <= 0) { 28 i = -1; 29 advance = false; 30 } 31 else if (U.compareAndSwapInt 32 (this, TRANSFERINDEX, nextIndex, 33 nextBound = (nextIndex > stride ? 34 nextIndex - stride : 0))) { 35 bound = nextBound; 36 i = nextIndex - 1; 37 advance = false; 38 } 39 } 40 if (i < 0 || i >= n || i + n >= nextn) { 41 int sc; 42 if (finishing) { 43 nextTable = null; 44 table = nextTab; 45 sizeCtl = (n << 1) - (n >>> 1); 46 return; 47 } 48 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 49 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 50 return; 51 finishing = advance = true; 52 i = n; // recheck before commit 53 } 54 } 55 else if ((f = tabAt(tab, i)) == null) 56 advance = casTabAt(tab, i, null, fwd); 57 else if ((fh = f.hash) == MOVED) 58 advance = true; // already processed 59 else { 60 synchronized (f) { 61 if (tabAt(tab, i) == f) { 62 Node ln, hn; 63 if (fh >= 0) { 64 int runBit = fh & n; 65 Node lastRun = f; 66 for (Node p = f.next; p != null; p = p.next) { 67 int b = p.hash & n; 68 if (b != runBit) { 69 runBit = b; 70 lastRun = p; 71 } 72 } 73 if (runBit == 0) { 74 ln = lastRun; 75 hn = null; 76 } 77 else { 78 hn = lastRun; 79 ln = null; 80 } 81 for (Node p = f; p != lastRun; p = p.next) { 82 int ph = p.hash; K pk = p.key; V pv = p.val; 83 if ((ph & n) == 0) 84 ln = new Node (ph, pk, pv, ln); 85 else 86 hn = new Node (ph, pk, pv, hn); 87 } 88 setTabAt(nextTab, i, ln); 89 setTabAt(nextTab, i + n, hn); 90 setTabAt(tab, i, fwd); 91 advance = true; 92 } 93 else if (f instanceof TreeBin) { 94 TreeBin t = (TreeBin )f; 95 TreeNode lo = null, loTail = null; 96 TreeNode hi = null, hiTail = null; 97 int lc = 0, hc = 0; 98 for (Node e = t.first; e != null; e = e.next) { 99 int h = e.hash; 100 TreeNode p = new TreeNode 101 (h, e.key, e.val, null, null); 102 if ((h & n) == 0) { 103 if ((p.prev = loTail) == null) 104 lo = p; 105 else 106 loTail.next = p; 107 loTail = p; 108 ++lc; 109 } 110 else { 111 if ((p.prev = hiTail) == null) 112 hi = p; 113 else 114 hiTail.next = p; 115 hiTail = p; 116 ++hc; 117 } 118 } 119 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 120 (hc != 0) ? new TreeBin (lo) : t; 121 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 122 (lc != 0) ? new TreeBin (hi) : t; 123 setTabAt(nextTab, i, ln); 124 setTabAt(nextTab, i + n, hn); 125 setTabAt(tab, i, fwd); 126 advance = true; 127 } 128 } 129 } 130 } 131 } 132 }
总体逻辑:
1.如果nextTable为null则初始化一个。(容量为以前的2倍)
2.然后将table中的节点按MIN_TRANSFER_STRIDE分为多个区间
3.对每个区间的节点数据进行转移,转以后在table中将节点(即使该节点为null)置为ForwardingNode,标记为已转移。
4.最后table = nextTable。
需要注意的细节:
问:为什么要分为多个区间?
答:可以允许多个线程同时进行扩容不同的区间而不受其他线程影响。关键代码位置:第27行到第37行。
第63行条件判断开始处理链表节点的转移:
runBit记录节点的hash属性与原数组长度n的“&”运算结果,lastRun记录的是链表中最后面位置中节点的runBit值相同的子链表的头结点。
因为n是原数组的长度,而数组长度必须是2的整数次幂(n == 2的x次方),所以n的二进制表示中,只有第x+1位是1,其他全为0。所以如果hash&n == 0,则表示hash中第x+1位为0,否则为1.
我们现在假设要转移的节点(node)所在数组中的位置为i,则i == node.hash&(n-1),如果node.hash&n == 0的话,那么node.hash&(2n-1) == node.hash&(n-1) == i。即扩容后node的位置不变。
如果node.hash&n > 0的话,则node.hash&(2n-1) == i + n。即扩容后node的位置向后移动n位。
上面的第64行到第87行是将当前节点分为2个子链表,分别是ln为hash&n == 0的子链表 和hn为hash&n > 0的子链表。根据我们之前讨论的结果ln应该还在当前的位置i,而hn则应该向后移动n位,即在i+n位置。
然后将旧数组的当前节点i位置用一个标志节点来标记此位置已经转移了。
转移树形结构的节点与链表类似,因为TreeNode是Node的子类,其本身也是一个链表,可以通过next属性遍历整个树。
再看下sizeCtl这个实例变量:
1 private transient volatile int sizeCtl;
当它为负值时,表示正在初始化或者扩容(-1表示初始化);
否则,当table为null时,它为默认初始值0,或者保存着table的初始化值;
当table初始化结束时,它保存着下次需要扩容的阈值。
六、计数方法
1 private final void addCount(long x, int check) { 2 CounterCell[] as; long b, s; 3 if ((as = counterCells) != null || 4 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 5 CounterCell a; long v; int m; 6 boolean uncontended = true; 7 if (as == null || (m = as.length - 1) < 0 || 8 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 9 !(uncontended = 10 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 11 fullAddCount(x, uncontended); 12 return; 13 } 14 if (check <= 1) 15 return; 16 s = sumCount(); 17 } 18 if (check >= 0) { 19 Node[] tab, nt; int n, sc; 20 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 21 (n = tab.length) < MAXIMUM_CAPACITY) { 22 int rs = resizeStamp(n); 23 if (sc < 0) { 24 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 25 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 26 transferIndex <= 0) 27 break; 28 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 29 transfer(tab, nt); 30 } 31 else if (U.compareAndSwapInt(this, SIZECTL, sc, 32 (rs << RESIZE_STAMP_SHIFT) + 2)) 33 transfer(tab, null); 34 s = sumCount(); 35 } 36 } 37
该方法涉及的两个实例变量baseCount和counterCells是用来保存当前table中有多少数据的。
第17行之前是计算数据个数的,之后是检查是否需要扩容或者如果正在扩容则参与到扩容中。然后重新检查看看是否需要继续扩容。
其中的第24行判断,是错误的,在好几个方法中用到此种判断的都是同样的错误,请注意(这是jdk的一个BUG,在现在的已发布的JDK8-JDK11版本中都未解决,估计以后在JDK12发布版本中会fix掉,现在的openJDK12中已经fix了,参考https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427),其中的sc==rs + 1 || sc == rs + MAX_RESIZES应该改为sc == (rs << RESIZE_STAMP_SHIFT)+ 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZES,
用来判断扩容是否结束和扩容线程是否达到最大值。可以看下第31行,表示初始化扩容,所以开始扩容时, sizeCtl == (rs << RESIZE_STAMP_SHIFT) + 2,因为每个扩容线程在进行帮助扩容时都会使用CAS将sizeCtl+1,然后进入transfer,接着在transfer方法中,每当一个扩容线程结束时都会将sizeCtl - 1,所以当所有扩容线程结束时sizeCtl == (rs << RESIZE_STAMP_SHIFT)+ 1,因为第一个进入扩容的线程没有将sizeCtl+1.
七、最后看下size计算table中数据的个数的方法
1 public int size() { 2 long n = sumCount(); 3 return ((n < 0L) ? 0 : 4 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 5 (int)n); 6 } 7 8 9 10 final long sumCount() { 11 CounterCell[] as = counterCells; CounterCell a; 12 long sum = baseCount; 13 if (as != null) { 14 for (int i = 0; i < as.length; ++i) { 15 if ((a = as[i]) != null) 16 sum += a.value; 17 } 18 } 19 return sum; 20 }
因为size中调用sumCount方法来间接计算个数,所以直接看sumCount方法。
它是将baseCount和counterCells中每个节点的value属性值累加在一起得到最后的数量。