本文基于最新的3.4.2的版本文档进行翻译,翻译自: https://github.com/LMAX-Exchange/disruptor/wiki/Introduction https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
Disruptor简介
最好的方法去理解Disruptor就是将它和容易理解并且相似的队列,例如BlockingQueue。Disruptor其实就像一个队列一样,用于在不同的线程之间迁移数据,但是Disruptor也实现了一些其他队列没有的特性,如:
- 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
- 预分配用于存储事件内容的内存空间;
- 针对极高的性能目标而实现的极度优化和无锁的设计;
Disruptor核心架构组件
- Ring Buffer:Ring Buffer在3.0版本以前被认为是Disruptor的核心组件,但是在之后的版本中只是负责存储和更新数据。在一些高级使用案例中用户也能进行自定义
- Sequence:Disruptor使用一组Sequence来作为一个手段来标识特定的组件的处理进度( RingBuffer/Consumer )。每个消费者和Disruptor本身都会维护一个Sequence。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
- Sequencer:Sequencer是Disruptor的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
- Sequence Barrier:保持Sequencer和Consumer依赖的其它Consumer的 Sequence 的引用。除此之外还定义了决定 Consumer 是否还有可处理的事件的逻辑。
- Wait Strategy:Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。
- Event:从生产者到消费者传递的数据叫做Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
- EventProcessor:持有特定的消费者的Sequence,并且拥有一个主事件循环(main event loop)用于处理Disruptor的事件。其中BatchEventProcessor是其具体实现,实现了事件循环(event loop),并且会回调到实现了EventHandler的已使用过的实例中。
- EventHandler:由用户实现的接口,用于处理事件,是 Consumer 的真正实现
- Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
事件广播(Multicast Events)
这是Disruptor和队列最大的区别。当你有多个消费者监听了一个Disruptor,所有的事件将会被发布到所有的消费者中,相比之下队列的一个事件只能被发到一个消费者中。Disruptor这一特性被用来需要对同一数据进行多个并行操作的情况。如在LMAX系统中有三个操作可以同时进行:日志(将数据持久到日志文件中),复制(将数据发送到其他的机器上,以确保存在数据远程副本),业务逻辑处理。也可以使用WokrerPool来并行处理不同的事件。
消费者依赖关系图(Consumer Dependency Graph)
为了支持真实世界中的业务并行处理流程,Disruptor提供了多个消费者之间的协助功能。回到上面的LMAX的例子,我们可以让日志处理和远程副本赋值先执行完之后再执行业务处理流程,这个功能被称之为gating。gating发生在两种场景中。第一,我们需要确保生产者不要超过消费者。通过调用RingBuffer.addGatingConsumers()增加相关的消费者至Disruptor来完成。第二,就是之前所说的场景,通过构造包含需要必须先完成的消费者的Sequence的SequenceBarrier来实现。
引用上面的例子来说,有三个消费者监听来自RingBuffer的事件。这里有一个依赖关系图。ApplicationConsumer依赖JournalConsumer和ReplicationConsumer。这个意味着JournalConsumer和ReplicationConsumer可以自由的并发运行。依赖关系可以看成是从ApplicationConsumer的SequenceBarrier到JournalConsumer和ReplicationConsumer的Sequence的连接。还有一点值得关注,Sequencer与下游的消费者之间的关系。它的角色是确保发布不会包裹RingBuffer。为此,所有下游消费者的Sequence不能比ring buffer的Sequence小且不能比ring buffer 的大小小。因为ApplicationConsumers的Sequence是确保比JournalConsumer和ReplicationConsumer的Sequence小或等于,所以Sequencer只需要检查ApplicationConsumers的Sequence。在更为普遍的应用场景中,Sequencer只需要意识到消费者树中的叶子节点的的Sequence即可。
事件预分配(Event Preallocation)
Disruptor的一个目标之一是被用在低延迟的环境中。在一个低延迟系统中有必要去减少和降低内存的占用。在基于Java的系统中,需要减少由于GC导致的停顿次数(在低延迟的C/C 系统中,由于内存分配器的争用,大量的内存分配也会导致问题)。
为了满足这点,用户可以在Disruptor中为事件预分配内存。所以EventFactory是用户来提供,并且Disruptor的Ring Buffer每个entry中都会被调用。当将新的数据发布到Disruptor中时,Disruptor的API将会允许用户持有所构造的对象,以便用户可以调用这些对象的方法和更新字段到这些对象中。Disruptor将确保这些操作是线程安全。
可选择的无锁
无锁算法实现的Disruptor的所有内存可见性和正确性都使用内存屏障和CAS操作实现。只仅仅一个场景BlockingWaitStrategy中使用到了lock。而这仅仅是为了使用Condition,以便消费者线程能被park住当在等待一个新的事件到来的时候。许多低延迟系统都使用自旋(busy-wait)来避免使用Condition造成的抖动。但是自旋(busy-wait)的数量变多时将会导致性能的下降,特别是CPU资源严重受限的情况下。例如,在虚拟环境中的Web服务器。
Disruptor使用
我们使用一个简单的例子来体验一下Disruptor。生产者会传递一个long类型的值到消费者,消费者接受到这个值后会打印出这个值。
定义Event
代码语言:javascript复制public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
为了使用Disruptor的内存预分配event,我们需要定义一个EventFactory:
代码语言:javascript复制import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
为了让消费者处理这些事件,所以我们这里定义一个事件处理器,负责打印event:
代码语言:javascript复制import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " event);
}
}
使用Translators发布事件
在Disruptor的3.0版本中,由于加入了丰富的Lambda风格的API,可以用来帮组开发人员简化流程。所以在3.0版本后首选使用Event Publisher/Event Translator来发布事件。
代码语言:javascript复制import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
这种方法的另一个优点是可以将translator代码放入一个单独的类中,并且可以轻松地对它们进行独立的单元测试。
使用过时的API发布事件
代码语言:javascript复制import com.lmax.disruptor.RingBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
这里我们需要把发布包裹在try/finally代码块中。如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。特别地在多生产中如果没有提交Sequence,那么会造成消费者停滞,导致只能重启消费者才能恢复。
整合
代码语言:javascript复制import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l )
{
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}
我们也可以使用Java 8的函数式编程来写这个例子:
代码语言:javascript复制import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l )
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
使用函数式编程我们可以发现很多的类都不需要了,如:handler,translator等。 上面的代码还可以再简化一下:
代码语言:javascript复制ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l )
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
不过这样将实例化一个对象去持有ByteBuffer bb
变量传入lambda的值。这会产生不必要的垃圾。因此,如果要求低GC压力,则应首选将参数传递给lambda的调用。
提升性能的两个参数
如果想要让Disruptor拥有更好的性能这里有两个选项可以调整,wait strategy 和 producer的类型。
单生产者 vs 多生产者
最好的方法在并发环境下提高性能是坚持单独写原则( Single Writer Principle)。如果你的业务场景中只有一个线程写入数据到Disruptor,那么你可以设置成单生产者来提升性能:
代码语言:javascript复制public class LongEventMain
{
public static void main(String[] args) throws Exception
{
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
//.....
}
}
性能测试: Multiple Producer
代码语言:javascript复制Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
Single Producer
代码语言:javascript复制Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
等待策略
BlockingWaitStrategy Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
SleepingWaitStrategy SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。一般来说Linux系统会暂停一个线程约60µs,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费时间信号通知条件变量。但是,在生产者线程和使用者线程之间移动事件的平均延迟会更高。它在不需要低延迟并且对生产线程的影响较小的情况最好。一个常见的用例是异步日志记录。
YieldingWaitStrategy YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
BusySpinWaitStrategy 性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心树的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
清除Ring Buffer中的对象
通过Disruptor传递数据时,对象的生存期可能比预期的更长。为避免发生这种情况,可能需要在处理事件后清除事件。如果只有一个事件处理程序,则需要在处理器中清除对应的对象。如果您有一连串的事件处理程序,则可能需要在该链的末尾放置一个特定的处理程序来处理清除对象。
代码语言:javascript复制class ObjectEvent<T>
{
T val;
void clear()
{
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}