非阻塞同步算法与CAS(Compare and Swap)无锁算法


锁(lock)的代价

锁是用来做并发最简单的方式,当然其代价也是最高的。内核态的锁的时候需要操作系统进行一次上下文切换,加锁、释放锁会导致比较多的上下文切换和调度延时,等待锁的线程会被挂起直至锁释放。在上下文切换的时候,cpu之前缓存的指令和数据都将失效,对性能有很大的损失。操作系统对多线程的锁进行判断就像两姐妹在为一个玩具在争吵,然后操作系统就是能决定他们谁能拿到玩具的父母,这是很慢的。用户态的锁虽然避免了这些问题,但是其实它们只是在没有真实的竞争时才有效。

Java在JDK1.5之前都是靠synchronized关键字保证同步的,这种通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有守护变量的锁,都采用独占的方式来访问这些变量,如果出现多个线程同时访问锁,那第一些线线程将被挂起,当线程恢复执行时,必须等待其它线程执行完他们的时间片以后才能被调度执行,在挂起和恢复执行过程中存在着很大的开销。锁还存在着其它一些缺点,当一个线程正在等待锁时,它不能做任何事。如果一个线程在持有锁的情况下被延迟执行,那么所有需要这个锁的线程都无法执行下去。如果被阻塞的线程优先级高,而持有锁的线程优先级低,将会导致优先级反转(Priority Inversion)。

乐观锁与悲观锁

独占锁是一种悲观锁,synchronized就是一种独占锁,它假设最坏的情况,并且只有在确保其它线程不会造成干扰的情况下执行,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。

volatile的问题

与锁相比,volatile变量是一和更轻量级的同步机制,因为在使用这些变量时不会发生上下文切换和线程调度等操作,但是volatile变量也存在一些局限:不能用于构建原子的复合操作,因此当一个变量依赖旧值时就不能使用volatile变量。(参考:谈谈volatiile)

volatile只能保证变量对各个线程的可见性,但不能保证原子性。为什么?见我的另外一篇文章:《为什么volatile不能保证原子性而Atomic可以?》

Java中的原子操作( atomic operations)

原子操作指的是在一步之内就完成而且不能被中断。原子操作在多线程环境中是线程安全的,无需考虑同步的问题。在java中,下列操作是原子操作:

  • all assignments of primitive types except for long and double
  • all assignments of references
  • all operations of java.concurrent.Atomic* classes
  • all assignments to volatile longs and doubles

问题来了,为什么long型赋值不是原子操作呢?例如:

long foo = 65465498L;

实时上java会分两步写入这个long变量,先写32位,再写后32位。这样就线程不安全了。如果改成下面的就线程安全了:

private volatile long foo;

因为volatile内部已经做了synchronized.

CAS无锁算法

要实现无锁(lock-free)的非阻塞算法有多种实现方法,其中CAS(比较与交换,Compare and swap)是一种有名的无锁算法。CAS, CPU指令,在大多数处理器架构,包括IA32、Space中采用的都是CAS指令,CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。CAS无锁算法的C实现如下:

int compare_and_swap (int* reg, int oldval, int newval) 
{
  ATOMIC();
  int old_reg_val = *reg;
  if (old_reg_val == oldval) 
     *reg = newval;
  END_ATOMIC();
  return old_reg_val;
}

CAS(乐观锁算法)的基本假设前提

CAS比较与交换的伪代码可以表示为:

do{  
       备份旧数据; 
       基于旧数据构造新数据; 
}while(!CAS( 内存地址,备份的旧数据,新数据 ))  

 

(上图的解释:CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很明显,有其它操作先改变了这个值。)

就是指当两者进行比较时,如果相等,则证明共享数据没有被修改,替换成新值,然后继续往下运行;如果不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作。容易看出 CAS 操作是基于共享数据不会被修改的假设,采用了类似于数据库的 commit-retry 的模式。当同步冲突出现的机会很少时,这种假设能带来较大的性能提升。

CAS的开销(CPU Cache Miss problem)

前面说过了,CAS(比较并交换)是CPU指令级的操作,只有一步原子操作,所以非常快。而且CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了。但CAS就没有开销了吗?不!有cache miss的情况。这个问题比较复杂,首先需要了解CPU的硬件体系结构:

上图可以看到一个8核CPU计算机系统,每个CPU有cache(CPU内部的高速缓存,寄存器),管芯内还带有一个互联模块,使管芯内的两个核可以互相通信。在图中央的系统互联模块可以让四个管芯相互通信,并且将管芯与主存连接起来。数据以“缓存线”为单位在系统中传输,“缓存线”对应于内存中一个 2 的幂大小的字节块,大小通常为 32 到 256 字节之间。当 CPU 从内存中读取一个变量到它的寄存器中时,必须首先将包含了该变量的缓存线读取到 CPU 高速缓存。同样地,CPU 将寄存器中的一个值存储到内存时,不仅必须将包含了该值的缓存线读到 CPU 高速缓存,还必须确保没有其他 CPU 拥有该缓存线的拷贝。

比如,如果 CPU0 在对一个变量执行“比较并交换”(CAS)操作,而该变量所在的缓存线在 CPU7 的高速缓存中,就会发生以下经过简化的事件序列:

  • CPU0 检查本地高速缓存,没有找到缓存线。
  • 请求被转发到 CPU0 和 CPU1 的互联模块,检查 CPU1 的本地高速缓存,没有找到缓存线。
  • 请求被转发到系统互联模块,检查其他三个管芯,得知缓存线被 CPU6和 CPU7 所在的管芯持有。
  • 请求被转发到 CPU6 和 CPU7 的互联模块,检查这两个 CPU 的高速缓存,在 CPU7 的高速缓存中找到缓存线。
  • CPU7 将缓存线发送给所属的互联模块,并且刷新自己高速缓存中的缓存线。
  • CPU6 和 CPU7 的互联模块将缓存线发送给系统互联模块。
  • 系统互联模块将缓存线发送给 CPU0 和 CPU1 的互联模块。
  • CPU0 和 CPU1 的互联模块将缓存线发送给 CPU0 的高速缓存。
  • CPU0 现在可以对高速缓存中的变量执行 CAS 操作了

以上是刷新不同CPU缓存的开销。最好情况下的 CAS 操作消耗大概 40 纳秒,超过 60 个时钟周期。这里的“最好情况”是指对某一个变量执行 CAS 操作的 CPU 正好是最后一个操作该变量的CPU,所以对应的缓存线已经在 CPU 的高速缓存中了,类似地,最好情况下的锁操作(一个“round trip 对”包括获取锁和随后的释放锁)消耗超过 60 纳秒,超过 100 个时钟周期。这里的“最好情况”意味着用于表示锁的数据结构已经在获取和释放锁的 CPU 所属的高速缓存中了。锁操作比 CAS 操作更加耗时,是因深入理解并行编程
为锁操作的数据结构中需要两个原子操作。缓存未命中消耗大概 140 纳秒,超过 200 个时钟周期。需要在存储新值时查询变量的旧值的 CAS 操作,消耗大概 300 纳秒,超过 500 个时钟周期。想想这个,在执行一次 CAS 操作的时间里,CPU 可以执行 500 条普通指令。这表明了细粒度锁的局限性。

以下是cache miss cas 和lock的性能对比:

JVM对CAS的支持:AtomicInt, AtomicLong.incrementAndGet()

在JDK1.5之前,如果不编写明确的代码就无法执行CAS操作,在JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操作,并且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令,如果处理器/CPU不支持CAS指令,那么JVM将使用自旋锁。因此,值得注意的是,CAS解决方案与平台/编译器紧密相关(比如x86架构下其对应的汇编指令是lock cmpxchg,如果想要64Bit的交换,则应使用lock cmpxchg8b。在.NET中我们可以使用Interlocked.CompareExchange函数)

在原子类变量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时都直接或间接的使用了这些原子变量类。

Java 1.6中AtomicLong.incrementAndGet()的实现源码为:

