ConcurrentHashMap
ConcurrentHashMap
HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。ConcurrentHashMap在这个 基本原理之上进行了各种优化
首先是所有数据都放在一个大的HashMap中;其次是引入了红黑树,原理如下:
- 如果头节点是Node类型,则尾随它的就是一个普通的链表;如果头节点是TreeNode类型,它的后 面就是一颗红黑树,TreeNode是Node的子类
- 链表和红黑树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转 换成红黑树;反之,当红黑树中的元素个数小于某个阈值时,再转换为链表。
- 这种设计优化:
- 使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突的问 题由此得到较好的解决。
- 加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是 Node数组的长度,初始长度为16
- 并发扩容,这是难度最大的。当一个线程要扩容Node数组的时候,其他线程还要读写,因此 处理过程很复杂,后面会详细分析
- 所以这种设计一方面降低了Hash冲突,另一方面也提升了并发度
源码解析
1、构造方法
public ConcurrentHashMap(int initialCapacity,//初始容量
float loadFactor, //加载因子
int concurrencyLevel) {//级别
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
- 参数:初始容量,加载因子,并发级别
- 如果初始化大小小于并发级别,则让他们相等
- 判断size大小
- cap代表数字长度的值
- 变量cap就是Node数组的长度,保持为2的整数次方。tableSizeFor(...)方法是根 据传入的初始容量,计算出一个合适的数组长度。具体而言:1.5倍的初始容量+1,再往上取最接近的2 的整数次方,作为数组长度cap的初始值
- 这里的 sizeCtl,其含义是用于控制在初始化或者并发扩容时候的线程数,只不过其初始值设置成 cap。
2、初始化
? 构造方法里只计算了数组的初始大小,并没有对数组进行初始化。当多个线程都往里面放 入元素的时候,再进行初始化。这就存在一个问题:多个线程重复初始化
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 自旋等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 重点:将sizeCtl设置为-1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node<?,?>[n];// 初始化
table = tab = nt;
// sizeCtl不是数组长度,因此初始化成功后,就不再等于数组长度,而是n-(n>>>2)=0.75n,表示下一次扩容的阈值:n-n/4
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;// 设置sizeCtl的值为sc。
}
break;
}
}
return tab;
}
- 首先需要初始化一个table,当sizeCtl等于0时,自旋等待
- 若条件发生变化,则将sizeCtl设置为-1(并发,那个线程获取-1,就可以进行下面的操作)
- 初始化创建数组,长度为n
- sizeCtl不是数组长度,因此初始化成功后,就不再等于数组长度,而是n-(n>>>2)=0.75n,表示下一次扩容的阈值:n-n/4
- 设置sizeCtl的值为sc,返回Node数组
通过上面的代码可以看到,多个线程的竞争是通过对sizeCtl进行CAS操作实现的。如果某个线程成 功地把 sizeCtl 设置为-1,它就拥有了初始化的权利,进入初始化的代码模块,等到初始化完成,再把 sizeCtl设置回去;其他线程则一直执行while循环,自旋等待,直到数组不为null,即当初始化结束时, 退出整个方法
3、put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
// 分支1:整个数组初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 分支2:第i个元素初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 分支3:扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 分支4:放入元素
else {
V oldVal = null;
// 加锁
synchronized (f) {
// 链表
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果是链表,上面的binCount会一直累加
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);// 超出阈值,转换为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);// 总元素个数累加1
return null;
}
- onlyIfAbsent:表示不存在的时候才向数组中放
- 若等于分支1.则调用初始化数组方法initTable()
- 若为分支2,第i个元素初始化(表示这个槽点没有数据,所以直接将该元素放到头节点返回)
- 若为分支3,说明这个槽点正在扩容,helpTransfer方法帮助扩容
- 最后到分支4,就是把元素放入槽内。槽内可能是一个链表,也可能是一棵红黑树,通过头节点的类型 可以判断是哪一种。第4个分支是包裹在synchronized (f)里面的,f对应的数组下标位置的头节点, 意味着每个数组元素有一把锁,并发度等于数组的长度
- 上面的binCount表示链表的元素个数,当这个数目超过TREEIFY_THRESHOLD=8时,把链表转换成 红黑树,也就是 treeifyBin(tab,i)方法。但在这个方法内部,不一定需要进行红黑树转换,可能只做 扩容操作。
4、扩容
扩容是ConcurrentHashMap中实现最复杂的一环
从上面的put方法源码中,可知当binCount大于等于阈值8,变成红黑树,所以扩容从treeifyBin方法读起
private final void treeifyBin(Node[] tab, int index) {
Node b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//数组长度小于阈值64,不做红黑树转换,直接扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 链表转换为红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode hd = null, tl = null;
// 遍历链表,初始化红黑树
for (Node e = b; e != null; e = e.next) {
TreeNode p =
new TreeNode(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin(hd));
}
}
}
}
}
- 判断tab不等于null,说明槽点数据已经有了
- 数组长度小于阈值64,不做红黑树转换,直接扩容
- MIN_TREEIFY_CAPACITY=64(数组的长度):存储箱可树化的最小表容量,这个值至少是4被阈值(TREEIFY_THRESHOLD=8)大小,以避免调整大小和树化阈值之间的冲突
- 若数组长度大于阈值64时,则转换为红黑树
在treeifyBin方法中,内部调用了方法tryPresize方法
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 计算步长
if (nextTab == null) { // 初始化新的HashMap
try {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node<?,?>[n << 1]; //扩容两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//初始的transferIndex为旧HashMap的数组长度
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode fwd = new ForwardingNode(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 此处,i为遍历下标,bound为边界
// 如果成功获取一个任务,则i=nextIndex-1
//bound=nextIndex-stride;
//如果获取不到,则i=0,bound=0
for (int i = 0, bound = 0;;) {
Node f; int fh;
// advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
while (advance) {
int nextIndex, nextBound;
// 以下是哪个分支中的advance都是false,表示如果三个分支都不执行,才可以一直while循环
// 目的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取一个stride的迁移任务。
if (--i >= bound || finishing)
// 对数组遍历,通过这里的--i进行。如果成功执行了--i,就不需要继续while循环了,因为advance只能进一步。
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 对transferIndex执行CAS操作,即为当前线程分配1个stride。
// CAS操作成功,线程成功获取到一个stride的迁移任务;
// CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋。
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i越界,整个HashMap遍历完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// finishing表示整个HashMap扩容完成
if (finishing) {
nextTable = null;
// 将nextTab赋值给当前table
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// tab[i]迁移完毕,赋值一个ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// tab[i]的位置已经在迁移过程中
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对tab[i]进行迁移操作,tab[i]可能是一个链表或者红黑树
synchronized (f) {
if (tabAt(tab, i) == f) {
Node ln, hn;
// 链表
if (fh >= 0) {
int runBit = fh & n;
Node lastRun = f;
for (Node p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
// 表示lastRun之后的所有元素,hash值都是一样的
// 记录下这个最后的位置
lastRun = p;
}
}
if (runBit == 0) {// 链表迁移的优化做法
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {// 红黑树,迁移做法和链表类似
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
-
可以看到,扩容首先是先尝试扩容,调用第一个方法tryPresize,也就是尝试预先调整表的大小以容纳给定数量的元素
-
具体扩容的方法在末尾调用的第二个方法transfer
-
扩容的基本原理:
-
首先建立一个新的HashMap,其数组长度是原来数组的俩倍
-
将old的元素逐个迁移过来,这点从参数中可以看出,tab是扩容前的HashMap,nextTab是扩容后的hashMap,当nextTab==null时,就会对nextTab进行初始化,容量为2倍
-
上述步骤就会遇到一个问题,并发问题,如下图:
-
旧的的数组长度为N,每个线程扩容一段,一段的长度用量用变量stride(步长)来表示,transferIndex表示整个数组扩容的进度
-
stride(步长)的计算公式:在单核模式下直接等于0,因为单核模式下没有办 法多个线程并行扩容,只需要1个线程来扩容整个数组;在多核模式下为 (n>>> 3)/NCPU,并且保证步长的最小值是 16。显然,需要的线程个数约为n/stride
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
-
transferIndex是ConcurrentHashMap的一个成员变量,记录了扩容的进度。初始值为n,从大到 小扩容,每次减stride个位置,最终减至n<=0,表示整个扩容完成。因此,从[0,transferIndex-1]的 位置表示还没有分配到线程扩容的部分,从[transfexIndex,n-1]的位置表示已经分配给某个线程进行扩 容,当前正在扩容中,或者已经扩容成功。
-
因为transferIndex会被多个线程并发修改,每次减stride,所以需要通过CAS进行操作,如下面的代码 所示
else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0)))
-
在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里面,有的还在旧的 HashMap 里面。这个时候,所有调用 get(k,v)的线程还是会访问旧 HashMap,怎么处理 呢?
-
解决方案:当Node[0]已经迁移成功,而其他Node还在迁移过程中时, 如果有线程要读取Node[0]的数据,就会访问失败。为此,新建一个ForwardingNode,即转 发节点,在这个节点里面记录的是新的 ConcurrentHashMap 的引用。这样,当线程访问到 ForwardingNode之后,会去查询新的ConcurrentHashMap
-
因为数组的长度 tab.length 是2的整数次方,每次扩容又是2倍。而 Hash 函数是 hashCode%tab.length,等价于hashCode&(tab.length-1)。这意味着:处于第i个位置的 元素,在新的Hash表的数组中一定处于第i个或者第i+n个位置,举个简单的例 子:假设数组长度是8,扩容之后是16
-
若hashCode=5,5&8=0,扩容后,5&16=0,位置保持不变;
-
若hashCode=24,24&8=0,扩容后,24&16=8,后移8个位置;
-
若hashCode=25,25&8=1,扩容后,25&16=9,后移8个位置;
-
若hashCode=39,39&8=7,扩容后,39&8=7,位置保持不变;
-
正因为有这样的规律,所以如下有代码:
setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd);
-
也就是把tab[i]位置的链表或红黑树重新组装成两部分,一部分链接到nextTab[i]的位置,一部分链 接到nextTab[i+n]的位置。然后把tab[i]的位置指向一个ForwardingNode节点
-
同时,当tab[i]后面是链表时,使用类似于JDK 7中在扩容时的优化方法,从lastRun往后的所有节 点,不需依次拷贝,而是直接链接到新的链表头部。从lastRun往前的所有节点,需要依次拷贝
-
了解了核心的迁移函数transfer(tab,nextTab),再回头看tryPresize(int size)函数。这个函 数的输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容。想要看明白这个 函数,需要透彻地理解sizeCtl变量,下面这段注释摘自源码
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */
- 当sizeCtl=-1时,表示整个HashMap正在初始化;
- 当sizeCtl=某个其他负数时,表示多个线程在对HashMap做并发扩容;
- 当sizeCtl=cap时,tab=null,表示未初始之前的初始容量(如上面的构造函数所示);
- 扩容成功之后,sizeCtl存储的是下一次要扩容的阈值,即上面初始化代码中的n-(n>>>2) =0.75n。
- 所以,sizeCtl变量在Hash表处于不同状态时,表达不同的含义。明白了这个道理,再来看上面的 tryPresize(int size)函数。
-
tryPresize(int size)是根据期望的元素个数对整个Hash表进行扩容,核心是调用transfer函数。 在第一次扩容的时候,sizeCtl会被设置成一个很大的负数U.compareAndSwapInt(this,SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT)+2);之后每一个线程扩容的时候,sizeCtl 就加 1, U.compareAndSwapInt(this,SIZECTL,sc,sc+1),待扩容完成之后,sizeCtl减1。
-
-
-
-