java-concurrent包


通常所说的concurrent包基本有3个package组成  

  • java.util.concurrent:提供大部分关于并发的接口和类,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等  
  • java.util.concurrent.atomic:提供所有原子操作的类, 如AtomicInteger, AtomicLong等;  
  • java.util.concurrent.locks:提供锁相关的类, 如Lock, ReentrantLock, ReadWriteLock, Condition等;  

1. java.util.concurrent并发的接口和类

1.1 ConcurrentHashMap

不管是HashMap还是HashTable要想线程安全都要用synchronized,而synchronized是针对整张Hash表的,即每次锁住整张表让线程独占,ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行

  • 删除操作

要将删除节点e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。这是由entry的定义决定的,entry除了value,其他所有属性都是用final来修饰的,这意味着在第一次设置了next域之后便不能再改变它,取而代之的是将它之前的节点全都克隆一次

1.2 ConcurrentLinkedQueue

使用CAS(Compare and Swap)非阻塞算法来取得最大的吞吐量

入队注解代码如下:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    //入队前,创建一个入队节点
    Node n = new Node(e);
    retry:
    //死循环,入队不成功反复入队。
    for (;;) {
        //创建一个指向tail节点的引用
        Node t = tail;
        //p用来表示队列的尾节点,默认情况下等于tail节点。
        Node p = t;
        for (int hops = 0; ; hops++) {
        //获得p节点的下一个节点。
            Node next = succ(p);
        //next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
            if (next != null) {
               //循环了两次及其以上,并且当前节点还是不等于尾节点
                if (hops > HOPS && t != tail)
                    continue retry; 
                p = next;
            } 
            //如果p是尾节点,则设置p节点的next节点为入队节点。
            else if (p.casNext(null, n)) {
              //如果tail节点有大于等于1个next节点,则将入队节点设置成tair节点,更新失败了也
没关系,因为失败了表示有其他线程成功更新了tair节点。
                if (hops >= HOPS)
                    casTail(t, n); // 更新tail节点,允许失败
                return true;  
            } 
           // p有next节点,表示p的next节点是尾节点,则重新设置p节点
            else {
                p = succ(p);
            }
        }
    }
}

出队注解代码如下:

public E poll() {
    Node h = head;
   // p表示头节点,需要出队的节点
    Node p = h;
    for (int hops = 0;; hops++) {
        // 获取p节点的元素
        E item = p.getItem();
        // 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,如果成功则返回p节点的元素。
        if (item != null && p.casItem(item, null)) {
            if (hops >= HOPS) {
                //将p节点下一个节点设置成head节点
                Node q = p.getNext();
                updateHead(h, (q != null) ? q : p);
            }
            return item;
        }
        // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。那么获取p节点的下一个节点 
        Node<> next = succ(p);
        // 如果p的下一个节点也为空,说明这个队列已经空了
        if (next == null) {
          // 更新头节点。
            updateHead(h, p);
            break;
        }
        // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
        p = next;
    }
    return null;
}

1.2. CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point),再全部同时执行。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier

常用方法

public int await() throws ReentrantLock实现原理深入探究

重点是AbstractQueuedSynchronizer抽象类的实现Sync,包括NonfairSync和FairSync,默认使用NonfairSync

lock时设置state和当前独占线程,之后的线程

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

阻塞在acquireQueued

PS:这里涉及了一点公平锁非公平锁的概念,

  竞争lock的线程会排队,新线程进来先判断是不是排在对头,如果是,才查看当前状态;否则,排队等待。体现了先到先服务的精神,而非公平锁,新来的线程先查看状态,满足条件直接取得锁

3.2 ReentrantReadWriteLock

实现了ReadWriteLock接口,最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁

如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁;如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁

写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性;读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁视图获取写入锁而都不释放读取锁时就会发生死锁。

3.3 Condition

无论synchronized关键字,或者是lock上的await/signal等,都只能在一个锁上做同步通知。 
假设有3个线程,要对一个资源做同步,一般只能有一个锁来做同步通知操作,那么通知的时候无法做到精确的通知3个线程中的某一个的,因为你调用了wait()/notify()的时候,具体的调度是jvm决定的,但是有的时候的确需要需要对一个锁做多种不同情况的精确通知, 比如一个缓存,满了和空了是两种不同的情况,可以分别通知取数据的线程和放数据的线程

基本使用如下:  
* Condition是个接口,基本的方法就是await()和signal()方法;  
* Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()  
* 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以  
* 和Object.wait()方法一样,每次调用Condition的await()方法的时候,当前线程就自动释放了对当前锁的拥有权  

示例

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();//第一步实现互斥
     try {
       while (count == items.length)//如果没有往数组放,线程阻塞
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;//如果putptr已经是数组的最后一个,那么putptr置为0,从第一个开始放
       ++count;//放完后,把总数加一
       notEmpty.signal();//通知其他线程可以取了
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
}

参考文章:

1. Java集合---ConcurrentHashMap原理分析

2. 探索 ConcurrentHashMap 高并发性的实现机制

3. 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析

4. Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

5. Java并发编程

6. FutureTask 源码解析

7. ReentrantLock实现原理深入探究

8. 关于Java并发编程的总结和思考

9. 深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理