高并发数据结构Disruptor解析(2)

2021-04-12 16:09:55 浏览数 (3)

Sequence(续)

之前说了Sequence通过给他的核心值value添加前置无用的padding long还有后置无用的padding long来避免对于value操作的false sharing的发生。那么对于这个value的操作是怎么操作的呢? 这里我们需要先了解下Unsafe类这个东西,可以参考我的另一篇文章 - Java Unsafe 类。 Unsafe中有一些底层为C 的方法,对于Sequence,其中做了: 获取Unsafe,通过Unsafe获取Sequence中的value的地址,根据这个地址CAS更新。 com.lmax.disruptor.Sequence.java

代码语言:javascript复制
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * 默认初始value为-1
     */
    public Sequence()
    {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    public long get()
    {
        return value;
    }

    /**
     * 利用Unsafe更新value的地址内存上的值从而更新value的值
     */
    public void set(final long value)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /**
     * 利用Unsafe原子更新value
     */
    public void setVolatile(final long value)
    {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    /**
     * 利用Unsafe CAS
     */
    public boolean compareAndSet(final long expectedValue, final long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }


    public long incrementAndGet()
    {
        return addAndGet(1L);
    }

    public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;

        do
        {
            currentValue = get();
            newValue = currentValue   increment;
        }
        while (!compareAndSet(currentValue, newValue));

        return newValue;
    }

    @Override
    public String toString()
    {
        return Long.toString(get());
    }
}

Producer

SingleProducerSequencer

接下来我们先从Producer看起。Disruptor分为单生产者和多生产者,先来关注下单生产者的核心类SingleProducerSequencer,类结构如下:

针对这些接口做一下简单的描述: Cursored接口:实现此接口的类,可以理解为,记录某个sequence的类。例如,生产者在生产消息时,需要知道当前ringBuffer下一个生产的位置,这个位置需要更新,每次更新,需要访问getCursor来定位。 Sequenced接口:实现此接口类,可以理解为,实现一个有序的存储结构,也就是RingBuffer的一个特性。一个Producer,在生产Event时,先获取下一位置的Sequence,之后填充Event,填充好后再publish,这之后,这个Event就可以被消费处理了

  • getBufferSize获取ringBuffer的大小
  • hasAvailableCapacity判断空间是否足够
  • remainingCapacity获取ringBuffer的剩余空间
  • next申请下一个或者n个sequence(value)作为生产event的位置
  • tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException
  • publish发布Event

Sequencer接口:**Sequencer接口,扩展了Cursored和Sequenced接口。在前两者的基础上,增加了消费与生产相关的方法。其中一个比较重要的设计是关于**GatingSequence的设计: 之后我们会提到,RingBuffer的头由一个名字为Cursor的Sequence对象维护,用来协调生产者向RingBuffer中填充数据。表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的话,队列尾的维护就是无锁的。但是,在生产者方确定RingBuffer是否已满就需要跟踪更多信息。为此,GatingSequence用来跟踪相关Sequence

  • INITIAL_CURSOR_VALUE: -1 为 sequence的起始值
  • claim: 申请一个特殊的Sequence,只有设定特殊起始值的ringBuffer时才会使用(一般是多个生产者时才会使用)
  • isAvailable:非阻塞,验证一个sequence是否已经被published并且可以消费
  • addGatingSequences:将这些sequence加入到需要跟踪处理的gatingSequences中
  • removeGatingSequence:移除某个sequence
  • newBarrier:给定一串需要跟踪的sequence,创建SequenceBarrier。SequenceBarrier是用来给多消费者确定消费位置是否可以消费用的
  • getMinimumSequence:获取这个ringBuffer的gatingSequences中最小的一个sequence
  • getHighestPublishedSequence:获取最高可以读取的Sequence
  • newPoller:目前没用,不讲EventPoller相关的内容(没有用到)

之后,抽象类AbstractSequencer实现Sequencer这个接口:定义了5个域:

