ConcurrentHashMap 源码分析


每当你想要努力一把的时候,都是未来的你在求救!!!

1. 概述

1.1 HashMapHashTable

HashMap 线程不安全

因为多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap,如以下代码:

public class TestHashMap {
    public static void main(String[] args) throws InterruptedException {
        final HashMap map = new HashMap(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");
                        }
                    }, "thread" + i).start();
                }
            }
        }, "thread");
        t.start();
        t.join();
    }
}

HashTable 效率低下

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。

1.2 锁分段技术

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

1.3 ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

2. 原理

建议看ConcurrentHashMap 原理和源码之前,先去一下HashMap 的原理和源码,这样会更轻松一点哦!

注意了,注意了,看了很久才发现,前面介绍ConcurrentHashMap的结构时存在一些问题:

  • JDK1.7中,ConcurrentHashMap的结构是由Segment数组结构和HashEntry数组结构组成的

  • JDK1.8中已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用SynchronizedCAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本

    JDK1.8ConcurrentHashMap 的结构和 HashMap 基本上一模一样,不知道的小伙伴去看看我之前写的HashMap 源码解析吧!

我这里只看JDK1.8哦,至于和JDK1.7的区别,了解一下就可以吧~

3. 源码分析

3.1 构造方法分析

3.1.1 构造方法源码

HashMap一样,构造方法只会初始化一些重要的变量,比如负载因子和容量。底层的数据结构(如桶数组)是延迟在put 操作时进行初始化的。

// 构造方法 1
public ConcurrentHashMap() {
}

// 构造方法 2
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

// 构造方法 3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

// 构造方法 4
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

// 构造方法 5
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

构造方法 1

是我们用的最多的,它里面什么都没做,初始化的一些变量是在 initTable 方法中做的,后面会详细说。

构造方法 2

指定了初始化容量,这里调用的 tableSizeForHashMap 中的一模一样,就不说了哈

构造方法 3

指定了初始化容量和负载因子,最终调用的是 构造方法 4,传入了默认的 concurrencyLevel

构造方法 4

看起来是指定了初始容量、负载因子和并发级别。

这个构造方法在JDK1.8 中好像是为了兼容老版本的,因为在initTable 方法中已经写死了负载因子就是0.75,在JDK1.8 中也没有concurrencyLevel的概念了。

构造方法 5

将另一个 Map 中的映射拷贝一份到自己的存储结构中来,这个方法不是很常用。

3.1.2 重要参数源码
// node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联  
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树 
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))  
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;

// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// forwarding nodes的hash值 
static final int MOVED     = -1; // hash for forwarding nodes
// 树根节点的hash值
static final int TREEBIN   = -2; // hash for roots of trees
// ReservationNode的hash值
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();

//存放node的数组
transient volatile Node[] table;

/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义  
  *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在进行扩容  
  *当为0时:代表当时的table还没有被初始化  
  *当为正数时:表示初始化或者下一次进行扩容的大小  
*/
private transient volatile int sizeCtl;
3.1.3 数据结构

Node

Node 是 ConcurrentHashMap 存储结构的基本单元,用于存储数据

static class Node implements Map.Entry {
    final int hash;
    final K key;
    volatile V val;
    volatile Node next;
    // ...
}

TreeNode

TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树。

static final class TreeNode extends Node {
    TreeNode parent;  // red-black tree links
    TreeNode left;
    TreeNode right;
    TreeNode prev;    // needed to unlink next upon deletion
    boolean red;
 	// ...   
}

