前言
以前一直听说有Disruptor这个东西,都是性能很强大,所以这几天自己也看了一下。 下面是自己的学习笔记,另外推荐几篇自己看到写的比较好的博客: Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案 Disruptor3.0的实现细节
DIsruptor我和底层性能如此牛掰
- 数据结构层面:使用环形结构、数组、内存预加载
- 单线程写方式、内存屏障
- 消除伪共享(填充缓存行)
- 序号栅栏(
SequenceBarrier
)配合使用来消除锁和CAS
高性能知道-数据结构-内存加载机制
- RingBuffer使用数组Object[] entries作为存储元素,如下图所示
高性能之道-内核-使用单线程写
- Disruptor的RingBuffer,之所以可以做到完全无锁,也是因为 ”单线程个写“, 这是所有”前提的前提“。离开了这个前提条件,没有任何技术可以做到完全无锁
- Redis、Netty等等高性能技术框架的设计都是这个核心思想
高性能之道-系统内存优化-内存屏障
- 要正确的实现无锁,还需要另一个关键技术:内存屏障。
- 对应到Java语言,就是valotile变量与happens before语义。
- 内存屏障-Linux的smp_wmb()/smp_rmb()
高性能之道-系统缓存优化-消除伪共享
- 缓存系统中是以缓存行(cache line)为单位存储的
- 缓存行是2的整数幂个连续字节,一般为32-256个字节
- 最常见的缓存行大小是64个字节
- 当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行
- 就会无意中影响彼此的性能,这就是伪共享 -- 对应源码中的:
Sequence
:
Disruptor核心-Sequence
- Sequence可以看成是一个AtomicLong,用于标识进度
- 还有另外一个目的就是防止不同Sequence之间CPU缓存伪共享(False Sharing)的问题-- 对应源码中的:
Sequence
:
高性能之道-算法优化-序号栅栏机制
- 我们在生产者进行投递Event的时候,总会使用:long sequence = ringBuffer.next();
- Disruptor 3.0中,序号栅栏SequenceBarrier和序号Sequence搭配使用,协同和管理消费者与生产者的工作节奏,避免了锁和CAS的使用
- 在Disruptor3.0中,各个消费者和生产者持有自己的序号,这些序号的变化必须满足如下基本条件:-- 参见源码:
SingleProducerSequencer
a. 消费者的序号数值必须小于生产者序号数值;b. 消费者序号数值必须小于其前置(依赖关系)消费者的序号数值; c. 生产者序号数值不能大于消费者最小的序号数值以避免生产者速度过快,将还未来得及消费的消息覆盖
WatiStrategy等待策略
- Disruptor之所以可以说是高性能,其实也有一部分原因取决于它的等待策略的实现:WaitStrategy接口:
-- 查看源码
BlockingWaitStrategy
-- 查看源码YieldingWaitStrategy
Disruptor核心-EventProcessor
- EventProcessor:主要时间循环,处理Disruptor中的Event,拥有消费者的Sequence
- 它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的思想对象 -- 参见
BatchEventProcessor
源码解读
Disruptor:Disruptor的入口,主要封装了环形队列RingBuffer、消费者集合ConsumerRepository的引用;主要提供了获取环形队列、添加消费者、生产者向RingBuffer中添加事件(可以理解为生产者生产数据)的操作; RingBuffer:Disruptor中队列具体的实现,底层封装了Object[]数组;在初始化时,会使用Event事件对数组进行填充,填充的大小就是bufferSize设置的值;此外,该对象内部还维护了Sequencer(序列生产器)具体的实现; Sequencer:序列生产器,分别有MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。上面的例子中,使用的是SingleProducerSequencer;在Sequencer中,维护了消费者的Sequence(序列对象)和生产者自己的Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略WaitStrategy; Sequence:序列对象,内部维护了一个long型的value,这个序列指向了RingBuffer中Object[]数组具体的角标。生产者和消费者各自维护自己的Sequence;但都是指向RingBuffer的Object[]数组; Wait Strategy:等待策略。当没有可消费的事件时,消费者根据特定的策略进行等待;当没有可生产的地方时,生产者根据特定的策略进行等待; Event:事件对象,就是我们Ringbuffer中存在的数据,在Disruptor中用Event来定义数据,并不存在Event类,它只是一个定义; EventProcessor:事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用我们自定义的事件处理器,也就是是否可以进行消费; EventHandler:事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现EventHandler接口;
RingBuffer
:
Sequence
:
这个里面缓存行的填充很经典,设计成前7后7 Long类型来填充,保证消除伪共享。 使用空间换时间,避免伪共享。Java8中使用@sun.misc.Contended 来消除伪共享,在运行时需要设置JVM启动参数:-XX:-RestrictContended
这里前7后7加上本身的Value值,总共是有15个Long元素,无论如何拆分,Value和预填充的Long型数据一定会处于单独的一个缓存行。
SingleProducerSequencer
这里就是用简单的if else判断,就避免了加锁,CAS的消耗,这里是使用序号栅栏,通过巧妙的算法 自旋操作来实现等待的操作。 解析如下图:
其中可以自己写代码去debug,创建ringBuffer长度为4,消费者阻塞在第0个元素的消费中。然后生产者再生产第5个元素的时候就会进行自旋等待。
BlockingWaitStrategy
BatchEventProcessor
waitFor 可以参考上面的BlockingWaitStrategy
的waitFor() 方法