高性能线程间消息传递库Disruptor概述

2019-08-29 10:00:51 浏览数 (1)

Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发性 、性能和非阻塞算法的研究,如今构成了其Exchange基础架构的核心部分。

理解Disruptor是什么的最好方法是将它与目前已经的很好理解和非常相似的东西进行比较,例如与Java的BlockingQueue进行对比。与队列一样,Disruptor的目的是在同一进程内的线程之间传递数据(例如消息或事件)。但是Disruptor相比传统JDK中的队列提供了一些关键功能,它们是:

  • Disruptor中的同一个消息会向所有消费者都发送-即多播能力。
  • 为事件(events)预先分配内存,避免频繁垃圾回收与内存分配开销。
  • 可选择无锁(lock-free),基于CAS操作让多个生产者不会竞争同一个元素,实现无锁操作元素。
  • 使用两阶段协议,让多个线程可同时修改不同元素,需要注意的是消费元素时候只能读取到已经提交的元素。
  • 缓存行填充,避免伪共享。

多播能力是Java中队列和Disruptor之间最大的行为差异。当您有多个消费者在同一个Disruptor上监听事件时候,所有事件都会发布给所有消费者,而Java队列中的每个事件只会发送给某一个消费者。 Disruptor的行为旨在用于需要对同一数据进行独立的多个并行操作的情况。

Disruptor的目标之一是在低延迟环境中使用,在低延迟系统中,必须减少或移除内存分配;在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿;为了支持这一点,用户可以预先分配Disruptor中事件所需的存储空间(也就是声明RingBuffer的大小)。在构造RingBuffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时候被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便他们可以调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的。

低延迟期望推动的另一个关键实现细节是使用无锁算法来实现Disruptor;所有内存可见性和正确性保证都是使用内存屏障(体现为volatile)或CAS操作实现的;在Disruptor的实现中只有一个情况需要实际锁定,这就是当使用BlockingWaitStrategy策略时候,这仅仅是为了使用条件变量,以便在等待新事件到达时前parked消费线程。许多低延迟系统将使用忙等待busy-wait 来避免使用条件可能引起的抖动,但是大量在系统繁忙等待的操作可能导致性能显着下降,尤其是在CPU资源严重受限的情况下。

在JDK的BlockingQueue中当我们需要添加或者取出元素时候是需要加独占锁的,通过锁来保证多线程对底层共享的数据结构进行保护,使用锁导致同时只有一个线程可以向队列添加元素或者删除元素。Disruptor则使用两阶段协议,让多个线程可同时修改不同元素,需要注意的是消费元素时候只能读取到已经提交的元素;在Disruptor中某个线程要访问Ring Buffer中某个序列号下对应的元素时候要先通过CAS操作获取对应元素的所有权(第一阶段),然后通过序列号获取到对应的元素对象并对其中的属性进行修改,最后在发布元素(第二阶段),只有发布后的元素才可以被消费者读取;当多个线程写入元素时候多个线程都会先执行CAS操作获取到Ringbuffer中的某一个元素的所有权,然后可以并发的对自己的元素进行修改,但是需要注意的是只有序列号小的发布后,后面的才可以发布。可知使用CAS相比使用锁大大减少了开销,提高了并发度。

计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加了一级或者多级高速缓冲存储器(Cache),这个Cache一般是集成到CPU内部的,所以也叫 CPU Cache,如下图是两级cache结构:

file

Cache内部是按行存储的,其中每一行称为一个Cache行,Cache行是Cache与主内存进行数据交换的单位,Cache行的大小一般为2的幂次数字节。

file

当CPU访问某一个变量时候,首先会去看CPU Cache内是否有该变量,如果有则直接从中获取,否者就去主内存里面获取该变量,然后把该变量所在内存区域的一个Cache行大小的内存拷贝到Cache(Cache行是Cache与主内存进行数据交换的单位)。由于存放到Cache行的的是内存块而不是单个变量,所以可能会把多个变量存放到了一个cache行。当多个线程同时修改一个缓存行里面的多个变量时候,由于同时只能有一个线程操作缓存行,所以相比每个变量放到一个缓存行性能会有所下降,这就是伪共享。

file

如上图变量x,y同时被放到了CPU的一级和二级缓存,当线程1使用CPU1对变量x进行更新时候,首先会修改cpu1的一级缓存变量x所在缓存行,这时候缓存一致性协议会导致cpu2中变量x对应的缓存行失效,那么线程2写入变量x的时候就只能去二级缓存去查找,这就破坏了一级缓存,而一级缓存比二级缓存更快,这里也说明了多个线程不可能同时去修改自己所使用的cpu中缓存行中相同缓存行里面的变量。更坏的情况下如果cpu只有一级缓存,那么会导致频繁的直接访问主内存。