3.2 插入(put 方法)

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // ConcurrentHashMap的键和值都不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 获取 key 的Hash值,这里spread方法后位移,再进行与操作。可以去看一下HashMap中怎么做的,结合理解
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node[] tab = table;;) {
        Node f; int n, i, fh;
        // 第1个分支:ConcurrentHashMap 是在第一次put时进行初始化操作的
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 第2个分支:通过 tabAt 方法查询当前数组下标位置是否有值,如果没有值,就使用CAS将元素放进去 
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 第3个分支:如果在进行扩容,则先进行扩容操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 第4个分支:就是把元素放入槽内
        else {
            V oldVal = null;
             //如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
            synchronized (f) {
                // 这里是看table数组下标为 i 的位置是否和 f 相同(好好看,上面已经给f赋值了)
                // 如果相同,说明这个下表位置已经有数据了,要么是链表要么是红黑树
                // 而且注意看tabAt方法,它是调用的sun.misc.Unsafe直接从内存中拿出来的数据,这样可以保证数据的内存可见性
                if (tabAt(tab, i) == f) {
                    // 上面给 fh 赋值的是 f 的hash值,这里不懂的话好好看一下上面fh、f、和当前节点的关系
                    // fh>=0,表示当前table下标为 i 的位置是个链表
                    if (fh >= 0) {
                        // binCount 用于记录链表长度,后面判断转化为红黑树时会用到
                        binCount = 1;
                        for (Node e = f;; ++binCount) {
                            K ek;
                            //这里涉及到相同的key进行put就会覆盖原先的value 
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node pred = e;
                            // 将新put的节点添加到链表尾部
                            if ((e = e.next) == null) {
                                pred.next = new Node(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果f是红黑树结构 ,就需要将节点 put 进红黑树了 
                    else if (f instanceof TreeBin) {
                        Node p;
                        binCount = 2;
                        if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // TREEIFY_THRESHOLD 的默认值是 8,此时将链表转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 总元素个数累加1,统计size,并且检查是否需要扩容 
    addCount(1L, binCount);
    return null;
}

上面 putVal 方法的 for 循环中,有四个分支:

  • 第1个分支,是整个数组的初始化
  • 第2个分支,是所在的槽为空,说明该元素是该槽的第一个元素,直接新建一个头节点,然后返回
  • 第3个分支,说明该槽正在进行扩容,帮助其扩容
  • 第4个分支,就是把元素放入槽内。槽内可能是一个链表,也可能是一棵红黑树,通过头节点的类型可以判断是哪一种。第4个分支是包裹在 synchronized(f) 里面的,f对应的数组下标位置的头节点,意味着每个数组元素有一把锁,并发度等于数组的长度

上面的binCount表示链表的元素个数,当这个数目超过TREEIFY_THRESHOLD=8时,把链表转换成红黑树,也就是 treeifyBin(tab,i)方法。但在这个方法内部,不一定需要进行红黑树转换,可能只做扩容操作,后面讲扩容方法时就知道了。

第1个分支,是整个数组的初始化

上面可以看到,ConcurrentHashMap 是在第一次put时进行初始化操作的,调用了 initTable() 方法,下面看一下这个方法:

private final Node[] initTable() {
    Node[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 就是当 table 还是空的时候,就开始进行初始化
        if ((sc = sizeCtl) < 0)
            // 这里如果忘记了 sizeCtl 的意义,就回到前面去看一下吧
            // 当sizeCtl为负数时:-1代表正在初始化,-N代表有N-1个线程正在进行扩容
            // 所以此时只让一个线程初始化,其他线程来了就自旋等待
            Thread.yield(); // lost initialization race; just spin
        // 这里就使用CAS将SIZECTL赋为 -1,代表正在初始化,这样其他线程才会自旋等待
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 上面判断时,已经将sizeCtl的值赋给 sc了
                    // 所以当sc为正数时:表示初始化或者下一次进行扩容的大小
                    // 这里计算了初始化ConcurrentHashMap 的数组大小
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 初始化
                    @SuppressWarnings("unchecked")
                    Node[] nt = (Node[])new Node<?,?>[n];
                    table = tab = nt;
                    // sizeCtl不是数组长度,因此初始化成功后,就不再等于数组长度
                    // 而是n-(n>>>2)=0.75n,表示下一次扩容的阈值:n-n/4
                    sc = n - (n >>> 2);
                }
            } finally {
                // 最后再将计算好的 sc 赋给 sizeCtl,以便于扩容时使用
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

第3个分支:如果在进行扩容,则先进行扩容操作

在第三个分支调用了 helpTransfer 方法,这个方法是帮助从旧的 table 的元素复制到新的 table 中 。

其实 helpTransfer 方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作

final Node[] helpTransfer(Node[] tab, Node f) {
    Node[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode)f).nextTable) != null) {
        // 新的table nextTba已经存在前提下才能帮助扩容  
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 调用扩容方法  
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

这里面调用了扩容方法 transfer,后面说扩容的时候再具体看吧。

put 方法的原理大致就是这样了,后面再看一下扩容的方法,这个就比较复杂了!

3.3 扩容

回到上面的 put 方法中,最后需要转化成红黑树时调用了 treeifyBin(tab, i) 方法,咱们就从这个方法开始看起:

private final void treeifyBin(Node[] tab, int index) {
    Node b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 数组长度小于阈值64,不做红黑树转换,直接扩容
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 链表转换为红黑树
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode hd = null, tl = null;
                    // 遍历链表,初始化红黑树
                    for (Node e = b; e != null; e = e.next) {
                        TreeNode p =
                            new TreeNode(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //通过TreeBin对象对TreeNode转换成红黑树
                    setTabAt(tab, index, new TreeBin(hd));
                }
            }
        }
    }
}

在上面的代码中,MIN_TREEIFY_CAPACITY=64,意味着当数组的长度没有超过64的时候,数组的每个节点里都是链表,只会扩容,不会转换成红黑树。只有当数组长度大于或等于64时,才考虑把链表转换成红黑树。

上面调用了 tryPresize(n << 1) 方法,可以看到,扩容后的容量直接是当前容量的 2 倍(为什么是2倍,这好像在前面将 HashMap 的时候讲过)。下面就看一下 tryPresize 方法:

private final void tryPresize(int size) {
    // 这里先计算扩容后的值,注意tableSizeFor和HashMap中的 tableSizeFor 一样
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    	tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // 还记得吧:当sizeCtl为正数时,表示初始化或者下一次进行扩容的大小  
    while ((sc = sizeCtl) >= 0) {
        Node[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {
            // 这个分支就是初始化ConcurrentHashMap,和前面的 initTable 方法差不多
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node[] nt = (Node[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            // 如果当前数组长度已经达到最大(MAXIMUM_CAPACITY),或者将要扩容的容量小于等于sizeCtl
            break;
        else if (tab == table) {
            // 扩容操作
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node[] nt; // 新的数组
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 扩容
                transfer(tab, null);
        }
    }
}

下面就看一下扩容的核心方法 transfer

private final void transfer(Node[] tab, Node[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        // 计算步长
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating  // 初始化新的HashMap
        try {
            @SuppressWarnings("unchecked")
            // 扩容两倍
            Node[] nt = (Node[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 初始的transferIndex为旧HashMap的数组长度
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode fwd = new ForwardingNode(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    /*
    此处,i为遍历下标,bound为边界。
    如果成功获取一个任务,则i=nextIndex-1, bound=nextIndex-stride;
    如果获取不到,则i=0,bound=0
    */
    for (int i = 0, bound = 0;;) {
        Node f; int fh;
        // advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
        while (advance) {
            int nextIndex, nextBound;
            /*
            以下是哪个分支中的advance都是false,表示如果三个分支都不执行,才可以一直while循环
            目的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取一个stride的迁移任务。
            */
            if (--i >= bound || finishing)
                // 对数组遍历,通过这里的--i进行。如果成功执行了--i,就不需要继续while循环了,因为advance只能进一步。
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                // transferIndex <= 0,整个HashMap完成
                i = -1;
                advance = false;
            }
            /*
            对transferIndex执行CAS操作,即为当前线程分配1个stride。
            CAS操作成功,线程成功获取到一个stride的迁移任务;
            CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋。
            */
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // i越界,整个HashMap遍历完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // finishing表示整个HashMap扩容完成
            if (finishing) {
                nextTable = null;
                // 将nextTab赋值给当前table
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // tab[i]迁移完毕,赋值一个ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // tab[i]的位置已经在迁移过程中
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 对tab[i]进行迁移操作,tab[i]可能是一个链表或者红黑树
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node ln, hn;
                    // 链表
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node lastRun = f;
                        for (Node p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                // 表示lastRun之后的所有元素,hash值都是一样的
                                // 记录下这个最后的位置
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            // 链表迁移的优化做法
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node(ph, pk, pv, ln);
                            else
                                hn = new Node(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 红黑树,迁移做法和链表类似
                    else if (f instanceof TreeBin) {
                        TreeBin t = (TreeBin)f;
                        TreeNode lo = null, loTail = null;
                        TreeNode hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode p = new TreeNode
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

该方法非常复杂,下面一步步分析:

  1. 扩容的基本原理如下图,首先建一个新的HashMap,其数组长度是旧数组长度的2倍,然后把旧的元素逐个迁移过来。所以,上面的方法参数有2个,第1个参数tab是扩容之前的HashMap,第2个参数nextTab是扩容之后的HashMap。当nextTab=null的时候,方法最初会对nextTab进行初始化。

    这里有一个关键点要说明:该方法会被多个线程调用,所以每个线程只是扩容旧的HashMap部分,这就涉及如何划分任务的问题。

  2. 上图为多个线程并行扩容-任务划分示意图。旧数组的长度是N,每个线程扩容一段,一段的长度用变量stride(步长)来表示,transferIndex 表示了整个数组扩容的进度。

    stride 的计算公式如上面的代码所示,即:在单核模式下直接等于n,因为在单核模式下没有办法多个线程并行扩容,只需要1个线程来扩容整个数组;在多核模式下为 (n>>>3)/NCPU,并且保证步长的最小值是 16。显然,需要的线程个数约为 n/stride

    transferIndexConcurrentHashMap 的一个成员变量,记录了扩容的进度。初始值为 n,从大到小扩容,每次减stride个位置,最终减至n<=0,表示整个扩容完成。因此,从[0,transferIndex-1]的位置表示还没有分配到线程扩容的部分,从[transfexIndex,n-1]的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经扩容成功。

    因为 transferIndex 会被多个线程并发修改,每次减stride,所以需要通过CAS进行操作,如下面的代码所示。

  3. 在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里面,有的还在旧的HashMap 里面。这个时候,所有调用 get(k,v)的线程还是会访问旧 HashMap,怎么处理呢?

    下图为扩容过程中的转发示意图:当 Node[0] 已经迁移成功,而其他 Node 还在迁移过程中时,如果有线程要读取 Node[0] 的数据,就会访问失败。为此,新建一个 ForwardingNode,即转发节点,在这个节点里面记录的是新的 ConcurrentHashMap 的引用。这样,当线程访问到 ForwardingNode 之后,会去查询新的ConcurrentHashMap

  4. 因为数组的长度 tab.length 是2的整数次方,每次扩容又是2倍。而 Hash 函数是hashCode%tab.length,等价于 hashCode&(tab.length-1)。这意味着:处于第i个位置的元素,在新的Hash表的数组中一定处于第i个或者第i+n个位置,如下图所示。举个简单的例子:假设数组长度是8,扩容之后是16:

    若hashCode=5,5%8=0,扩容后,5%16=0,位置保持不变;

    若hashCode=24,24%8=0,扩容后,24%16=8,后移8个位置;

    若hashCode=25,25%8=1,扩容后,25%16=9,后移8个位置;

    若hashCode=39,39%8=7,扩容后,39%8=7,位置保持不变;

    ......

    正因为有这样的规律,所以如下有代码:

    也就是把 tab[i] 位置的链表或红黑树重新组装成两部分,一部分链接到 nextTab[i] 的位置,一部分链接到 nextTab[i+n] 的位置,如上图所示。然后把 tab[i] 的位置指向一个 ForwardingNode 节点。

    同时,当 tab[i] 后面是链表时,使用类似于 JDK 7 中在扩容时的优化方法,从 lastRun 往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从 lastRun 往前的所有节点,需要依次拷贝。

    了解了核心的迁移函数transfer(tab,nextTab),再回头看 tryPresize(int size)函数。这个函数的输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容。想要看明白这个函数,需要透彻地理解 sizeCtl 变量,下面这段注释摘自源码。

    sizeCtl=-1时,表示整个 HashMap 正在初始化;

    sizeCtl=某个其他负数时,表示多个线程在对HashMap做并发扩容;

    sizeCtl=cap时,tab=null,表示未初始之前的初始容量(如上面的构造函数所示);

    扩容成功之后,sizeCtl 存储的是下一次要扩容的阈值,即上面初始化代码中的 n-(n>>>2)=0.75n

    所以,sizeCtl 变量在Hash表处于不同状态时,表达不同的含义。明白了这个道理,再来看上面的tryPresize(int size)函数。

    tryPresize(int size)是根据期望的元素个数对整个Hash表进行扩容,核心是调用transfer函数。在第一次扩容的时候,sizeCtl 会被设置成一个很大的负数U.compareAndSwapInt(this,SIZECTL,sc,(rs << RESIZE_STAMP_SHIFT)+2);之后每一个线程扩容的时候,sizeCtl 就加 1,U.compareAndSwapInt(this,SIZECTL,sc,sc+1),待扩容完成之后,sizeCtl 减1。

这里看看 put 方法和扩容方法就行吧,其他方法就不记录了

相关