1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent.atomic;
   8: import sun.misc.Unsafe;
   9: 
  10: /**
  11:  * A long value that may be updated atomically.  See the
  12:  * {@link java.util.concurrent.atomic} package specification for
  13:  * description of the properties of atomic variables. An
  14:  * AtomicLong is used in applications such as atomically
  15:  * incremented sequence numbers, and cannot be used as a replacement
  16:  * for a {@link java.lang.Long}. However, this class does extend
  17:  * Number to allow uniform access by tools and utilities that
  18:  * deal with numerically-based classes.
  19:  *
  20:  * @since 1.5
  21:  * @author Doug Lea
  22:  */
  23: public class AtomicLong extends Number implements java.io.Serializable {
  24:     private static final long serialVersionUID = 1927816293512124184L;
  25: 
  26:     // setup to use Unsafe.compareAndSwapLong for updates
  27:     private static final Unsafe unsafe = Unsafe.getUnsafe();
  28:     private static final long valueOffset;
  29: 
  30:     /**
  31:      * Records whether the underlying JVM supports lockless
  32:      * CompareAndSet for longs. While the unsafe.CompareAndSetLong
  33:      * method works in either case, some constructions should be
  34:      * handled at Java level to avoid locking user-visible locks.
  35:      */
  36:     static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
  37: 
  38:     /**
  39:      * Returns whether underlying JVM supports lockless CompareAndSet
  40:      * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
  41:      */
  42:     private static native boolean VMSupportsCS8();
  43: 
  44:     static {
  45:       try {
  46:         valueOffset = unsafe.objectFieldOffset
  47:             (AtomicLong.class.getDeclaredField("value"));
  48:       } catch (Exception ex) { throw new Error(ex); }
  49:     }
  50: 
  51:     private volatile long value;
  52: 
  53:     /**
  54:      * Creates a new AtomicLong with the given initial value.
  55:      *
  56:      * @param initialValue the initial value
  57:      */
  58:     public AtomicLong(long initialValue) {
  59:         value = initialValue;
  60:     }
  61: 
  62:     /**
  63:      * Creates a new AtomicLong with initial value 0.
  64:      */
  65:     public AtomicLong() {
  66:     }
  67: 
  68:     /**
  69:      * Gets the current value.
  70:      *
  71:      * @return the current value
  72:      */
  73:     public final long get() {
  74:         return value;
  75:     }
  76: 
  77:     /**
  78:      * Sets to the given value.
  79:      *
  80:      * @param newValue the new value
  81:      */
  82:     public final void set(long newValue) {
  83:         value = newValue;
  84:     }
  85: 
  86:     /**
  87:      * Eventually sets to the given value.
  88:      *
  89:      * @param newValue the new value
  90:      * @since 1.6
  91:      */
  92:     public final void lazySet(long newValue) {
  93:         unsafe.putOrderedLong(this, valueOffset, newValue);
  94:     }
  95: 
  96:     /**
  97:      * Atomically sets to the given value and returns the old value.
  98:      *
  99:      * @param newValue the new value
 100:      * @return the previous value
 101:      */
 102:     public final long getAndSet(long newValue) {
 103:         while (true) {
 104:             long current = get();
 105:             if (compareAndSet(current, newValue))
 106:                 return current;
 107:         }
 108:     }
 109: 
 110:     /**
 111:      * Atomically sets the value to the given updated value
 112:      * if the current value == the expected value.
 113:      *
 114:      * @param expect the expected value
 115:      * @param update the new value
 116:      * @return true if successful. False return indicates that
 117:      * the actual value was not equal to the expected value.
 118:      */
 119:     public final boolean compareAndSet(long expect, long update) {
 120:     return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
 121:     }
 122: 
 123:     /**
 124:      * Atomically sets the value to the given updated value
 125:      * if the current value == the expected value.
 126:      * May fail spuriously and does not provide ordering guarantees,
 127:      * so is only rarely an appropriate alternative to compareAndSet.
 128:      *
 129:      * @param expect the expected value
 130:      * @param update the new value
 131:      * @return true if successful.
 132:      */
 133:     public final boolean weakCompareAndSet(long expect, long update) {
 134:     return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
 135:     }
 136: 
 137:     /**
 138:      * Atomically increments by one the current value.
 139:      *
 140:      * @return the previous value
 141:      */
 142:     public final long getAndIncrement() {
 143:         while (true) {
 144:             long current = get();
 145:             long next = current + 1;
 146:             if (compareAndSet(current, next))
 147:                 return current;
 148:         }
 149:     }
 150: 
 151:     /**
 152:      * Atomically decrements by one the current value.
 153:      *
 154:      * @return the previous value
 155:      */
 156:     public final long getAndDecrement() {
 157:         while (true) {
 158:             long current = get();
 159:             long next = current - 1;
 160:             if (compareAndSet(current, next))
 161:                 return current;
 162:         }
 163:     }
 164: 
 165:     /**
 166:      * Atomically adds the given value to the current value.
 167:      *
 168:      * @param delta the value to add
 169:      * @return the previous value
 170:      */
 171:     public final long getAndAdd(long delta) {
 172:         while (true) {
 173:             long current = get();
 174:             long next = current + delta;
 175:             if (compareAndSet(current, next))
 176:                 return current;
 177:         }
 178:     }
 179: 
 180:     /**
 181:      * Atomically increments by one the current value.
 182:      *
 183:      * @return the updated value
 184:      */
 185:     public final long incrementAndGet() {
 186:         for (;;) {
 187:             long current = get();
 188:             long next = current + 1;
 189:             if (compareAndSet(current, next))
 190:                 return next;
 191:         }
 192:     }
 193: 
 194:     /**
 195:      * Atomically decrements by one the current value.
 196:      *
 197:      * @return the updated value
 198:      */
 199:     public final long decrementAndGet() {
 200:         for (;;) {
 201:             long current = get();
 202:             long next = current - 1;
 203:             if (compareAndSet(current, next))
 204:                 return next;
 205:         }
 206:     }
 207: 
 208:     /**
 209:      * Atomically adds the given value to the current value.
 210:      *
 211:      * @param delta the value to add
 212:      * @return the updated value
 213:      */
 214:     public final long addAndGet(long delta) {
 215:         for (;;) {
 216:             long current = get();
 217:             long next = current + delta;
 218:             if (compareAndSet(current, next))
 219:                 return next;
 220:         }
 221:     }
 222: 
 223:     /**
 224:      * Returns the String representation of the current value.
 225:      * @return the String representation of the current value.
 226:      */
 227:     public String toString() {
 228:         return Long.toString(get());
 229:     }
 230: 
 231: 
 232:     public int intValue() {
 233:     return (int)get();
 234:     }
 235: 
 236:     public long longValue() {
 237:     return (long)get();
 238:     }
 239: 
 240:     public float floatValue() {
 241:     return (float)get();
 242:     }
 243: 
 244:     public double doubleValue() {
 245:     return (double)get();
 246:     }
 247: 
 248: }

由此可见,AtomicLong.incrementAndGet的实现用了乐观锁技术,调用了sun.misc.Unsafe类库里面的 CAS算法,用CPU指令来实现无锁自增。所以,AtomicLong.incrementAndGet的自增比用synchronized的锁效率倍增。

public final int getAndIncrement() {  
        for (;;) {  
            int current = get();  
            int next = current + 1;  
            if (compareAndSet(current, next))  
                return current;  
        }  
}  
  
public final boolean compareAndSet(int expect, int update) {  
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
}  

下面是测试代码:可以看到用AtomicLong.incrementAndGet的性能比用synchronized高出几倍。

package console;

import java.util.concurrent.atomic.AtomicLong;

public class main {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		System.out.println("START -- ");
		calc();
		calcSynchro();
		calcAtomic();
		
		testThreadsSync();
		testThreadsAtomic();
		
		testThreadsSync2();
		testThreadsAtomic2();
		
