Netty中的任务队列(添加元素篇)

2022-06-02 14:06:55 浏览数 (1)

此篇文章讲解一下Netty中的任务队列.这里说的任务队列是Netty中的IO线程对应的任务队列.

在Netty中NioEventLoopGroup这个类相当于线程池,而由它创建的每个NioEventLoop相当于池中的线程,因为每个NioEventLoop都是和唯一的一个线程绑定的,而这个线程只负责IO相关的工作,因此称作IO线程.

在创建NioEventLoop的时候会创建一个与之关联的任务队列(Queue<Runnable> taskQueue).这个任务队列用于'装载'其他非IO线程向IO线程提交的任务,比如业务线程(即非IO线程)需要向对端写数据,那么业务线程会把写数据这个操作封装成一个任务'丢到'任务队列中,由IO线程将数据写到网络中.

代码语言:javascript复制
private void write(Object msg, boolean flush, ChannelPromise promise) {
    ...
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 业务线程将写操作封装成Task
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        // 提交任务到IO线程对应的任务队列中
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg, boolean lazy) {
    // 将任务提交到IO线程的任务队列中
    executor.execute(runnable);    
}

首先看下这个taskQueue是由谁实现的

代码语言:javascript复制
// 实例化NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
               
    // newTaskQueue(queueFactory)会实现具体的任务队列
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
代码语言:javascript复制
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory
    if (queueFactory == null) {
        // 流程会走到这里
        return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    }
    return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
代码语言:javascript复制
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // 流程会继续调用 PlatformDependent.<Runnable>newMpscQueue()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
代码语言:javascript复制
public static <T> Queue<T> newMpscQueue() {
    return Mpsc.newMpscQueue();
}
代码语言:javascript复制
static <T> Queue<T> newMpscQueue() {
    // 流程会执行第一个new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
    // MPSC_CHUNK_SIZE=1024
    return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
        : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}

上面说了这么多,只是想说明taskQueue具体的实现

代码语言:javascript复制
Queue<Runnable> taskQueue = new MpscUnboundedArrayQueue<T>(1024)

接下来具体分析下MpscUnboundedArrayQueue

代码语言:javascript复制
org.jctools.queues.MpscUnboundedArrayQueue

在Netty之前的版本中,taskQueue是Netty自身实现它的.但是后面版本就将这个taskQueue的实现'交由'JCTools下的类来实现了.

代码语言:javascript复制
<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>3.1.0</version>
</dependency>

在Netty中,多个Netty客户端连接Netty服务端的时候,Netty服务端中的一个IO线程会负责处理多个客户端.

如上图所示,IO线程-1负责处理Netty客户端-1和Netty客户端-2的读写请求.当多个业务线程需要向对端写数据的时候,会把写操作封装成任务'丢到'IO线程-1的任务队列中.

Netty中的线程有个特别的地方,就是一个IO线程会对应多个业务线程,业务线程就是生产者,IO线程就是消费者,它消费业务线程'生产'的任务.属于单消费者多生产者模式.通过类的名称MpscUnboundedArrayQueue可以看出来,这个类就是为多生产者(MultiProducer)单消费者(SingleConsumer)设计的.

MpscUnboundedArrayQueue的底层使用数组的形式存储元素.

代码语言:javascript复制
// MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
    // 转成2^n
    int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
    long mask = (p2capacity - 1) << 1;
    // 存储元素的底层数组
    E[] buffer = allocateRefArray(p2capacity   1);
    producerBuffer = buffer;
    producerMask = mask;
    consumerBuffer = buffer;
    consumerMask = mask;
    soProducerLimit(mask);
}

从全局的角度看下MpscUnboundedArrayQueue的底层结构,这里假设

initialCapacity=4

虽然设置的初始容量大小=4,但是当存放的元素大于4的时候,就会新创建一个与之前同等大小的数组,然后'挂接'到之前的数组. 当再次'装载'不了新放入的元素时候,会再次新创建一个数组'挂接'到之前的数组,以此类推.最后形成一个数组 链表的结构.

结合源码分析下.以下假设初始容量大小initialCapacity=4

代码语言:javascript复制
// MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
    // p2capacity  = 4
    int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
    // 转成二进制mask=110
    long mask = (p2capacity - 1) << 1;
    // 存储元素的底层数组大小=4 1=5
    E[] buffer = allocateRefArray(p2capacity   1);
    // 指向生产者的数组
    producerBuffer = buffer;
    producerMask = mask;
    // 指向消费者的数组
    consumerBuffer = buffer;
    consumerMask = mask;
    // producerLimit=mask=110
    soProducerLimit(mask);
}

构造方法中,虽然设置的初始容量=4,但是在初始化底层数组的时候,分配的大小=5.从上面的图中可以看出,上一个数组为了指向下一个数组,因此数组在设计的时候就必须要多出来一个元素,用于指向下一个数组.

当提交元素的时候,代码如下所示,调用offer方法

