ConcurrentHashmap源码好好给你说明白


这个ConcurrentHashmap的设计非常精妙,如果有疑问的地方,欢迎大家在评论区进行激烈讨论!

一、静态工具方法

1 private static final int tableSizeFor(int c) {
2         int n = c - 1;
3         n |= n >>> 1;
4         n |= n >>> 2;
5         n |= n >>> 4;
6         n |= n >>> 8;
7         n |= n >>> 16;
8         return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
9     }

此方法是对给定的int型数据c,返回一个值(比如叫x),则x满足x >=c且x是2的整数次幂。

首先为什么先将c-1,我们等下再说,先解释下从代码第3行到第7行的意思,第三行的意思是先将n与n无符号右移1位后的值做“或”运算,然后将值再赋给n,之后的以此类推。

为什么最后只到了右移16位呢?因为int数据在内存中只有32位。经过这一系列操作,就保证了n的二进制表示中将第一个出现1的位置的后面全部设置为1。然后再返回n+1就保证了

大于c并且是2的整数次幂。然后再解释下为什么开头先将c减去1,因为如果c本来就是2的m次幂的话,我们使用同样的方法最后会得到2的m+1次幂的结果。

二、初始化table:

 1 private final Node[] initTable() {
 2         Node[] tab; int sc;
 3         while ((tab = table) == null || tab.length == 0) {
 4             if ((sc = sizeCtl) < 0)
 5                 Thread.yield(); // lost initialization race; just spin
 6             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
 7                 try {
 8                     if ((tab = table) == null || tab.length == 0) {
 9                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
10                         @SuppressWarnings("unchecked")
11                         Node[] nt = (Node[])new Node<?,?>[n];
12                         table = tab = nt;
13                         sc = n - (n >>> 2);
14                     }
15                 } finally {
16                     sizeCtl = sc;
17                 }
18                 break;
19             }
20         }
21         return tab;
22     }

 当多个线程同事执行第6行时,只会有一个返回true。compareAndSwapInt方法会将堆上的字段sizeCtl改为-1.这样其他线程会继续在while循环中一直处于第4行的判断内。直到线程将使用sizeCtl将table初始化完。在初始化table后,sizeCtl会修改为下次需要扩容的阈值,即table容量乘以负载因子(n*0.75),这里使用位移的方法(如第13行)。从这里可以看出,其实这个负载因子是固定不变的。构造函数中的loadFactor,仅仅影响table初始化的容量:

 1 public ConcurrentHashMap(int initialCapacity,
 2                              float loadFactor, int concurrencyLevel) {
 3         if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
 4             throw new IllegalArgumentException();
 5         if (initialCapacity < concurrencyLevel)   // Use at least as many bins
 6             initialCapacity = concurrencyLevel;   // as estimated threads
 7         long size = (long)(1.0 + (long)initialCapacity / loadFactor);
 8         int cap = (size >= (long)MAXIMUM_CAPACITY) ?
 9             MAXIMUM_CAPACITY : tableSizeFor((int)size);
10         this.sizeCtl = cap;
11     }

三、新增数据(put方法)

 1 final V putVal(K key, V value, boolean onlyIfAbsent) {
 2         if (key == null || value == null) throw new NullPointerException();
 3         int hash = spread(key.hashCode());
 4         int binCount = 0;
 5         for (Node[] tab = table;;) {
 6             Node f; int n, i, fh;
 7             if (tab == null || (n = tab.length) == 0)
 8                 tab = initTable();
 9             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
10                 if (casTabAt(tab, i, null,
11                              new Node(hash, key, value, null)))
12                     break;                   // no lock when adding to empty bin
13             }
14             else if ((fh = f.hash) == MOVED)
15                 tab = helpTransfer(tab, f);
16             else {
17                 V oldVal = null;
18                 synchronized (f) {
19                     if (tabAt(tab, i) == f) {
20                         if (fh >= 0) {
21                             binCount = 1;
22                             for (Node e = f;; ++binCount) {
23                                 K ek;
24                                 if (e.hash == hash &&
25                                     ((ek = e.key) == key ||
26                                      (ek != null && key.equals(ek)))) {
27                                     oldVal = e.val;
28                                     if (!onlyIfAbsent)
29                                         e.val = value;
30                                     break;
31                                 }
32                                 Node pred = e;
33                                 if ((e = e.next) == null) {
34                                     pred.next = new Node(hash, key,
35                                                               value, null);
36                                     break;
37                                 }
38                             }
39                         }
40                         else if (f instanceof TreeBin) {
41                             Node p;
42                             binCount = 2;
43                             if ((p = ((TreeBin)f).putTreeVal(hash, key,
44                                                            value)) != null) {
45                                 oldVal = p.val;
46                                 if (!onlyIfAbsent)
47                                     p.val = value;
48                             }
49                         }
50                     }
51                 }
52                 if (binCount != 0) {
53                     if (binCount >= TREEIFY_THRESHOLD)
54                         treeifyBin(tab, i);
55                     if (oldVal != null)
56                         return oldVal;
57                     break;
58                 }
59             }
60         }
61         addCount(1L, binCount);
62         return null;
63     }

