第四部分-并发编程案例分析3:高性能队列Disruptor
1.为什么要用高性能队列Disruptor
为什么要说Disruptor?java SDK提供了2个有界队列
ArrayBlockQueue,LinkedBlockingQueue,基于ReentrantLock锁实现,在高并发情况下,锁的效率不高,更好的替代品有木有?Dosritpr
2.Disruptor介绍
性能更高的有界队列
Log4j2,Spring Messageing,HBase,Storm都用了Disruptor
为什么性能高?
1.内存分配连续,使用RingBuffer数据结构,数组元素初始化时一次性创建,缓存命中率更高;对象循环引用,避免频繁GC
2.避免伪共享,提升缓存利用率。
3.无锁算法,避免频繁加锁,解锁的性能消耗。
4.支持批量消费,消费者可以无锁消费多个消息
3.Disruptor 使用
- 生产者生产的对象称为Event,使用Disruptor必须定义Event,实例代码中为LongEvent
- 构建Disruptor要指定度列大小(bufferSize),还需要传入EventFactory,示例代码为LongEvent::new
- 消费Event需要通过handleEventsWith()方法注册一个事件处理器,发布Event则需要通过publishEvent()放阿飞
伪代码
//自定义Event
class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
//指定RingBuffer大小,
//必须是2的N次方
int bufferSize = 1024;
//构建Disruptor
Disruptor disruptor
= new Disruptor<>(
LongEvent::new,
bufferSize,
DaemonThreadFactory.INSTANCE);
//注册事件处理器
disruptor.handleEventsWith(
(event, sequence, endOfBatch) ->
System.out.println("E: "+event));
//启动Disruptor
disruptor.start();
//获取RingBuffer
RingBuffer ringBuffer
= disruptor.getRingBuffer();
//生产Event
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++){
bb.putLong(0, l);
//生产者生产消息
ringBuffer.publishEvent(
(event, sequence, buffer) ->
event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
测试代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FamaMonitorApplication.class)
public class DisruptorTest {
class MessageEvent {
public String message;
}
private static final int BUFFER_SIZE = 1024;
@Test
public void test(){
// 构建disruptor
Disruptor disruptor = new Disruptor(
MessageEvent::new,
BUFFER_SIZE,
DaemonThreadFactory.INSTANCE
);
// 注册消费者,并编写消费逻辑
disruptor.handleEventsWith(((event, sequence, endOfBatch) -> {
Thread.sleep(1000);
System.out.println("Message : " + event.message);
}));
// 启动
disruptor.start();
// 获取队列,然后生产
RingBuffer ringBuffer = disruptor.getRingBuffer();
for (int i = 0; true; i++) {
int finalI = i;
ringBuffer.publishEvent(((event, sequence, args) -> {
event.message = finalI + "";
}));
Thread.yield();
}
}
}
4.Disruptor 提升性能的秘籍1-内存连续分配
ArrayBlockQueue底层使用数组+ReentrantLock锁实现。
RingBuffer本质也是数组,这一点性能没有什么区别,区别在于Disruptor对于RingBuffer基础上做了很多优化,其中重要一项就是内存连续分配
5.程序的局部性原理
一段时间内程序的执行会限定在一个局部范围内
时间局限性:程序的某条指令一旦被执行,不久之后,这条指令可能再次被执行,某条数据被访问,不就之后这条数据可能再次被访问。
空间局限期:某块内存一旦被访问,不久之后这块内存附近的内存也有可能被访问。
6.cpu寄存器,cpu缓存
cpu缓存说的就是cpu的寄存器,cpu从内存中加载数据x时,会将x缓存在寄存器里,实际上cpu缓存x的同时,还缓存了x周围的数据,(根据程序的空间局限期,周围数据也有可能被访问)。程序能够体现局部性原理,就可以更好的利用cpu寄存器,从而提升性能。
cpu寄存器的读取速度和内存读取速度不是一个数量级。
7.ArrayBlockQueue有没有利用局部性原理,提升cpu缓存命中率?
并没有
每次队列中添加元素,创建对象E,这个对象都是由生产者线程创建的,由于创建这些元素的时间是离散不连续的,这些元素的内存地址大概率不连续
8.Disruptor利用空间局限期原理,提升了cpu缓存命中率
RingBuffer内部也是数组,但数组中的元素都是在初始化时一次性创建的,这些元素的内存地址大概率是连续的
for (int i=0; i
9.数组中的元素内存地址连续就能提升性能么?
对,可以
消费线程在消费队列时,遵循空间局限性原则,消费完第一个元素,很快就会消费第二个元素。消费第一个元素E1的时候,cpu会把内存E1后面的数据也加载进cpu寄存器里。如果第二个元素E2和E1的内存地址连续的。E2也会被加进Cache中。消费E2的时候,E2已经在cpu寄存器中了,不需要从内存再加载一遍,大大提升了性能了就
10.生产者生产时,不是往队列中add,而是修改队列中的元素对象
publishEvent()发布Event时,并不创建新Event,而是event.set()修改event
RingBuffer创建的event是可以循环利用的。这样还能避免频繁创建,删除Event导致频繁GC问题
11.避免伪共享,避免缓存不可用也同样重要
伪共享的存在,使得cache失效
cpu内存的缓存是按照缓存行(Cache Line)管理的,缓存行的大小通常是64字节;cpu从内存加载数据x,会同时加载x后面的(64-size(x))的数据。
Ar'rayBlockQueue的内部结构
/** 队列数组 */
final Object[] items;
/** 出队索引 */
int takeIndex;
/** 入队索引 */
int putIndex;
/** 队列中元素总数 */
int count;
cpu从内存加载takeIndex时,会连带将putIndex和count都加载进cache。
线程A运行在cpu1核上,入队操作,入队会修改putIndex,修改putIndex会导致所有核的缓存均实效,此时线程B在cpu2核上执行出队,需要获取takIndex,takeIndex缓存已实效,所以必须重新从内存里再拉取一遍,性能就下来了。。
12.taskIndex缓存为什么会失效?
入队操作不会修改takeIndex,但是takeIndex和putIndex共享一个缓存行,导致出队操作利用不了缓存,这就是伪共享带来的问题,性能并没有带来替身。即使数组中的对象内存地址是连续的。
13.如何避免伪共享带来的缓存不可用情况出现?
每个变量独占一个缓存行,不共享缓存行就可以
putIndex是一个缓存行
takeIndex是另一个缓存行
如何保证putIndex是一个缓存行呢?
在taskIndex的前后各填充56个字节,这样就能保证takeIndex独占一个缓存行
//前:填充56字节
class LhsPadding{
long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding{
volatile long value;
}
//后:填充56字节
class RhsPadding extends Value{
long p9, p10, p11, p12, p13, p14, p15;
}
class Sequence extends RhsPadding{
//省略实现
}
14.综上所述
数组中连续的内存地址对象+排除伪共享情况,
就可以充分的利用cpu的缓存提升性能
以上代码设计上面的改动,说白了就是更极端的压榨硬件cpu缓存的性能
15.无锁算法
ArrayBlockQueue中使用了ReentrantLock锁,高并发情况下,性能不高
而Disruptor中采用无锁算法,没有加锁,解锁的性能消耗影响
看上去有点复杂,目前没看太明白,总之性能要优于使用ReentrantLock锁
基本逻辑是:如果没有足够的空余位置,就出让 CPU 使用权,然后重新计算;反之则用 CAS 设置入队索引
//生产者获取n个写入位置
do {
//cursor类似于入队索引,指的是上次生产到这里
current = cursor.get();
//目标是在生产n个
next = current + n;
//减掉一个循环
long wrapPoint = next - bufferSize;
//获取上一次的最小消费位置
long cachedGatingSequence = gatingSequenceCache.get();
//没有足够的空余位置
if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
//重新计算所有消费者里面的最小值位置
long gatingSequence = Util.getMinimumSequence(
gatingSequences, current);
//仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
if (wrapPoint > gatingSequence){
LockSupport.parkNanos(1);
continue;
}
//从新设置上一次的最小消费位置
gatingSequenceCache.set(gatingSequence);
} else if (cursor.compareAndSet(current, next)){
//获取写入位置成功,跳出循环
break;
}
} while (true);
16.jvm对于伪代码共享的支持
@sun.misc.Contended
注解可以轻松避免伪共享(前提是jvm参数-XX:-RestrictContended),以牺牲内存为代价。