Sequence (continued)
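Previously we saw that Sequence surrounds its core field, value, with unused padding longs before and after it, so that operations on value do not suffer from false sharing. So how is value itself operated on? To answer that we first need to understand the Unsafe class; see my other article, "Java Unsafe Class". Some of the methods in Unsafe are implemented natively in C++. Sequence uses them as follows: it obtains the Unsafe instance, uses it to look up the memory offset of the value field inside Sequence, and then writes the field through that offset, including CAS updates. The source is com.lmax.disruptor.Sequence.java: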
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
/**
* The default initial value is -1
*/
public Sequence()
{
this(INITIAL_VALUE);
}
public Sequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
public long get()
{
return value;
}
/**
* Uses Unsafe to write the new value to the memory location of value (an ordered write, cheaper than a volatile write)
*/
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Uses Unsafe to perform a volatile write of value
*/
public void setVolatile(final long value)
{
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
/**
* Uses Unsafe to perform a CAS on value
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
public long incrementAndGet()
{
return addAndGet(1L);
}
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString()
{
return Long.toString(get());
}
}
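The CAS retry loop in addAndGet can be tried out without sun.misc.Unsafe. The following is only a sketch under assumptions: the class name SequenceSketch is hypothetical, it uses the public AtomicLongFieldUpdater API in place of Unsafe, and it omits the cache-line padding entirely.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical stand-in for Sequence: the same CAS retry loop, built on the
// public AtomicLongFieldUpdater API instead of sun.misc.Unsafe. No padding.
final class SequenceSketch {
    private static final AtomicLongFieldUpdater<SequenceSketch> UPDATER =
        AtomicLongFieldUpdater.newUpdater(SequenceSketch.class, "value");

    // AtomicLongFieldUpdater requires a volatile long field.
    private volatile long value;

    SequenceSketch(long initialValue) {
        this.value = initialValue;
    }

    long get() {
        return value;
    }

    // Ordered (lazy) write, comparable in spirit to UNSAFE.putOrderedLong.
    void set(long newValue) {
        UPDATER.lazySet(this, newValue);
    }

    boolean compareAndSet(long expected, long newValue) {
        return UPDATER.compareAndSet(this, expected, newValue);
    }

    // Re-read the current value and retry until the CAS succeeds.
    long addAndGet(long increment) {
        long current;
        long next;
        do {
            current = get();
            next = current + increment;
        } while (!compareAndSet(current, next));
        return next;
    }

    public static void main(String[] args) throws InterruptedException {
        SequenceSketch sequence = new SequenceSketch(-1L);
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                sequence.addAndGet(1L);
            }
        };
        Thread first = new Thread(task);
        Thread second = new Thread(task);
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println(sequence.get()); // 19999: no increment is lost
    }
}
```

Because every increment goes through compareAndSet, two threads adding concurrently never overwrite each other's update; a failed CAS simply re-reads and tries again.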
Producer
SingleProducerSequencer
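Let's start with the Producer. Disruptor distinguishes between a single producer and multiple producers; we look first at SingleProducerSequencer, the core class for the single-producer case. Its class structure builds on the interfaces Cursored, Sequenced, and Sequencer and on the abstract class AbstractSequencer.
A brief description of these interfaces:
Cursored interface: a class implementing this interface can be thought of as one that tracks a particular sequence. For example, when a producer produces a message it needs to know the next position to produce into in the current ringBuffer. That position keeps changing, and each time it is located by calling getCursor.
Sequenced interface: a class implementing this interface can be thought of as implementing an ordered storage structure, which is one of the properties of the RingBuffer. When a Producer produces an Event, it first obtains the Sequence of the next position, then fills in the Event, and finally calls publish; after that the Event can be consumed. Its methods are: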
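- getBufferSize: returns the size of the ringBuffer
- hasAvailableCapacity: checks whether there is enough space
- remainingCapacity: returns the remaining space in the ringBuffer
- next: claims the next one or n sequences (values) as the positions for producing events
- tryNext: tries to claim the next one or n sequences (values) as the positions for producing events; throws InsufficientCapacityException if capacity is insufficient
- publish: publishes the Event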
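Sequencer interface: Sequencer extends Cursored and Sequenced and, on top of those two, adds methods related to coordinating consumption and production. One of its more important design points is the GatingSequence. As we will see later, the head of the RingBuffer is maintained by a Sequence object named cursor, which coordinates producers filling data into the RingBuffer. The Sequence that represents the tail of the queue is not held in the RingBuffer; it is maintained by the consumers. This makes maintaining the tail lock-free. The cost is that the producer side must track more information to decide whether the RingBuffer is full, and the GatingSequence exists to track the relevant Sequences. The members of the interface are: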
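- INITIAL_CURSOR_VALUE: -1, the starting value of a sequence
- claim: claims a specific Sequence; only used when the ringBuffer is set up with a specific starting value (generally only with multiple producers)
- isAvailable: non-blocking; checks whether a sequence has been published and can be consumed
- addGatingSequences: adds the given sequences to the gatingSequences that need to be tracked
- removeGatingSequence: removes a given sequence
- newBarrier: given a set of sequences to track, creates a SequenceBarrier. A SequenceBarrier is what multiple consumers use to determine whether a position is ready to be consumed
- getMinimumSequence: returns the smallest sequence among this ringBuffer's gatingSequences
- getHighestPublishedSequence: returns the highest Sequence that can be read
- newPoller: not used here, so EventPoller is not covered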
Next, the abstract class AbstractSequencer implements the Sequencer interface. It defines five fields:
 private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
protected final int bufferSize;
protected final WaitStrategy waitStrategy;
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
protected volatile Sequence[] gatingSequences = new Sequence[0];
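- SEQUENCE_UPDATER: the utility used to update gatingSequences atomically
- bufferSize: the size of the target RingBuffer
- waitStrategy: the wait strategy of this producer (covered later)
- cursor: the production position, initially -1
- gatingSequences: described above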
The constructor adds some constraints on the class:
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
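The two checks in this constructor can be exercised in isolation. The sketch below assumes that the point of the power-of-two rule is the usual one, namely that a slot index can then be computed with a bit mask instead of a modulo; the class RingIndexSketch and its method names are hypothetical and not part of Disruptor.

```java
// Hypothetical helper: the constructor's validity check plus mask-based indexing.
final class RingIndexSketch {
    // Same condition the constructor enforces: at least 1 and exactly one bit set.
    static boolean isValidBufferSize(int bufferSize) {
        return bufferSize >= 1 && Integer.bitCount(bufferSize) == 1;
    }

    // For a power-of-two size and a non-negative sequence,
    // sequence & (size - 1) equals sequence % size.
    static int indexOf(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        System.out.println(isValidBufferSize(8)); // true
        System.out.println(isValidBufferSize(6)); // false
        System.out.println(indexOf(9L, 8));       // 1
    }
}
```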
bufferSize must not be less than 1 and must be a power of 2. The reason was covered in my first article. getCursor and getBufferSize are implemented as simple getters:
@Override
public final long getCursor()
{
return cursor.get();
}
@Override
public final int getBufferSize()
{
return bufferSize;
}
addGatingSequences and removeGatingSequence, by contrast, are atomic updates:
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
public boolean removeGatingSequence(Sequence sequence)
{
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
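The static methods of the atomic-update utility class (SequenceGroups) look like this:
/**
* Atomically adds sequences
*
* @param holder the object that owns the field being updated atomically
* @param updater the updater for that field
* @param cursor the cursor used to position the new sequences
* @param sequencesToAdd the sequences to add
* @param <T> the type of the holder
*/
static <T> void addSequences(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
final Cursored cursor,
final Sequence... sequencesToAdd)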
{
long cursorSequence;
Sequence[] updatedSequences;
Sequence[] currentSequences;
//Until the update succeeds, keep re-reading currentSequences and expanding it into updatedSequences, which also holds every sequence being added
do
{
currentSequences = updater.get(holder);
updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
cursorSequence = cursor.getCursor();
int index = currentSequences.length;
//Set the value of each new sequence to cursorSequence
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
updatedSequences[index++] = sequence;
}
}
while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
cursorSequence = cursor.getCursor();
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
}
}
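/**
* Atomically removes the specified sequence
*
* @param holder the object that owns the field being updated atomically
* @param sequenceUpdater the updater for that field
* @param sequence the sequence to remove
* @param <T> the type of the holder
* @return true if the sequence was found and removed, false otherwise
*/
static <T> boolean removeSequence(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
final Sequence sequence)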
{
int numToRemove;
Sequence[] oldSequences;
Sequence[] newSequences;
do
{
oldSequences = sequenceUpdater.get(holder);
numToRemove = countMatching(oldSequences, sequence);
if (0 == numToRemove)
{
break;
}
final int oldSize = oldSequences.length;
newSequences = new Sequence[oldSize - numToRemove];
for (int i = 0, pos = 0; i < oldSize; i++)
{
final Sequence testSequence = oldSequences[i];
if (sequence != testSequence)
{
newSequences[pos++] = testSequence;
}
}
}
while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
return numToRemove != 0;
}
private static <T> int countMatching(T[] values, final T toMatch)
{
int numToRemove = 0;
for (T value : values)
{
if (value == toMatch) // Specifically uses identity
{
numToRemove++;
}
}
return numToRemove;
}
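Both utility methods follow the same copy-on-write pattern: read the current array, build a modified copy, and swap it in with a CAS, retrying if another thread got there first. A stripped-down sketch of that pattern, assuming a hypothetical holder class GatingHolderSketch that stores plain Object references instead of Sequence objects, might look like this:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical holder illustrating copy-on-write updates of a volatile array field.
final class GatingHolderSketch {
    private static final AtomicReferenceFieldUpdater<GatingHolderSketch, Object[]> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(GatingHolderSketch.class, Object[].class, "items");

    // The array itself is never mutated after publication; it is only replaced.
    private volatile Object[] items = new Object[0];

    void add(Object... toAdd) {
        Object[] current;
        Object[] updated;
        do {
            current = UPDATER.get(this);
            updated = Arrays.copyOf(current, current.length + toAdd.length);
            System.arraycopy(toAdd, 0, updated, current.length, toAdd.length);
        } while (!UPDATER.compareAndSet(this, current, updated));
    }

    boolean remove(Object target) {
        Object[] current;
        Object[] updated;
        do {
            current = UPDATER.get(this);
            int matches = 0;
            for (Object item : current) {
                if (item == target) { // identity comparison, as in countMatching
                    matches++;
                }
            }
            if (matches == 0) {
                return false;
            }
            updated = new Object[current.length - matches];
            int pos = 0;
            for (Object item : current) {
                if (item != target) {
                    updated[pos++] = item;
                }
            }
        } while (!UPDATER.compareAndSet(this, current, updated));
        return true;
    }

    Object[] snapshot() {
        return UPDATER.get(this);
    }
}
```

Readers that take a snapshot never see a half-updated array, which is why the producer can iterate over gatingSequences without locking.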
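newBarrier returns a ProcessingSequenceBarrier. SequenceBarrier will be covered in detail later; for now, think of it as the object that coordinates consumption among consumers. For example, if consumer A depends on consumer B, then A must consume after B: A may only consume what B has already consumed, so A's sequence must never be ahead of B's. This coordination of Sequences is done through the SequenceBarrier: A waits on a barrier that tracks B's sequence. At the same time, we must ensure that every consumer only consumes what has been published. We will not go deeper here.
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)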
{
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
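The dependency rule described above (a consumer may not pass the published cursor, nor any consumer it depends on) can be reduced to taking a minimum. This is a deliberately simplified sketch and not how ProcessingSequenceBarrier is implemented; the class BarrierSketch and the method highestConsumable are hypothetical names.

```java
// Hypothetical simplification of what a barrier has to compute for one consumer.
final class BarrierSketch {
    // Highest sequence a consumer may process: bounded by the producer's cursor
    // and by the sequence of every consumer it depends on.
    static long highestConsumable(long cursor, long... dependentSequences) {
        long min = cursor;
        for (long sequence : dependentSequences) {
            min = Math.min(min, sequence);
        }
        return min;
    }

    public static void main(String[] args) {
        // Producer has published up to 10, consumer B has processed up to 7,
        // so consumer A (which depends on B) may go no further than 7.
        System.out.println(highestConsumable(10L, 7L)); // 7
    }
}
```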
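Now we come to the core of this article, SingleProducerSequencer. Looking at its structure, it again uses redundant long fields to avoid CPU false sharing. This time there are two fields rather than one, so seven longs are placed before and after them, which avoids false sharing even in the worst case (see my first article). The two fields are:
protected long nextValue = Sequence.INITIAL_VALUE;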
protected long cachedValue = Sequence.INITIAL_VALUE;
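Both start at -1. Note that because this class does not implement any barriers around these fields, it is not thread-safe within the Disruptor framework. As the name suggests, however, it is meant for a single producer, so its methods are not expected to be called from multiple threads. Next come the implementations of the abstract methods left open by AbstractSequencer. hasAvailableCapacity checks whether there is enough space: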
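@Override
public boolean hasAvailableCapacity(int requiredCapacity) {
//The producer's current position (the last sequence it claimed)
long nextValue = this.nextValue;
//Current position plus the required capacity minus bufferSize. If this is positive, the producer has gone around the ring at least once, so gatingSequences (whose Sequence values are updated by consumers) must be checked to avoid overwriting events that have not been consumed yet
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
//Disruptor caches heavily: this is the minimum of all gatingSequences seen at the last check, so gatingSequences does not have to be traversed on every call
long cachedGatingSequence = this.cachedValue;
//If wrapPoint is greater than the cached minimum of gatingSequences, re-check and refresh the cache
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
this.cachedValue = minSequence;
//Not enough space: return false
if (wrapPoint > minSequence)
{
return false;
}
}
//wrapPoint does not exceed the minimum of gatingSequences, so it is safe to produce
return true;
}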
The next method claims the next one or n sequences (values) as the positions for producing events:
@Override
public long next() {
return next(1);
}
@Override
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
//next works like hasAvailableCapacity above, except that it effectively blocks until space is available
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
long minSequence;
//While wrapPoint is greater than the minimum of gatingSequences, keep signalling the consumers and yield the CPU via LockSupport, until wrapPoint is no longer greater than that minimum
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
//As before, cache the minimum of gatingSequences
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
tryNext tries to claim the next one or n sequences (values) as the positions for producing events, and throws InsufficientCapacityException when capacity is insufficient. The capacity check is the hasAvailableCapacity method shown earlier:
 @Override
public long tryNext() throws InsufficientCapacityException {
return tryNext(1);
}
@Override
public long tryNext(int n) throws InsufficientCapacityException {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
if (!hasAvailableCapacity(n)) {
throw InsufficientCapacityException.INSTANCE;
}
long nextSequence = this.nextValue += n;
return nextSequence;
}
publish publishes the Event:
 @Override
public void publish(long sequence) {
//cursor marks the highest sequence that can be consumed
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
@Override
public void publish(long lo, long hi) {
publish(hi);
}
Other methods:
 @Override
public void claim(long sequence) {
nextValue = sequence;
}
@Override
public boolean isAvailable(long sequence) {
return sequence <= cursor.get();
}
@Override
public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
return availableSequence;
}
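Next, let's walk through a simple workflow for SingleProducerSequencer. Assume a RingBuffer with a bufferSize of 4 and a SingleProducerSequencer, together with the consumer-side helper class SequenceBarrier. Consumers are not drawn here; assume there are consumers that keep consuming through the SequenceBarrier. The gatingSequences array of SingleProducerSequencer holds a reference to a Sequence, and that same Sequence is also updated on the SequenceBarrier side to indicate how far the consumer has got. Both the producer's Sequence and the consumer's Sequence start from zero and keep increasing; even once they exceed bufferSize, taking the sequence value modulo bufferSize locates the slot in the RingBuffer.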
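Suppose SingleProducerSequencer now produces two Events to put into the RingBuffer, and first calls hasAvailableCapacity(2) to check. The code computes wrapPoint = (nextValue + requiredCapacity) - bufferSize = (-1 + 2) - 4 = -3. Since -3 < cachedValue (which is -1), there is no need to check gatingSequences and the method returns true. The producer then claims the two sequences, fills in the Events, and calls publish to update the cursor; a consumer calling isAvailable can then tell from the cursor that sequence 0 and sequence 1 are ready to be consumed.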
Suppose the consumer then consumes one Event and updates its Sequence to 0.
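Next, the producer wants to produce four Events and calls hasAvailableCapacity(4) to check. The code computes wrapPoint = (nextValue + requiredCapacity) - bufferSize = (1 + 4) - 4 = 1. Since 1 > cachedValue (-1), gatingSequences has to be re-checked. The minimum Sequence is now 0, and 1 is still greater than it, so cachedValue is updated to 0 and the method returns false.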
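The two capacity checks in this walkthrough can be replayed with a small single-threaded model. This is only a sketch under assumptions: SingleProducerSketch is a hypothetical class, it tracks a single consumer through a plain long field instead of a gatingSequences array, and it throws IllegalStateException where Disruptor would throw InsufficientCapacityException.

```java
// Hypothetical single-threaded model of the capacity check; not Disruptor code.
final class SingleProducerSketch {
    private final int bufferSize;
    private long nextValue = -1L;   // last sequence claimed by the producer
    private long cachedValue = -1L; // cached minimum gating sequence
    long cursor = -1L;              // last published sequence
    long consumerSequence = -1L;    // stands in for the only gating sequence

    SingleProducerSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    boolean hasAvailableCapacity(int requiredCapacity) {
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            // Mirrors Util.getMinimumSequence(gatingSequences, nextValue) for one sequence.
            long minSequence = Math.min(consumerSequence, nextValue);
            cachedValue = minSequence;
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }

    long tryNext(int n) {
        if (!hasAvailableCapacity(n)) {
            throw new IllegalStateException("insufficient capacity");
        }
        nextValue += n;
        return nextValue;
    }

    void publish(long sequence) {
        cursor = sequence;
    }

    public static void main(String[] args) {
        SingleProducerSketch producer = new SingleProducerSketch(4);
        System.out.println(producer.hasAvailableCapacity(2)); // true: wrapPoint is -3
        producer.publish(producer.tryNext(2));                // claims and publishes up to sequence 1
        producer.consumerSequence = 0L;                       // the consumer has processed sequence 0
        System.out.println(producer.hasAvailableCapacity(4)); // false: wrapPoint 1 > minimum 0
    }
}
```

With the consumer at 0, three more events would still fit: hasAvailableCapacity(3) gives a wrapPoint of 0, which does not exceed the cached minimum.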
That completes a simple production walkthrough, and with it our look at SingleProducerSequencer.