方法的主要逻辑是:在key和数组长度计算出的值来确定在数组中插入的位置,如果此位置原来没有节点则构造一个节点插入该位置,如果以前有节点,则在节点所在的链表添加新的节点,如果此位置的节点是一颗树,则在树上添加新的节点。如果插入后的节点数量大于TREEIFY_THRESHOLD,则将该节点转化为树形结构。(该树为一颗红黑树)。其中第15行方法helpTransfer,第54行treeifyBin和第61行addCount方法,将会在后面进行说明。值得注意的是:在方法treefyBin中,会判断如果table的长度小于MIN_TREEIFY_CAPACITY的话,则不会将节点构造成树,而是将table扩容。

四、树化

 1 private final void treeifyBin(Node[] tab, int index) {
 2         Node b; int n, sc;
 3         if (tab != null) {
 4             if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
 5                 tryPresize(n << 1);
 6             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
 7                 synchronized (b) {
 8                     if (tabAt(tab, index) == b) {
 9                         TreeNode hd = null, tl = null;
10                         for (Node e = b; e != null; e = e.next) {
11                             TreeNode p =
12                                 new TreeNode(e.hash, e.key, e.val,
13                                                   null, null);
14                             if ((p.prev = tl) == null)
15                                 hd = p;
16                             else
17                                 tl.next = p;
18                             tl = p;
19                         }
20                         setTabAt(tab, index, new TreeBin(hd));
21                     }
22                 }
23             }
24         }
25     }

真正的树化动作是在第20行的new TreeBin(hd)此构造方法中进行的。其中TreeBin类是一个红黑树结构。