Disruptor中的环形缓存(Ring Buffer)底层是一个地址连续的数组,则数组内相邻的元素很容易会被放入到同一个Cache行里面从而导致伪共享的出现,Disruptor通过缓存行填充,让数组中的每个元素独占一个缓存行从而解决了伪共享问题的出现。另外为了避免环形缓存(Ring Buffer)中序列号(定位元素的游标)与其他元素共享缓存行,对其也就像了缓存行填充,以提高访问序列号时候缓存的命中率。

在我们理解Disruptor如何工作前,我们先看看Disruptor中的核心术语的介绍,或者说是Disruptor中的DDD(Domain-Driven Design)域对象

  • Ring Buffer: 环形缓冲区,通常被认为是Disruptor的核心,但是从3.0版本开始,Ring Buffer仅负责存储和更新Disruptor中的数据(事件)。
  • Sequence: Disruptor使用Sequences作为识别特定组件所在位置的方法。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的移动,因此Sequence支持AtomicLong的许多当前功能。事实上,3版本与2之间唯一真正的区别是防止了Sequence和其他变量之间出现伪共享。
  • Sequencer: Sequencer是Disruptor的真正核心。该接口的2个实现(单生产者,多生产者)实现了所有并发算法,用于在生产者和消费者之间快速、正确地传递数据。
  • Sequence Barrier: 序列屏障(Sequence Barrier)由Sequencer产生,并包含对Sequencer中主要发布者的序列Sequence和任何依赖的消费者的序列Sequence的引用。它包含了确定是否有任何可供消费者处理的事件的逻辑。
  • Wait Strategy: 等待策略,确定消费者如何等待生产者将事件放入Disruptor。
  • Event: 从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。
  • EventProcessor: 用于处理来自Disruptor的事件的主事件循环,并拥有消费者序列的所有权。其有一个名为BatchEventProcessor的实现,它包含事件循环的有效实现,并将回调使用者提供的EventHandler接口实现(在线程池内运行BatchEventProcessor的run方法)。
  • EventHandler: 由用户实现并代表Disruptor的消费者的接口。
  • Producer: 调用Disruptor以将事件放入队列的用户代码。这个概念在代码中也没有具体表示。

上面我们介绍了为了Disruptor中的核心概念,下面我们将这些元素组合在一起,如下图是LMAX在其高性能核心服务中使用Disruptor的示例:

file

如上图示例中有三个消费者,即日志记录JournalConsumer(将输入数据写入持久性日志文件),复制ReplicationConsumer(将输入数据发送到另一台机器以确保存在数据的远程副本)和业务逻辑ApplicationConsumer(真正的处理工作),其中JournalConsumer和ReplicationConsumer是可以并行执行的。

Producer向Disruptor的Ring Buffer中写入事件,消费者JournalConsumer和ReplicationConsumer(EventHandler)使用多播方式同时消费Ring Buffer中的每一个元素,两者都有各自的SequenceBarrier用来控制当前可用消费Ring Buffer中的哪一个事件,并且当不存在可用事件时候如何处理。消费者ApplicationConsumer则是等JournalConsumer和ReplicationConsumer对同一个元素处理完毕后,在对该元素进行处理,这个可以使用下面这个简化图来概括:

file

每个消费者持有自己的当前消费序号,由于是环形buffer,所以生产者写入事件时候要看序号最小的消费者序号,以避免覆盖还没有被消费的事件,另外Consumer3消费事件时候只能消费已经被Consumer1,Consumer2都处理过的事件。

每个EventHandler被包裹到对应的BatchEventProcessor中,BatchEventProcessor是一个事件处理循环,类似NIOevenloop,每个BatchEventProcessor被分到线程池里面一个固定线程来执行。BatchEventProcessor发现可用元素后,就调用EventHandler发射出元素。如上图Consumer1,Consumer2,Consumer3共享同一个Ringbuffer。

另外如上图Consumer1,Consumer2(EventHandler)分别被自己的BatchEventProcessor包裹,但是其共享同一个SequenceBarrier,Consumer1,Consumer2读取元素时候要调用SequenceBarrier的waitfor来判断是否有可以读取的元素;Consumer3被自己的BatchEventProcessor包裹,其有自己的SequenceBarrier,并且持有其依赖的前面的所有消费者的引用(Consumer1,Consumer2的引用),Consumer3消费元素时候要看其依赖的所有消费者,看其是否都消费了某一个元素,如果是其才可以消费该元素。

0 人点赞