代码语言:javascript复制
    private static final AtomicReferenceFieldUpdater SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    protected volatile Sequence[] gatingSequences = new Sequence[0];
  • SEQUENCE_UPDATER 是用来原子更新gatingSequences 的工具类
  • bufferSize记录生产目标RingBuffer的大小
  • waitStrategy表示这个生产者的等待策略(之后会讲)
  • cursor:生产定位,初始为-1
  • gatingSequences :前文已讲

构造方法增加了一些对于这个类的限制:

代码语言:javascript复制
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
}

bufferSize不能小于1并且bufferSize必须是2的n次方。原因我的第一篇文章已经讲述。 对于getCursor和getBufferSize的实现,这里仅仅是简单的getter:

代码语言:javascript复制
@Override
public final long getCursor()
{
    return cursor.get();
}

@Override
public final int getBufferSize()
{
    return bufferSize;
}

对于addGatingSequences和removeGatingSequence,则是原子更新:

代码语言:javascript复制
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

public boolean removeGatingSequence(Sequence sequence)
{
    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}

原子更新工具类静态方法代码:

代码语言:javascript复制
/**
 * 原子添加sequences
 * 
 * @param holder 原子更新的域所属的类对象
 * @param updater 原子更新的域对象
 * @param cursor 定位
 * @param sequencesToAdd 要添加的sequences
 * @param 
 */
static  void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
{
    long cursorSequence;
    Sequence[] updatedSequences;
    Sequence[] currentSequences;
    //在更新成功之前,一直重新读取currentSequences,扩充为添加所有sequence之后的updatedSequences
    do
    {
        currentSequences = updater.get(holder);
        updatedSequences = copyOf(currentSequences, currentSequences.length   sequencesToAdd.length);
        cursorSequence = cursor.getCursor();

        int index = currentSequences.length;
        //将新的sequences的值设置为cursorSequence
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
            updatedSequences[index  ] = sequence;
        }
    }
    while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

    cursorSequence = cursor.getCursor();
    for (Sequence sequence : sequencesToAdd)
    {
        sequence.set(cursorSequence);
    }
}

/**
 * 原子移除某个指定的sequence
 * 
 * @param holder 原子更新的域所属的类对象
 * @param sequenceUpdater 原子更新的域对象
 * @param sequence 要移除的sequence
 * @param 
 * @return
 */
static  boolean removeSequence(
        final T holder,
        final AtomicReferenceFieldUpdater sequenceUpdater,
        final Sequence sequence)
{
    int numToRemove;
    Sequence[] oldSequences;
    Sequence[] newSequences;

    do
    {
        oldSequences = sequenceUpdater.get(holder);

        numToRemove = countMatching(oldSequences, sequence);

        if (0 == numToRemove)
        {
            break;
        }

        final int oldSize = oldSequences.length;
        newSequences = new Sequence[oldSize - numToRemove];

        for (int i = 0, pos = 0; i < oldSize; i  )
        {
            final Sequence testSequence = oldSequences[i];
            if (sequence != testSequence)
            {
                newSequences[pos  ] = testSequence;
            }
        }
    }
    while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));

    return numToRemove != 0;
}

private static  int countMatching(T[] values, final T toMatch)
{
    int numToRemove = 0;
    for (T value : values)
    {
        if (value == toMatch) // Specifically uses identity
        {
            numToRemove  ;
        }
    }
    return numToRemove;
}

对于newBarrier,返回的是一个ProcessingSequenceBarrier: SequenceBarrier我们之后会详讲,这里我们可以理解为用来协调消费者消费的对象。例如消费者A依赖于消费者B,就是消费者A一定要后于消费者B消费,也就是A只能消费B消费过的,也就是A的sequence一定要小于B的。这个Sequence的协调,通过A和B设置在同一个SequenceBarrier上实现。同时,我们还要保证所有的消费者只能消费被Publish过的。这里我们先不深入

代码语言:javascript复制
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}

之后到了我们这次的核心,SingleProducerSequencer,观察它的结构,他依然利用了long冗余避免CPU的false sharing,这次的field不只有一个,而是有两个,所以,前后放上7个long类型,这样在最坏的情况下也能避免false sharing(参考我的第一篇文章) 这两个field是:

代码语言:javascript复制
protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;