五、扩容方法(transfer)

  1 private final void transfer(Node[] tab, Node[] nextTab) {
  2         int n = tab.length, stride;
  3         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  4             stride = MIN_TRANSFER_STRIDE; // subdivide range
  5         if (nextTab == null) {            // initiating
  6             try {
  7                 @SuppressWarnings("unchecked")
  8                 Node[] nt = (Node[])new Node<?,?>[n << 1];
  9                 nextTab = nt;
 10             } catch (Throwable ex) {      // try to cope with OOME
 11                 sizeCtl = Integer.MAX_VALUE;
 12                 return;
 13             }
 14             nextTable = nextTab;
 15             transferIndex = n;
 16         }
 17         int nextn = nextTab.length;
 18         ForwardingNode fwd = new ForwardingNode(nextTab);
 19         boolean advance = true;
 20         boolean finishing = false; // to ensure sweep before committing nextTab
 21         for (int i = 0, bound = 0;;) {
 22             Node f; int fh;
 23             while (advance) {
 24                 int nextIndex, nextBound;
 25                 if (--i >= bound || finishing)
 26                     advance = false;
 27                 else if ((nextIndex = transferIndex) <= 0) {
 28                     i = -1;
 29                     advance = false;
 30                 }
 31                 else if (U.compareAndSwapInt
 32                          (this, TRANSFERINDEX, nextIndex,
 33                           nextBound = (nextIndex > stride ?
 34                                        nextIndex - stride : 0))) {
 35                     bound = nextBound;
 36                     i = nextIndex - 1;
 37                     advance = false;
 38                 }
 39             }
 40             if (i < 0 || i >= n || i + n >= nextn) {
 41                 int sc;
 42                 if (finishing) {
 43                     nextTable = null;
 44                     table = nextTab;
 45                     sizeCtl = (n << 1) - (n >>> 1);
 46                     return;
 47                 }
 48                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 49                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 50                         return;
 51                     finishing = advance = true;
 52                     i = n; // recheck before commit
 53                 }
 54             }
 55             else if ((f = tabAt(tab, i)) == null)
 56                 advance = casTabAt(tab, i, null, fwd);
 57             else if ((fh = f.hash) == MOVED)
 58                 advance = true; // already processed
 59             else {
 60                 synchronized (f) {
 61                     if (tabAt(tab, i) == f) {
 62                         Node ln, hn;
 63                         if (fh >= 0) {
 64                             int runBit = fh & n;
 65                             Node lastRun = f;
 66                             for (Node p = f.next; p != null; p = p.next) {
 67                                 int b = p.hash & n;
 68                                 if (b != runBit) {
 69                                     runBit = b;
 70                                     lastRun = p;
 71                                 }
 72                             }
 73                             if (runBit == 0) {
 74                                 ln = lastRun;
 75                                 hn = null;
 76                             }
 77                             else {
 78                                 hn = lastRun;
 79                                 ln = null;
 80                             }
 81                             for (Node p = f; p != lastRun; p = p.next) {
 82                                 int ph = p.hash; K pk = p.key; V pv = p.val;
 83                                 if ((ph & n) == 0)
 84                                     ln = new Node(ph, pk, pv, ln);
 85                                 else
 86                                     hn = new Node(ph, pk, pv, hn);
 87                             }
 88                             setTabAt(nextTab, i, ln);
 89                             setTabAt(nextTab, i + n, hn);
 90                             setTabAt(tab, i, fwd);
 91                             advance = true;
 92                         }
 93                         else if (f instanceof TreeBin) {
 94                             TreeBin t = (TreeBin)f;
 95                             TreeNode lo = null, loTail = null;
 96                             TreeNode hi = null, hiTail = null;
 97                             int lc = 0, hc = 0;
 98                             for (Node e = t.first; e != null; e = e.next) {
 99                                 int h = e.hash;
100                                 TreeNode p = new TreeNode
101                                     (h, e.key, e.val, null, null);
102                                 if ((h & n) == 0) {
103                                     if ((p.prev = loTail) == null)
104                                         lo = p;
105                                     else
106                                         loTail.next = p;
107                                     loTail = p;
108                                     ++lc;
109                                 }
110                                 else {
111                                     if ((p.prev = hiTail) == null)
112                                         hi = p;
113                                     else
114                                         hiTail.next = p;
115                                     hiTail = p;
116                                     ++hc;
117                                 }
118                             }
119                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
120                                 (hc != 0) ? new TreeBin(lo) : t;
121                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
122                                 (lc != 0) ? new TreeBin(hi) : t;
123                             setTabAt(nextTab, i, ln);
124                             setTabAt(nextTab, i + n, hn);
125                             setTabAt(tab, i, fwd);
126                             advance = true;
127                         }
128                     }
129                 }
130             }
131         }
132     }

总体逻辑:

1.如果nextTable为null则初始化一个。(容量为以前的2倍)

2.然后将table中的节点按MIN_TRANSFER_STRIDE分为多个区间

3.对每个区间的节点数据进行转移,转以后在table中将节点(即使该节点为null)置为ForwardingNode,标记为已转移。

4.最后table = nextTable。

需要注意的细节:

问:为什么要分为多个区间?

答:可以允许多个线程同时进行扩容不同的区间而不受其他线程影响。关键代码位置:第27行到第37行。

 第63行条件判断开始处理链表节点的转移:

runBit记录节点的hash属性与原数组长度n的“&”运算结果,lastRun记录的是链表中最后面位置中节点的runBit值相同的子链表的头结点。

因为n是原数组的长度,而数组长度必须是2的整数次幂(n == 2的x次方),所以n的二进制表示中,只有第x+1位是1,其他全为0。所以如果hash&n == 0,则表示hash中第x+1位为0,否则为1.