		System.out.println("-- FINISHED ");
	}

	private static void calc() {
		stopwatch sw = new stopwatch();
		sw.start();

		long val = 0;
		while (val < 10000000L) {
			val++;
		}
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" calc() elapsed (ms): " + milSecds);
	}

	private static void calcSynchro() {
		stopwatch sw = new stopwatch();
		sw.start();

		long val = 0;

		while (val < 10000000L) {
			synchronized (main.class) {
				val++;
			}
		}

		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" calcSynchro() elapsed (ms): " + milSecds);
	}

	private static void calcAtomic() {
		stopwatch sw = new stopwatch();
		sw.start();

		AtomicLong val = new AtomicLong(0);
		while (val.incrementAndGet() < 10000000L) {

		}
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" calcAtomic() elapsed (ms): " + milSecds);

	}
    

	private static void testThreadsSync(){
		
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopSync());
		t1.start();
		
		Thread t2 = new Thread(new LoopSync());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsSync() 1 thread elapsed (ms): " + milSecds);
		
	}
	
	private static void testThreadsAtomic(){
	
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopAtomic());
		t1.start();
		
		Thread t2 = new Thread(new LoopAtomic());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsAtomic() 1 thread elapsed (ms): " + milSecds);
		
	}
	
	private static void testThreadsSync2(){
		
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopSync());
		t1.start();
		
		Thread t2 = new Thread(new LoopSync());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsSync() 2 threads elapsed (ms): " + milSecds);
		
	}
	
	private static void testThreadsAtomic2(){
	
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopAtomic());
		t1.start();
		
		Thread t2 = new Thread(new LoopAtomic());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsAtomic() 2 threads elapsed (ms): " + milSecds);
		
	}
	
	private static class LoopAtomic implements Runnable {
		public void run() {
			AtomicLong val = new AtomicLong(0);
			while (val.incrementAndGet() < 10000000L) {

			}
		}
	}
	private static class LoopSync implements Runnable {
		public void run() {
			long val = 0;

			while (val < 10000000L) {
				synchronized (main.class) {
					val++;
				}
			}
		}
	}
}


public class stopwatch {

	private long startTime = 0;
	private long stopTime = 0;
	private boolean running = false;

	public void start() {
		this.startTime = System.currentTimeMillis();
		this.running = true;
	}

	public void stop() {
		this.stopTime = System.currentTimeMillis();
		this.running = false;
	}

	public long getElapsedTime() {
		long elapsed;
		if (running) {
			elapsed = (System.currentTimeMillis() - startTime);
		} else {
			elapsed = (stopTime - startTime);
		}
		return elapsed;
	}

	public long getElapsedTimeSecs() {
		long elapsed;
		if (running) {
			elapsed = ((System.currentTimeMillis() - startTime) / 1000);
		} else {
			elapsed = ((stopTime - startTime) / 1000);
		}
		return elapsed;
	}

	// sample usage
	// public static void main(String[] args) {
	// StopWatch s = new StopWatch();
	// s.start();
	// //code you want to time goes here
	// s.stop();
	// System.out.println("elapsed time in milliseconds: " +
	// s.getElapsedTime());
	// }
}

CAS的例子:非阻塞堆栈

下面是比非阻塞自增稍微复杂一点的CAS的例子:非阻塞堆栈/ConcurrentStackConcurrentStack 中的 push()pop() 操作在结构上与NonblockingCounter 上相似,只是做的工作有些冒险,希望在 “提交” 工作的时候,底层假设没有失效。push() 方法观察当前最顶的节点,构建一个新节点放在堆栈上,然后,如果最顶端的节点在初始观察之后没有变化,那么就安装新节点。如果 CAS 失败,意味着另一个线程已经修改了堆栈,那么过程就会重新开始。

public class ConcurrentStack {
    AtomicReference> head = new AtomicReference>();
    public void push(E item) {
        Node newHead = new Node(item);
        Node oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node oldHead;
        Node newHead;
        do {
            oldHead = head.get();
            if (oldHead == null) 
                return null;
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead,newHead));
        return oldHead.item;
    }
    static class Node {
        final E item;
        Node next;
        public Node(E item) { this.item = item; }
    }
}

在轻度到中度的争用情况下,非阻塞算法的性能会超越阻塞算法,因为 CAS 的多数时间都在第一次尝试时就成功,而发生争用时的开销也不涉及线程挂起和上下文切换,只多了几个循环迭代。没有争用的 CAS 要比没有争用的锁便宜得多(这句话肯定是真的,因为没有争用的锁涉及 CAS 加上额外的处理),而争用的 CAS 比争用的锁获取涉及更短的延迟。

在高度争用的情况下(即有多个线程不断争用一个内存位置的时候),基于锁的算法开始提供比非阻塞算法更好的吞吐率,因为当线程阻塞时,它就会停止争用,耐心地等候轮到自己,从而避免了进一步争用。但是,这么高的争用程度并不常见,因为多数时候,线程会把线程本地的计算与争用共享数据的操作分开,从而给其他线程使用共享数据的机会。

CAS的例子3:非阻塞链表

以上的示例(自增计数器和堆栈)都是非常简单的非阻塞算法,一旦掌握了在循环中使用 CAS,就可以容易地模仿它们。对于更复杂的数据结构,非阻塞算法要比这些简单示例复杂得多,因为修改链表、树或哈希表可能涉及对多个指针的更新。CAS 支持对单一指针的原子性条件更新,但是不支持两个以上的指针。所以,要构建一个非阻塞的链表、树或哈希表,需要找到一种方式,可以用 CAS 更新多个指针,同时不会让数据结构处于不一致的状态。

在链表的尾部插入元素,通常涉及对两个指针的更新:“尾” 指针总是指向列表中的最后一个元素,“下一个” 指针从过去的最后一个元素指向新插入的元素。因为需要更新两个指针,所以需要两个 CAS。在独立的 CAS 中更新两个指针带来了两个需要考虑的潜在问题:如果第一个 CAS 成功,而第二个 CAS 失败,会发生什么?如果其他线程在第一个和第二个 CAS 之间企图访问链表,会发生什么?

对于非复杂数据结构,构建非阻塞算法的 “技巧” 是确保数据结构总处于一致的状态(甚至包括在线程开始修改数据结构和它完成修改之间),还要确保其他线程不仅能够判断出第一个线程已经完成了更新还是处在更新的中途,还能够判断出如果第一个线程走向 AWOL,完成更新还需要什么操作。如果线程发现了处在更新中途的数据结构,它就可以 “帮助” 正在执行更新的线程完成更新,然后再进行自己的操作。当第一个线程回来试图完成自己的更新时,会发现不再需要了,返回即可,因为 CAS 会检测到帮助线程的干预(在这种情况下,是建设性的干预)。

这种 “帮助邻居” 的要求,对于让数据结构免受单个线程失败的影响,是必需的。如果线程发现数据结构正处在被其他线程更新的中途,然后就等候其他线程完成更新,那么如果其他线程在操作中途失败,这个线程就可能永远等候下去。即使不出现故障,这种方式也会提供糟糕的性能,因为新到达的线程必须放弃处理器,导致上下文切换,或者等到自己的时间片过期(而这更糟)。

public class LinkedQueue  {
    private static class Node  {
        final E item;
        final AtomicReference> next;
        Node(E item, Node next) {
            this.item = item;
            this.next = new AtomicReference>(next);
        }
    }
    private AtomicReference> head
        = new AtomicReference>(new Node(null, null));
    private AtomicReference> tail = head;
    public boolean put(E item) {
        Node newNode = new Node(item, null);
        while (true) {
            Node curTail = tail.get();
            Node residue = curTail.next.get();
            if (curTail == tail.get()) {
                if (residue == null) /* A */ {
                    if (curTail.next.compareAndSet(null, newNode)) /* C */ {
                        tail.compareAndSet(curTail, newNode) /* D */ ;
                        return true;
                    }
                } else {
                    tail.compareAndSet(curTail, residue) /* B */;
                }
            }
        }
    }
}