初始值都为-1,这里强调下,由于这个类并没有实现任何的Barrier,所以在Disruptor框架中,这个类并不是线程安全的。不过由于从命名上看,就是单一生产者,所以在使用的时候也不会用多线程去调用里面的方法。 之后就是对AbstractSequencer抽象方法的实现: hasAvailableCapacity判断空间是否足够:

代码语言:javascript复制
@Override
public boolean hasAvailableCapacity(int requiredCapacity) {
    //下一个生产Sequence位置
    long nextValue = this.nextValue;
    //下一位置加上所需容量减去整个bufferSize,如果为正数,那证明至少转了一圈,则需要检查gatingSequences(由消费者更新里面的Sequence值)以保证不覆盖还未被消费的
    long wrapPoint = (nextValue   requiredCapacity) - bufferSize;
    //Disruptor经常用缓存,这里缓存之间所有gatingSequences最小的那个,这样不用每次都遍历一遍gatingSequences,影响效率
    long cachedGatingSequence = this.cachedValue;
    //只要wrapPoint大于缓存的所有gatingSequences最小的那个,就重新检查更新缓存
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        this.cachedValue = minSequence;
        //空间不足返回false
        if (wrapPoint > minSequence)
        {
            return false;
        }
    }
    //若wrapPoint小于缓存的所有gatingSequences最小的那个,证明可以放心生产
    return true;
}

对于next方法:申请下一个或者n个sequence(value)作为生产event的位置

代码语言:javascript复制
@Override
public long next() {
    return next(1);
}

@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;
    //next方法和之前的hasAvailableCapacity同理,只不过这里是相当于阻塞的
    long nextSequence = nextValue   n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        //只要wrapPoint大于最小的gatingSequences,那么不断唤醒消费者去消费,并利用LockSupport让出CPU,直到wrapPoint不大于最小的gatingSequences
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        //同理,缓存最小的gatingSequences
        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}

tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException。而这里的容量检查,就是通过之前的hasAvailableCapacity方法检查:

代码语言:javascript复制
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }

        if (!hasAvailableCapacity(n)) {
            throw InsufficientCapacityException.INSTANCE;
        }

        long nextSequence = this.nextValue  = n;

        return nextSequence;
    }

publish发布Event:

代码语言:javascript复制
    @Override
    public void publish(long sequence) {
        //cursor代表可以消费的sequence
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void publish(long lo, long hi) {
        publish(hi);
    }

其他:

代码语言:javascript复制
    @Override
    public void claim(long sequence) {
        nextValue = sequence;
    }

    @Override
    public boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    @Override
    public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
        return availableSequence;
    }

下面,我们针对SingleProducerSequencer画一个简单的工作流程: 假设有如下RingBuffer和SingleProducerSequencer,以及对应的消费者辅助类SequenceBarrier,这里不画消费者,假设有不断通过SequenceBarrier消费的消费者。SingleProducerSequencer的gatingSequences数组内保存这一个指向某个Sequence的引用,同时这个Sequence也会被SequenceBarrier更新以表示消费者消费到哪里了。这里生产的Sequence还有消费的Sequence都是从零开始不断增长的,即使大于BufferSize,也可以通过sequence的值对BufferSize取模定位到RingBuffer上。

假设SingleProducerSequencer这时生产两个Event,要放入RingBuffer。则假设先调用hasAvailableCapacity(2)判断下。代码流程是: wrapPoint = (nextValue requiredCapacity) - bufferSize = (-1 2) - 4 = -3 -3 < cachedValue所以不用检查gateSequences直接返回true。假设返回true,就开始填充,之后调用publish更新cursor,这样消费者调用isAvailable根据Cursor就可以判断,sequence:0和sequence:1可以消费了。

假设这之后,消费者消费了一个Event,更新Sequence为0.

之后,生产者要生产四个Event,调用hasAvailableCapacity(4)检查。代码流程是: wrapPoint = (nextValue requiredCapacity) - bufferSize = (1 4) - 4 = 1 1 > cachedValue所以要重新检查,这是最小的Sequence是0,但是1 > 仍然大于最小的Sequence,所以更新cachedValue,返回false。

至此,展示了一个简单的生产过程,SingleProducerSequencer也就讲完啦。

0 人点赞