我们现在假设要转移的节点(node)所在数组中的位置为i,则i == node.hash&(n-1),如果node.hash&n == 0的话,那么node.hash&(2n-1) == node.hash&(n-1) == i。即扩容后node的位置不变。

如果node.hash&n > 0的话,则node.hash&(2n-1) == i + n。即扩容后node的位置向后移动n位。

上面的第64行到第87行是将当前节点分为2个子链表,分别是ln为hash&n == 0的子链表 和hn为hash&n > 0的子链表。根据我们之前讨论的结果ln应该还在当前的位置i,而hn则应该向后移动n位,即在i+n位置。

然后将旧数组的当前节点i位置用一个标志节点来标记此位置已经转移了。

转移树形结构的节点与链表类似,因为TreeNode是Node的子类,其本身也是一个链表,可以通过next属性遍历整个树。

再看下sizeCtl这个实例变量:

 1 private transient volatile int sizeCtl; 

当它为负值时,表示正在初始化或者扩容(-1表示初始化);

否则,当table为null时,它为默认初始值0,或者保存着table的初始化值;

当table初始化结束时,它保存着下次需要扩容的阈值。

 六、计数方法

 1 private final void addCount(long x, int check) {
 2         CounterCell[] as; long b, s;
 3         if ((as = counterCells) != null ||
 4             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 5             CounterCell a; long v; int m;
 6             boolean uncontended = true;
 7             if (as == null || (m = as.length - 1) < 0 ||
 8                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
 9                 !(uncontended =
10                   U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
11                 fullAddCount(x, uncontended);
12                 return;
13             }
14             if (check <= 1)
15                 return;
16             s = sumCount();
17         }
18         if (check >= 0) {
19             Node[] tab, nt; int n, sc;
20             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
21                    (n = tab.length) < MAXIMUM_CAPACITY) {
22                 int rs = resizeStamp(n);
23                 if (sc < 0) {
24                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
25                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
26                         transferIndex <= 0)
27                         break;
28                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
29                         transfer(tab, nt);
30                 }
31                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
32                                              (rs << RESIZE_STAMP_SHIFT) + 2))
33                     transfer(tab, null);
34                 s = sumCount();
35             }
36         }
37     

该方法涉及的两个实例变量baseCount和counterCells是用来保存当前table中有多少数据的。

第17行之前是计算数据个数的,之后是检查是否需要扩容或者如果正在扩容则参与到扩容中。然后重新检查看看是否需要继续扩容。

其中的第24行判断,是错误的,在好几个方法中用到此种判断的都是同样的错误,请注意(这是jdk的一个BUG,在现在的已发布的JDK8-JDK11版本中都未解决,估计以后在JDK12发布版本中会fix掉,现在的openJDK12中已经fix了,参考https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427),其中的sc==rs + 1 || sc == rs + MAX_RESIZES应该改为sc == (rs << RESIZE_STAMP_SHIFT)+ 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZES,

用来判断扩容是否结束和扩容线程是否达到最大值。可以看下第31行,表示初始化扩容,所以开始扩容时, sizeCtl == (rs << RESIZE_STAMP_SHIFT) + 2,因为每个扩容线程在进行帮助扩容时都会使用CAS将sizeCtl+1,然后进入transfer,接着在transfer方法中,每当一个扩容线程结束时都会将sizeCtl - 1,所以当所有扩容线程结束时sizeCtl ==  (rs << RESIZE_STAMP_SHIFT)+ 1,因为第一个进入扩容的线程没有将sizeCtl+1.

七、最后看下size计算table中数据的个数的方法

 1 public int size() {
 2         long n = sumCount();
 3         return ((n < 0L) ? 0 :
 4                 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
 5                 (int)n);
 6     }
 7 
 8 
 9 
10 final long sumCount() {
11         CounterCell[] as = counterCells; CounterCell a;
12         long sum = baseCount;
13         if (as != null) {
14             for (int i = 0; i < as.length; ++i) {
15                 if ((a = as[i]) != null)
16                     sum += a.value;
17             }
18         }
19         return sum;
20     }

因为size中调用sumCount方法来间接计算个数,所以直接看sumCount方法。

它是将baseCount和counterCells中每个节点的value属性值累加在一起得到最后的数量。