具体算法相见IBM Developerworks

Java的ConcurrentHashMap的实现原理

Java5中的ConcurrentHashMap,线程安全,设计巧妙,用桶粒度的锁,避免了put和get中对整个map的锁定,尤其在get中,只对一个HashEntry做锁定操作,性能提升是显而易见的。

具体实现中使用了锁分离机制,在这个帖子中有非常详细的讨论。这里有关于Java内存模型结合ConcurrentHashMap的分析。以下是JDK6的ConcurrentHashMap的源码:

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
import java.io.Serializable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamField;

/**
 * A hash table supporting full concurrency of retrievals and
 * adjustable expected concurrency for updates. This class obeys the
 * same functional specification as {@link java.util.Hashtable}, and
 * includes versions of methods corresponding to each method of
 * Hashtable. However, even though all operations are
 * thread-safe, retrieval operations do not entail locking,
 * and there is not any support for locking the entire table
 * in a way that prevents all access.  This class is fully
 * interoperable with Hashtable in programs that rely on its
 * thread safety but not on its synchronization details.
 *
 * 

Retrieval operations (including get) generally do not * block, so may overlap with update operations (including * put and remove). Retrievals reflect the results * of the most recently completed update operations holding * upon their onset. For aggregate operations such as putAll * and clear, concurrent retrievals may reflect insertion or * removal of only some entries. Similarly, Iterators and * Enumerations return elements reflecting the state of the hash table * at some point at or since the creation of the iterator/enumeration. * They do not throw {@link ConcurrentModificationException}. * However, iterators are designed to be used by only one thread at a time. * *

The allowed concurrency among update operations is guided by * the optional concurrencyLevel constructor argument * (default 16), which is used as a hint for internal sizing. The * table is internally partitioned to try to permit the indicated * number of concurrent updates without contention. Because placement * in hash tables is essentially random, the actual concurrency will * vary. Ideally, you should choose a value to accommodate as many * threads as will ever concurrently modify the table. Using a * significantly higher value than you need can waste space and time, * and a significantly lower value can lead to thread contention. But * overestimates and underestimates within an order of magnitude do * not usually have much noticeable impact. A value of one is * appropriate when it is known that only one thread will modify and * all others will only read. Also, resizing this or any other kind of * hash table is a relatively slow operation, so, when possible, it is * a good idea to provide estimates of expected table sizes in * constructors. * *

This class and its views and iterators implement all of the * optional methods of the {@link Map} and {@link Iterator} * interfaces. * *

Like {@link Hashtable} but unlike {@link HashMap}, this class * does not allow null to be used as a key or value. * *

This class is a member of the * * Java Collections Framework. * * @since 1.5 * @author Doug Lea * @param the type of keys maintained by this map * @param the type of mapped values */ public class ConcurrentHashMap extends AbstractMap implements ConcurrentMap, Serializable { private static final long serialVersionUID = 7249069246763182397L; /* * The basic strategy is to subdivide the table among Segments, * each of which itself is a concurrently readable hash table. To * reduce footprint, all but one segments are constructed only * when first needed (see ensureSegment). To maintain visibility * in the presence of lazy construction, accesses to segments as * well as elements of segment's table must use volatile access, * which is done via Unsafe within methods segmentAt etc * below. These provide the functionality of AtomicReferenceArrays * but reduce the levels of indirection. Additionally, * volatile-writes of table elements and entry "next" fields * within locked operations use the cheaper "lazySet" forms of * writes (via putOrderedObject) because these writes are always * followed by lock releases that maintain sequential consistency * of table updates. * * Historical note: The previous version of this class relied * heavily on "final" fields, which avoided some volatile reads at * the expense of a large initial footprint. Some remnants of * that design (including forced construction of segment 0) exist * to ensure serialization compatibility. */ /* ---------------- Constants -------------- */ /** * The default initial capacity for this table, * used when not otherwise specified in a constructor. */ static final int DEFAULT_INITIAL_CAPACITY = 16; /** * The default load factor for this table, used when not * otherwise specified in a constructor. */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * The default concurrency level for this table, used when not * otherwise specified in a constructor. */ static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * The maximum capacity, used if a higher value is implicitly * specified by either of the constructors with arguments. MUST * be a power of two <= 1<<30 to ensure that entries are indexable * using ints. */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * The minimum capacity for per-segment tables. Must be a power * of two, at least two to avoid immediate resizing on next use * after lazy construction. */ static final int MIN_SEGMENT_TABLE_CAPACITY = 2; /** * The maximum number of segments to allow; used to bound * constructor arguments. Must be power of two less than 1 << 24. */ static final int MAX_SEGMENTS = 1 << 16; // slightly conservative /** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */ static final int RETRIES_BEFORE_LOCK = 2; /* ---------------- Fields -------------- */ /** * Mask value for indexing into segments. The upper bits of a * key's hash code are used to choose the segment. */ final int segmentMask; /** * Shift value for indexing within segments. */ final int segmentShift; /** * The segments, each of which is a specialized hash table. */ final Segment[] segments; transient Set keySet; transient Set> entrySet; transient Collection values; /** * ConcurrentHashMap list entry. Note that this is never exported * out as a user-visible Map.Entry. */ static final class HashEntry { final int hash; final K key; volatile V value; volatile HashEntry next; HashEntry(int hash, K key, V value, HashEntry next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ final void setNext(HashEntry n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } /** * Gets the ith element of given table (if nonnull) with volatile * read semantics. */ @SuppressWarnings("unchecked") static final HashEntry entryAt(HashEntry[] tab, int i) { return (tab == null) ? null : (HashEntry) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); } /** * Sets the ith element of given table, with volatile write * semantics. (See above about use of putOrderedObject.) */ static final void setEntryAt(HashEntry[] tab, int i, HashEntry e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); } /** * Applies a supplemental hash function to a given hashCode, which * defends against poor quality hash functions. This is critical * because ConcurrentHashMap uses power-of-two length hash tables, * that otherwise encounter collisions for hashCodes that do not * differ in lower or upper bits. */ private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } /** * Segments are specialized versions of hash tables. This * subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. */ static final class Segment extends ReentrantLock implements Serializable { /* * Segments maintain a table of entry lists that are always * kept in a consistent state, so can be read (via volatile * reads of segments and tables) without locking. This * requires replicating nodes when necessary during table * resizing, so the old lists can be traversed by readers * still using old version of table. * * This class defines only mutative methods requiring locking. * Except as noted, the methods of this class perform the * per-segment versions of ConcurrentHashMap methods. (Other * methods are integrated directly into ConcurrentHashMap * methods.) These mutative methods use a form of controlled * spinning on contention via methods scanAndLock and * scanAndLockForPut. These intersperse tryLocks with * traversals to locate nodes. The main benefit is to absorb * cache misses (which are very common for hash tables) while * obtaining locks so that traversal is faster once * acquired. We do not actually use the found nodes since they * must be re-acquired under lock anyway to ensure sequential * consistency of updates (and in any case may be undetectably * stale), but they will normally be much faster to re-locate. * Also, scanAndLockForPut speculatively creates a fresh node * to use in put if no node is found. */ private static final long serialVersionUID = 2249069246763182397L; /** * The maximum number of times to tryLock in a prescan before * possibly blocking on acquire in preparation for a locked * segment operation. On multiprocessors, using a bounded * number of retries maintains cache acquired while locating * nodes. */ static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */ transient volatile HashEntry[] table; /** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */ transient int count; /** * The total number of mutative operations in this segment. * Even though this may overflows 32 bits, it provides * sufficient accuracy for stability checks in CHM isEmpty() * and size() methods. Accessed only either within locks or * among other volatile reads that maintain visibility. */ transient int modCount; /** * The table is rehashed when its size exceeds this threshold. * (The value of this field is always (int)(capacity * * loadFactor).) */ transient int threshold; /** * The load factor for the hash table. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ final float loadFactor; Segment(float lf, int threshold, HashEntry[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry[] tab = table; int index = (tab.length - 1) & hash; HashEntry first = entryAt(tab, index); for (HashEntry e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry(hash, key, value, first); int c = count + 1; if (c > threshold && first != null && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } /** * Doubles size of table and repacks entries, also adding the * given node to new table */ @SuppressWarnings("unchecked") private void rehash(HashEntry node) { /* * Reclassify nodes in each list to new table. Because we * are using power-of-two expansion, the elements from * each bin must either stay at same index, or move with a * power of two offset. We eliminate unnecessary node * creation by catching cases where old nodes can be * reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ HashEntry[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry[] newTable = (HashEntry[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry e = oldTable[i]; if (e != null) { HashEntry next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry lastRun = e; int lastIdx = idx; for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } /** * Scans for a node containing given key while trying to * acquire lock, creating and returning one if not found. Upon * return, guarantees that lock is held. UNlike in most * methods, calls to method equals are not screened: Since * traversal speed doesn't matter, we might as well help warm * up the associated code and accesses as well. * * @return a new node if key not found, else null */ private HashEntry scanAndLockForPut(K key, int hash, V value) { HashEntry first = entryForHash(this, hash); HashEntry e = first; HashEntry node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } /** * Scans for a node containing the given key while trying to * acquire lock for a remove or replace operation. Upon * return, guarantees that lock is held. Note that we must * lock even if the key is not found, to ensure sequential * consistency of updates. */ private void scanAndLock(Object key, int hash) { // similar to but simpler than scanAndLockForPut HashEntry first = entryForHash(this, hash); HashEntry e = first; int retries = -1; while (!tryLock()) { HashEntry f; if (retries < 0) { if (e == null || key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } } } /** * Remove; match on key only if value null, else match both. */ final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry[] tab = table; int index = (tab.length - 1) & hash; HashEntry e = entryAt(tab, index); HashEntry pred = null; while (e != null) { K k; HashEntry next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; } final boolean replace(K key, int hash, V oldValue, V newValue) { if (!tryLock()) scanAndLock(key, hash); boolean replaced = false; try { HashEntry e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { if (oldValue.equals(e.value)) { e.value = newValue; ++modCount; replaced = true; } break; } } } finally { unlock(); } return replaced; } final V replace(K key, int hash, V value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; e.value = value; ++modCount; break; } } } finally { unlock(); } return oldValue; } final void clear() { lock(); try { HashEntry[] tab = table; for (int i = 0; i < tab.length ; i++) setEntryAt(tab, i, null); ++modCount; count = 0; } finally { unlock(); } } } // Accessing segments /** * Gets the jth element of given segment array (if nonnull) with * volatile element access semantics via Unsafe. */ @SuppressWarnings("unchecked") static final Segment segmentAt(Segment[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? null : (Segment) UNSAFE.getObjectVolatile(ss, u); } /** * Returns the segment for the given index, creating it and * recording in segment table (via CAS) if not already present. * * @param k the index * @return the segment */ @SuppressWarnings("unchecked") private Segment ensureSegment(int k) { final Segment[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment seg; if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry[] tab = (HashEntry[])new HashEntry[cap]; if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment s = new Segment(lf, threshold, tab); while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } // Hash-based segment and entry accesses /** * Get the segment for the given hash */ @SuppressWarnings("unchecked") private Segment segmentForHash(int h) { long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; return (Segment) UNSAFE.getObjectVolatile(segments, u); } /** * Gets the table entry for the given segment and hash */ @SuppressWarnings("unchecked") static final HashEntry entryForHash(Segment seg, int h) { HashEntry[] tab; return (seg == null || (tab = seg.table) == null) ? null : (HashEntry) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); } /* ---------------- Public operations -------------- */ /** * Creates a new, empty map with the specified initial * capacity, load factor and concurrency level. * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @param concurrencyLevel the estimated number of concurrently * updating threads. The implementation performs internal sizing * to try to accommodate this many threads. * @throws IllegalArgumentException if the initial capacity is * negative or the load factor or concurrencyLevel are * nonpositive. */ @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment s0 = new Segment(loadFactor, (int)(cap * loadFactor), (HashEntry[])new HashEntry[cap]); Segment[] ss = (Segment[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } /** * Creates a new, empty map with the specified initial capacity * and load factor and with the default concurrencyLevel (16). * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @throws IllegalArgumentException if the initial capacity of * elements is negative or the load factor is nonpositive * * @since 1.6 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with the specified initial capacity, * and with default load factor (0.75) and concurrencyLevel (16). * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative. */ public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with a default initial capacity (16), * load factor (0.75) and concurrencyLevel (16). */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new map with the same mappings as the given map. * The map is created with a capacity of 1.5 times the number * of mappings in the given map or 16 (whichever is greater), * and a default load factor (0.75) and concurrencyLevel (16). * * @param m the map */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m); } /** * Returns true if this map contains no key-value mappings. * * @return true if this map contains no key-value mappings */ public boolean isEmpty() { /* * Sum per-segment modCounts to avoid mis-reporting when * elements are concurrently added and removed in one segment * while checking another, in which case the table was never * actually empty at any point. (The sum ensures accuracy up * through at least 1<<31 per-segment modifications before * recheck.) Methods size() and containsValue() use similar * constructions for stability checks. */ long sum = 0L; final Segment[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum += seg.modCount; } } if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } if (sum != 0L) return false; } return true; } /** * Returns the number of key-value mappings in this map. If the * map contains more than Integer.MAX_VALUE elements, returns * Integer.MAX_VALUE. * * @return the number of key-value mappings in this map */ public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; } /** * Returns the value to which the specified key is mapped, * or {@code null} if this map contains no mapping for the key. * *

More formally, if this map contains a mapping from a key * {@code k} to a value {@code v} such that {@code key.equals(k)}, * then this method returns {@code v}; otherwise it returns * {@code null}. (There can be at most one such mapping.) * * @throws NullPointerException if the specified key is null */ public V get(Object key) { int hash = hash(key.hashCode()); for (HashEntry e = entryForHash(segmentForHash(hash), hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) return e.value; } return null; } /** * Tests if the specified object is a key in this table. * * @param key possible key * @return true if and only if the specified object * is a key in this table, as determined by the * equals method; false otherwise. * @throws NullPointerException if the specified key is null */ public boolean containsKey(Object key) { int hash = hash(key.hashCode()); for (HashEntry e = entryForHash(segmentForHash(hash), hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) return true; } return false; } /** * Returns true if this map maps one or more keys to the * specified value. Note: This method requires a full internal * traversal of the hash table, and so is much slower than * method containsKey. * * @param value value whose presence in this map is to be tested * @return true if this map maps one or more keys to the * specified value * @throws NullPointerException if the specified value is null */ public boolean containsValue(Object value) { // Same idea as size() if (value == null) throw new NullPointerException(); final Segment[] segments = this.segments; boolean found = false; long last = 0; int retries = -1; try { outer: for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } long hashSum = 0L; int sum = 0; for (int j = 0; j < segments.length; ++j) { HashEntry[] tab; Segment seg = segmentAt(segments, j); if (seg != null && (tab = seg.table) != null) { for (int i = 0 ; i < tab.length; i++) { HashEntry e; for (e = entryAt(tab, i); e != null; e = e.next) { V v = e.value; if (v != null && value.equals(v)) { found = true; break outer; } } } sum += seg.modCount; } } if (retries > 0 && sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return found; } /** * Legacy method testing if some key maps into the specified value * in this table. This method is identical in functionality to * {@link #containsValue}, and exists solely to ensure * full compatibility with class {@link java.util.Hashtable}, * which supported this method prior to introduction of the * Java Collections framework. * @param value a value to search for * @return true if and only if some key maps to the * value argument in this table as * determined by the equals method; * false otherwise * @throws NullPointerException if the specified value is null */ public boolean contains(Object value) { return containsValue(value); } /** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * *

The value can be retrieved by calling the get method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with key, or * null if there was no mapping for key * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; Segment s = segmentAt(segments, j); if (s == null) s = ensureSegment(j); return s.put(key, hash, value, false); } /** * {@inheritDoc} * * @return the previous value associated with the specified key, * or null if there was no mapping for the key * @throws NullPointerException if the specified key or value is null */ public V putIfAbsent(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; Segment s = segmentAt(segments, j); if (s == null) s = ensureSegment(j); return s.put(key, hash, value, true); } /** * Copies all of the mappings from the specified map to this one. * These mappings replace any mappings that this map had for any of the * keys currently in the specified map. * * @param m mappings to be stored in this map */ public void putAll(Map<? extends K, ? extends V> m) { for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) put(e.getKey(), e.getValue()); } /** * Removes the key (and its corresponding value) from this map. * This method does nothing if the key is not in the map. * * @param key the key that needs to be removed * @return the previous value associated with key, or * null if there was no mapping for key * @throws NullPointerException if the specified key is null */ public V remove(Object key) { int hash = hash(key.hashCode()); Segment s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); } /** * {@inheritDoc} * * @throws NullPointerException if the specified key is null */ public boolean remove(Object key, Object value) { int hash = hash(key.hashCode()); Segment s; return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null; } /** * {@inheritDoc} * * @throws NullPointerException if any of the arguments are null */ public boolean replace(K key, V oldValue, V newValue) { int hash = hash(key.hashCode()); if (oldValue == null || newValue == null) throw new NullPointerException(); Segment s = segmentForHash(hash); return s != null && s.replace(key, hash, oldValue, newValue); } /** * {@inheritDoc} * * @return the previous value associated with the specified key, * or null if there was no mapping for the key * @throws NullPointerException if the specified key or value is null */ public V replace(K key, V value) { int hash = hash(key.hashCode()); if (value == null) throw new NullPointerException(); Segment s = segmentForHash(hash); return s == null ? null : s.replace(key, hash, value); } /** * Removes all of the mappings from this map. */ public void clear() { final Segment[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment s = segmentAt(segments, j); if (s != null) s.clear(); } } /** * Returns a {@link Set} view of the keys contained in this map. * The set is backed by the map, so changes to the map are * reflected in the set, and vice-versa. The set supports element * removal, which removes the corresponding mapping from this map, * via the Iterator.remove, Set.remove, * removeAll, retainAll, and clear * operations. It does not support the add or * addAll operations. * *

The view's iterator is a "weakly consistent" iterator * that will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. */ public Set keySet() { Set ks = keySet; return (ks != null) ? ks : (keySet = new KeySet()); } /** * Returns a {@link Collection} view of the values contained in this map. * The collection is backed by the map, so changes to the map are * reflected in the collection, and vice-versa. The collection * supports element removal, which removes the corresponding * mapping from this map, via the Iterator.remove, * Collection.remove, removeAll, * retainAll, and clear operations. It does not * support the add or addAll operations. * *

The view's iterator is a "weakly consistent" iterator * that will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. */ public Collection values() { Collection vs = values; return (vs != null) ? vs : (values = new Values()); } /** * Returns a {@link Set} view of the mappings contained in this map. * The set is backed by the map, so changes to the map are * reflected in the set, and vice-versa. The set supports element * removal, which removes the corresponding mapping from the map, * via the Iterator.remove, Set.remove, * removeAll, retainAll, and clear * operations. It does not support the add or * addAll operations. * *

The view's iterator is a "weakly consistent" iterator * that will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. */ public Set> entrySet() { Set> es = entrySet; return (es != null) ? es : (entrySet = new EntrySet()); } /** * Returns an enumeration of the keys in this table. * * @return an enumeration of the keys in this table * @see #keySet() */ public Enumeration keys() { return new KeyIterator(); } /** * Returns an enumeration of the values in this table. * * @return an enumeration of the values in this table * @see #values() */ public Enumeration elements() { return new ValueIterator(); } /* ---------------- Iterator Support -------------- */ abstract class HashIterator { int nextSegmentIndex; int nextTableIndex; HashEntry[] currentTable; HashEntry nextEntry; HashEntry lastReturned; HashIterator() { nextSegmentIndex = segments.length - 1; nextTableIndex = -1; advance(); } /** * Set nextEntry to first node of next non-empty table * (in backwards order, to simplify checks). */ final void advance() { for (;;) { if (nextTableIndex >= 0) { if ((nextEntry = entryAt(currentTable, nextTableIndex--)) != null) break; } else if (nextSegmentIndex >= 0) { Segment seg = segmentAt(segments, nextSegmentIndex--); if (seg != null && (currentTable = seg.table) != null) nextTableIndex = currentTable.length - 1; } else break; } } final HashEntry nextEntry() { HashEntry e = nextEntry; if (e == null) throw new NoSuchElementException(); lastReturned = e; // cannot assign until after null check if ((nextEntry = e.next) == null) advance(); return e; } public final boolean hasNext() { return nextEntry != null; } public final boolean hasMoreElements() { return nextEntry != null; } public final void remove() { if (lastReturned == null) throw new IllegalStateException(); ConcurrentHashMap.this.remove(lastReturned.key); lastReturned = null; } } final class KeyIterator extends HashIterator implements Iterator, Enumeration { public final K next() { return super.nextEntry().key; } public final K nextElement() { return super.nextEntry().key; } } final class ValueIterator extends HashIterator implements Iterator, Enumeration { public final V next() { return super.nextEntry().value; } public final V nextElement() { return super.nextEntry().value; } } /** * Custom Entry class used by EntryIterator.next(), that relays * setValue changes to the underlying map. */ final class WriteThroughEntry extends AbstractMap.SimpleEntry { WriteThroughEntry(K k, V v) { super(k,v); } /** * Set our entry's value and write through to the map. The * value to return is somewhat arbitrary here. Since a * WriteThroughEntry does not necessarily track asynchronous * changes, the most recent "previous" value could be * different from what we return (or could even have been * removed in which case the put will re-establish). We do not * and cannot guarantee more. */ public V setValue(V value) { if (value == null) throw new NullPointerException(); V v = super.setValue(value); ConcurrentHashMap.this.put(getKey(), value); return v; } } final class EntryIterator extends HashIterator implements Iterator> { public Map.Entry next() { HashEntry e = super.nextEntry(); return new WriteThroughEntry(e.key, e.value); } } final class KeySet extends AbstractSet { public Iterator iterator() { return new KeyIterator(); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public boolean contains(Object o) { return ConcurrentHashMap.this.containsKey(o); } public boolean remove(Object o) { return ConcurrentHashMap.this.remove(o) != null; } public void clear() { ConcurrentHashMap.this.clear(); } } final class Values extends AbstractCollection { public Iterator iterator() { return new ValueIterator(); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public boolean contains(Object o) { return ConcurrentHashMap.this.containsValue(o); } public void clear() { ConcurrentHashMap.this.clear(); } } final class EntrySet extends AbstractSet> { public Iterator> iterator() { return new EntryIterator(); } public boolean contains(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; V v = ConcurrentHashMap.this.get(e.getKey()); return v != null && v.equals(e.getValue()); } public boolean remove(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; return ConcurrentHashMap.this.remove(e.getKey(), e.getValue()); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public void clear() { ConcurrentHashMap.this.clear(); } } /* ---------------- Serialization Support -------------- */ /** * Save the state of the ConcurrentHashMap instance to a * stream (i.e., serialize it). * @param s the stream * @serialData * the key (Object) and value (Object) * for each key-value mapping, followed by a null pair. * The key-value mappings are emitted in no particular order. */ private void writeObject(java.io.ObjectOutputStream s) throws IOException { // force all segments for serialization compatibility for (int k = 0; k < segments.length; ++k) ensureSegment(k); s.defaultWriteObject(); final Segment[] segments = this.segments; for (int k = 0; k < segments.length; ++k) { Segment seg = segmentAt(segments, k); seg.lock(); try { HashEntry[] tab = seg.table; for (int i = 0; i < tab.length; ++i) { HashEntry e; for (e = entryAt(tab, i); e != null; e = e.next) { s.writeObject(e.key); s.writeObject(e.value); } } } finally { seg.unlock(); } } s.writeObject(null); s.writeObject(null); } /** * Reconstitute the ConcurrentHashMap instance from a * stream (i.e., deserialize it). * @param s the stream */ @SuppressWarnings("unchecked") private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { // Don't call defaultReadObject() ObjectInputStream.GetField oisFields = s.readFields(); final Segment[] oisSegments = (Segment[])oisFields.get("segments", null); final int ssize = oisSegments.length; if (ssize < 1 || ssize > MAX_SEGMENTS || (ssize & (ssize-1)) != 0 ) // ssize not power of two throw new java.io.InvalidObjectException("Bad number of segments:" + ssize); int sshift = 0, ssizeTmp = ssize; while (ssizeTmp > 1) { ++sshift; ssizeTmp >>>= 1; } UNSAFE.putIntVolatile(this, SEGSHIFT_OFFSET, 32 - sshift); UNSAFE.putIntVolatile(this, SEGMASK_OFFSET, ssize - 1); UNSAFE.putObjectVolatile(this, SEGMENTS_OFFSET, oisSegments); // Re-initialize segments to be minimally sized, and let grow. int cap = MIN_SEGMENT_TABLE_CAPACITY; final Segment[] segments = this.segments; for (int k = 0; k < segments.length; ++k) { Segment seg = segments[k]; if (seg != null) { seg.threshold = (int)(cap * seg.loadFactor); seg.table = (HashEntry[]) new HashEntry[cap]; } } // Read the keys and values, and put the mappings in the table for (;;) { K key = (K) s.readObject(); V value = (V) s.readObject(); if (key == null) break; put(key, value); } } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long SBASE; private static final int SSHIFT; private static final long TBASE; private static final int TSHIFT; private static final long SEGSHIFT_OFFSET; private static final long SEGMASK_OFFSET; private static final long SEGMENTS_OFFSET; static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); } }

Java的ConcurrentLinkedQueue实现方法

ConcurrentLinkedQueue也是同样使用了CAS指令,但其性能并不高因为太多CAS操作。其源码如下:

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.*;
import java.util.concurrent.atomic.*;


/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The head of the queue is that element that has been on the
 * queue the longest time.
 * The tail of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A ConcurrentLinkedQueue is an appropriate choice when
 * many threads will share access to a common collection.
 * This queue does not permit null elements.
 *
 * 

This implementation employs an efficient "wait-free" * algorithm based on one described in Simple, * Fast, and Practical Non-Blocking and Blocking Concurrent Queue * Algorithms by Maged M. Michael and Michael L. Scott. * *

Beware that, unlike in most collections, the size method * is NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements. * *

This class and its iterator implement all of the * optional methods of the {@link Collection} and {@link * Iterator} interfaces. * *

Memory consistency effects: As with other concurrent * collections, actions in a thread prior to placing an object into a * {@code ConcurrentLinkedQueue} * happen-before * actions subsequent to the access or removal of that element from * the {@code ConcurrentLinkedQueue} in another thread. * *

This class is a member of the * * Java Collections Framework. * * @since 1.5 * @author Doug Lea * @param the type of elements held in this collection * */ public class ConcurrentLinkedQueue extends AbstractQueue implements Queue, java.io.Serializable { private static final long serialVersionUID = 196745693267521676L; /* * This is a straight adaptation of Michael & Scott algorithm. * For explanation, read the paper. The only (minor) algorithmic * difference is that this version supports lazy deletion of * internal nodes (method remove(Object)) -- remove CAS'es item * fields to null. The normal queue operations unlink but then * pass over nodes with null item fields. Similarly, iteration * methods ignore those with nulls. * * Also note that like most non-blocking algorithms in this * package, this implementation relies on the fact that in garbage * collected systems, there is no possibility of ABA problems due * to recycled nodes, so there is no need to use "counted * pointers" or related techniques seen in versions used in * non-GC'ed settings. */ private static class Node { private volatile E item; private volatile Node next; private static final AtomicReferenceFieldUpdater nextUpdater = AtomicReferenceFieldUpdater.newUpdater (Node.class, Node.class, "next"); private static final AtomicReferenceFieldUpdater itemUpdater = AtomicReferenceFieldUpdater.newUpdater (Node.class, Object.class, "item"); Node(E x) { item = x; } Node(E x, Node n) { item = x; next = n; } E getItem() { return item; } boolean casItem(E cmp, E val) { return itemUpdater.compareAndSet(this, cmp, val); } void setItem(E val) { itemUpdater.set(this, val); } Node getNext() { return next; } boolean casNext(Node cmp, Node val) { return nextUpdater.compareAndSet(this, cmp, val); } void setNext(Node val) { nextUpdater.set(this, val); } } private static final AtomicReferenceFieldUpdater tailUpdater = AtomicReferenceFieldUpdater.newUpdater (ConcurrentLinkedQueue.class, Node.class, "tail"); private static final AtomicReferenceFieldUpdater headUpdater = AtomicReferenceFieldUpdater.newUpdater (ConcurrentLinkedQueue.class, Node.class, "head"); private boolean casTail(Node cmp, Node val) { return tailUpdater.compareAndSet(this, cmp, val); } private boolean casHead(Node cmp, Node val) { return headUpdater.compareAndSet(this, cmp, val); } /** * Pointer to header node, initialized to a dummy node. The first * actual node is at head.getNext(). */ private transient volatile Node head = new Node(null, null); /** Pointer to last node on list **/ private transient volatile Node tail = head; /** * Creates a ConcurrentLinkedQueue that is initially empty. */ public ConcurrentLinkedQueue() {} /** * Creates a ConcurrentLinkedQueue * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public ConcurrentLinkedQueue(Collection<? extends E> c) { for (Iterator<? extends E> it = c.iterator(); it.hasNext();) add(it.next()); } // Have to override just to update the javadoc /** * Inserts the specified element at the tail of this queue. * * @return true (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); } /** * Inserts the specified element at the tail of this queue. * * @return true (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node n = new Node(e, null); for (;;) { Node t = tail; Node s = t.getNext(); if (t == tail) { if (s == null) { if (t.casNext(s, n)) { casTail(t, n); return true; } } else { casTail(t, s); } } } } public E poll() { for (;;) { Node h = head; Node t = tail; Node first = h.getNext(); if (h == head) { if (h == t) { if (first == null) return null; else casTail(t, first); } else if (casHead(h, first)) { E item = first.getItem(); if (item != null) { first.setItem(null); return item; } // else skip over deleted item, continue loop, } } } } public E peek() { // same as poll except don't remove item for (;;) { Node h = head; Node t = tail; Node first = h.getNext(); if (h == head) { if (h == t) { if (first == null) return null; else casTail(t, first); } else { E item = first.getItem(); if (item != null) return item; else // remove deleted node and continue casHead(h, first); } } } } /** * Returns the first actual (non-header) node on list. This is yet * another variant of poll/peek; here returning out the first * node, not element (so we cannot collapse with peek() without * introducing race.) */ Node first() { for (;;) { Node h = head; Node t = tail; Node first = h.getNext(); if (h == head) { if (h == t) { if (first == null) return null; else casTail(t, first); } else { if (first.getItem() != null) return first; else // remove deleted node and continue casHead(h, first); } } } } /** * Returns true if this queue contains no elements. * * @return true if this queue contains no elements */ public boolean isEmpty() { return first() == null; } /** * Returns the number of elements in this queue. If this queue * contains more than Integer.MAX_VALUE elements, returns * Integer.MAX_VALUE. * *

Beware that, unlike in most collections, this method is * NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current * number of elements requires an O(n) traversal. * * @return the number of elements in this queue */ public int size() { int count = 0; for (Node p = first(); p != null; p = p.getNext()) { if (p.getItem() != null) { // Collections.size() spec says to max out if (++count == Integer.MAX_VALUE) break; } } return count; } /** * Returns true if this queue contains the specified element. * More formally, returns true if and only if this queue contains * at least one element e such that o.equals(e). * * @param o object to be checked for containment in this queue * @return true if this queue contains the specified element */ public boolean contains(Object o) { if (o == null) return false; for (Node p = first(); p != null; p = p.getNext()) { E item = p.getItem(); if (item != null && o.equals(item)) return true; } return false; } /** * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element e such * that o.equals(e), if this queue contains one or more such * elements. * Returns true if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present * @return true if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; for (Node p = first(); p != null; p = p.getNext()) { E item = p.getItem(); if (item != null && o.equals(item) && p.casItem(item, null)) return true; } return false; } /** * Returns an array containing all of the elements in this queue, in * proper sequence. * *

The returned array will be "safe" in that no references to it are * maintained by this queue. (In other words, this method must allocate * a new array). The caller is thus free to modify the returned array. * *

This method acts as bridge between array-based and collection-based * APIs. * * @return an array containing all of the elements in this queue */ public Object[] toArray() { // Use ArrayList to deal with resizing. ArrayList al = new ArrayList(); for (Node p = first(); p != null; p = p.getNext()) { E item = p.getItem(); if (item != null) al.add(item); } return al.toArray(); } /** * Returns an array containing all of the elements in this queue, in * proper sequence; the runtime type of the returned array is that of * the specified array. If the queue fits in the specified array, it * is returned therein. Otherwise, a new array is allocated with the * runtime type of the specified array and the size of this queue. * *

If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to * null. * *

Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * *

Suppose x is a queue known to contain only strings. * The following code can be used to dump the queue into a newly * allocated array of String: * *

     *     String[] y = x.toArray(new String[0]);
* * Note that toArray(new Object[0]) is identical in function to * toArray(). * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the * same runtime type is allocated for this purpose * @return an array containing all of the elements in this queue * @throws ArrayStoreException if the runtime type of the specified array * is not a supertype of the runtime type of every element in * this queue * @throws NullPointerException if the specified array is null */ public T[] toArray(T[] a) { // try to use sent-in array int k = 0; Node p; for (p = first(); p != null && k < a.length; p = p.getNext()) { E item = p.getItem(); if (item != null) a[k++] = (T)item; } if (p == null) { if (k < a.length) a[k] = null; return a; } // If won't fit, use ArrayList version ArrayList al = new ArrayList(); for (Node q = first(); q != null; q = q.getNext()) { E item = q.getItem(); if (item != null) al.add(item); } return al.toArray(a); } /** * Returns an iterator over the elements in this queue in proper sequence. * The returned iterator is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ public Iterator iterator() { return new Itr(); } private class Itr implements Iterator { /** * Next node to return item for. */ private Node nextNode; /** * nextItem holds on to item fields because once we claim * that an element exists in hasNext(), we must return it in * the following next() call even if it was in the process of * being removed when hasNext() was called. */ private E nextItem; /** * Node of the last returned item, to support remove. */ private Node lastRet; Itr() { advance(); } /** * Moves to next valid node and returns item to return for * next(), or null if no such. */ private E advance() { lastRet = nextNode; E x = nextItem; Node p = (nextNode == null)? first() : nextNode.getNext(); for (;;) { if (p == null) { nextNode = null; nextItem = null; return x; } E item = p.getItem(); if (item != null) { nextNode = p; nextItem = item; return x; } else // skip over nulls p = p.getNext(); } } public boolean hasNext() { return nextNode != null; } public E next() { if (nextNode == null) throw new NoSuchElementException(); return advance(); } public void remove() { Node l = lastRet; if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. l.setItem(null); lastRet = null; } } /** * Save the state to a stream (that is, serialize it). * * @serialData All of the elements (each an E) in * the proper order, followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { // Write out any hidden stuff s.defaultWriteObject(); // Write out all elements in the proper order. for (Node p = first(); p != null; p = p.getNext()) { Object item = p.getItem(); if (item != null) s.writeObject(item); } // Use trailing null as sentinel s.writeObject(null); } /** * Reconstitute the Queue instance from a stream (that is, * deserialize it). * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { // Read in capacity, and any hidden stuff s.defaultReadObject(); head = new Node(null, null); tail = head; // Read in all elements and place in queue for (;;) { E item = (E)s.readObject(); if (item == null) break; else offer(item); } } }

高并发环境下优化锁或无锁(lock-free)的设计思路

服务端编程的3大性能杀手:1、大量线程导致的线程切换开销。2、锁。3、非必要的内存拷贝。在高并发下,对于纯内存操作来说,单线程是要比多线程快的, 可以比较一下多线程程序在压力测试下cpu的sy和ni百分比。高并发环境下要实现高吞吐量和线程安全,两个思路:一个是用优化的锁实现,一个是lock-free的无锁结构。但非阻塞算法要比基于锁的算法复杂得多。开发非阻塞算法是相当专业的训练,而且要证明算法的正确也极为困难,不仅和具体的目标机器平台和编译器相关,而且需要复杂的技巧和严格的测试。虽然Lock-Free编程非常困难,但是它通常可以带来比基于锁编程更高的吞吐量。所以Lock-Free编程是大有前途的技术。它在线程中止、优先级倒置以及信号安全等方面都有着良好的表现。

  • 优化锁实现的例子:Java中的ConcurrentHashMap,设计巧妙,用桶粒度的锁和锁分离机制,避免了put和get中对整个map的锁定,尤其在get中,只对一个HashEntry做锁定操作,性能提升是显而易见的(详细分析见《探索 ConcurrentHashMap 高并发性的实现机制》)。
  • Lock-free无锁的例子:CAS(CPU的Compare-And-Swap指令)的利用和LMAX的disruptor无锁消息队列数据结构等。有兴趣了解LMAX的disruptor无锁消息队列数据结构的可以移步slideshare。

disruptor无锁消息队列数据结构的类图和技术文档下载

另外,在设计思路上除了尽量减少资源争用以外,还可以借鉴nginx/node.js等单线程大循环的机制,用单线程或CPU数相同的线程开辟大的队列,并发的时候任务压入队列,线程轮询然后一个个顺序执行。由于每个都采用异步I/O,没有阻塞线程。这个大队列可以使用RabbitMQueue,或是JDK的同步队列(性能稍差),或是使用Disruptor无锁队列(Java)。任务处理可以全部放在内存(多级缓存、读写分离、ConcurrentHashMap、甚至分布式缓存Redis)中进行增删改查。最后用Quarz维护定时把缓存数据同步到DB中。当然,这只是中小型系统的思路,如果是大型分布式系统会非常复杂,需要分而治理,用SOA的思路,参考这篇文章的图。(注:Redis是单线程的纯内存数据库,单线程无需锁,而Memcache是多线程的带CAS算法,两者都使用epoll,no-blocking io)

深入JVM的OS的无锁非阻塞算法

如果深入 JVM 和操作系统,会发现非阻塞算法无处不在。垃圾收集器使用非阻塞算法加快并发和平行的垃圾搜集;调度器使用非阻塞算法有效地调度线程和进程,实现内在锁。在 Mustang(Java 6.0)中,基于锁的 SynchronousQueue 算法被新的非阻塞版本代替。很少有开发人员会直接使用 SynchronousQueue,但是通过 Executors.newCachedThreadPool() 工厂构建的线程池用它作为工作队列。比较缓存线程池性能的对比测试显示,新的非阻塞同步队列实现提供了几乎是当前实现 3 倍的速度。在 Mustang 的后续版本(代码名称为 Dolphin)中,已经规划了进一步的改进。

参考文献

IBM developerworks: Java theory and practice: Going atomic