代码语言:javascript复制
MpscUnboundedArrayQueue<Integer> queue = new MpscUnboundedArrayQueue<>(4);
queue.offer(1);
代码语言:javascript复制
public boolean offer(final E e) {
    long mask;
    E[] buffer;
    long pIndex;

    while (true) {
        long producerLimit = lvProducerLimit();
        pIndex = lvProducerIndex();
        // 表示正在扩容
        if ((pIndex & 1) == 1) {
            continue;
        }
        mask = this.producerMask;
        buffer = this.producerBuffer;
        // 如果提交的元素即将超过容量
        if (producerLimit <= pIndex) {
            int result = offerSlowPath(mask, pIndex, producerLimit);
            switch (result) {
                case CONTINUE_TO_P_INDEX_CAS:
                    break;
                case RETRY:
                    continue;
                case QUEUE_FULL:
                    return false;
                case QUEUE_RESIZE:
                    // 扩容
                    resize(mask, buffer, pIndex, e, null);
                    return true;
            }
        }

        //  2
        if (casProducerIndex(pIndex, pIndex   2)) {
            break;
        }
    }
    // 计算元素在数组中的偏移地址
    final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
    soRefElement(buffer, offset, e);
    return true;
}

首先要明确一点的是,producerIndex(即代码中的pIndex)记录生产者添加元素指向的位置,而且这个位置并不是在数组中的实际下标.

每添加一个元素,producerIndex就会 2.并不是 1.

通过构造方法初始化时,producerLimit=110.

当添加第一个元素之后,pIndex=010

当添加第二个元素之后,pIndex=100

当添加第三个元素之后,pIndex=110

根据上面第16行代码producerLimit <= pIndex满足条件.进入下面的代码

代码语言:javascript复制
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
    final long cIndex = lvConsumerIndex();
    long bufferCapacity = getCurrentBufferCapacity(mask);

    if (cIndex   bufferCapacity > pIndex) {
        if (!casProducerLimit(producerLimit, cIndex   bufferCapacity)) {
            return RETRY;
        } else {
            return CONTINUE_TO_P_INDEX_CAS;
        }
    } else if (availableInQueue(pIndex, cIndex) <= 0) {
        return QUEUE_FULL;

    }
    // pIndex   1
    else if (casProducerIndex(pIndex, pIndex   1)) {
        // 需要扩容
        return QUEUE_RESIZE;
    } else {
        return RETRY;
    }
}

能走到上面的代码,说明此时容器马上满了,需要扩容了,会将pIndex 1.之后就会进入到扩容逻辑.

代码语言:javascript复制
resize(mask, buffer, pIndex, e, null);

之前的pIndex=110,加1之后,变成pIndex=111.这个时候,其他生产者线程根据(pIndex & 1) == 1判断成立,说明有一个生产者线程正在扩容容器,当前生产者线程需要重试.

也就是说根据最后一个字节,控制是否有生产者线程正在扩容.

代码语言:javascript复制
public boolean offer(final E e) {
    long mask;
    E[] buffer;
    long pIndex;

    while (true) {
        long producerLimit = lvProducerLimit();
        pIndex = lvProducerIndex();
        // 表示有其他线程正在扩容
        if ((pIndex & 1) == 1) {
            // 重试
            continue;
        }
        ...
    }
    ...
}

扩容的线程会重新创建一个新的数组

代码语言:javascript复制
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
    int newBufferLength = getNextBufferSize(oldBuffer);
    final E[] newBuffer;
    try {
        // 创建新数组
        newBuffer = allocateRefArray(newBufferLength);
    } catch (OutOfMemoryError oom) {
        assert lvProducerIndex() == pIndex   1;
        soProducerIndex(pIndex);
        throw oom;
    }

    producerBuffer = newBuffer;
    final int newMask = (newBufferLength - 2) << 1;
    producerMask = newMask;

    final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
    final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

    soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
    // 新老数组进行连接
    soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);

    final long cIndex = lvConsumerIndex();
    final long availableInQueue = availableInQueue(pIndex, cIndex);

    soProducerLimit(pIndex   Math.min(newMask, availableInQueue));

    //  2之后,保证其他生产者线程可以继续增加元素了
    soProducerIndex(pIndex   2);

    // 添加一个JUMP元素
    soRefElement(oldBuffer, offsetInOld, JUMP);
}

在扩容的时候,会添加一个JUMP元素,这个元素是用来告诉消费者,当消费到这类元素的时候,需要跳到下一个数组继续消费.

假设向容器中依次添加1-9这9个元素,它的结构如下

消费者也会按照1-9进行消费.(即添加顺序和消费顺序一致)

在向容器中添加元素的时候,采用如下方式. 根据起始地址 偏移地址,提高添加元素的速度.

代码语言:javascript复制
static long modifiedCalcCircularRefElementOffset(long index, long mask) {
    return REF_ARRAY_BASE   ((index & mask) << (REF_ELEMENT_SHIFT - 1));
}


public static final long REF_ARRAY_BASE;
public static final int REF_ELEMENT_SHIFT;

static {
    // 数组中一个元素占用的大小
    final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
    if (4 == scale) {
        REF_ELEMENT_SHIFT = 2;
    } else if (8 == scale) {
        REF_ELEMENT_SHIFT = 3;
    } else {
        throw new IllegalStateException("Unknown pointer size: "   scale);
    }
    // 数组中第一个元素的偏移地址
    REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class);
}

此篇简单介绍下Netty中如何使用JCTools中的类在并发场景下提交元素,以及它的底层数据结构. 这种是与传统直接创建一个2倍的数组处理方式不同的.

0 人点赞