Netty的异步任务处理与Socket事件处理

2021-08-06 14:28:39 浏览数 (1)

经过前面几章的学习,我们基本是明白了Netty通道的创建、注册、与绑定与JDK NIO的对应关系,如果我们使用的是JDK NIO的方式去开发一个Socket服务端的时候,此时还缺少了一个重要的环节,就是循环处理IO事件!

我们前面不只一次的见到Netty的异步事件,因为我们某些知识还没有学习到,所以我们都按照同步的方式去获取的,所以我们本章节将带你学习,Netty对于IO事件的处理与异步事件的处理!

我们以绑定为出发点,由点到面进行分析!

一、源码入口

我们直接进入到绑定的源码分析:

代码语言:javascript复制
private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // 在触发channelRegistered()之前调用此方法。给用户处理程序一个设置的机会
    // 其channelRegistered()实现中的管道。
    channel.eventLoop().execute(() -> {
        if (regFuture.isSuccess()) {
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            promise.setFailure(regFuture.cause());
        }
    });
}

我们上节课直接分析的channel.bind方法,而忽略上上面的异步方法,这里我们开始分析异步方法,我们进入到channel.eventLoop().execute()方法:

image-20210430145227945

二、源码分析

我们前面分析过,每个Channel绑定一个NioEventLoop,而EventLoop又是SingleThreadEventExecutor的子类,所以我们进入到io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable):

代码语言:javascript复制
@Override
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

---------------------------分界线------------------------------------

//继续往下追  execute
private void execute(Runnable task, boolean immediate) {
    //判断当前执行的线程是不是 NIoEventLoopGroup的线程  这里是false
    boolean inEventLoop = inEventLoop();
    //将任务加入到队列
    addTask(task);
    //这里永远只能启动一次  一个eventLoop
    if (!inEventLoop) {
        //启动线程
        startThread();
        .....................................
    }
    //io.netty.channel.nio.NioEventLoop.selector
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

我们这里可以分为两部分:

1. 添加任务

代码语言:javascript复制
addTask(task);

----------------------------------分界线---------------------------
    
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}

基础好一点的同学我估计已经有点猜到了,单看这个 offerTask有没有像和队列相关的操作,我们进入到offerTask方法:

代码语言:javascript复制
final boolean offerTask(Runnable task) {
    ...............忽略.................
    return taskQueue.offer(task);
}

果不其然,果然是入队操作,taskQueue是什么呢?

image-20210430152558414

我们再初始化NioEventLoop的源码分析学习的时候,学习到,我们会创建两个MpscQ队列(多生产者,单消费者),这个taskQueue就是当时我们创建的一个任务队列,这里面将我们提交的异步任务追加到队列里面!

返回异步任务是不是被追加到队列里面了,如果队列满了,或者其他原因追加失败的话,会返回false,就会执行reject方法:

代码语言:javascript复制
protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}

这个拒绝策略同样是我们再创建NioEventLoop的时候创建保存的,给大家留一个作业,去追一下这个拒绝策略,判断一下当发生了添加异步任务失败之后,会发生什么呢?

2. 启动消费线程

代码语言:javascript复制
startThread();

-----------------------------分割线-------------------------
    /**
     * 启动线程
     */
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                //启动线程
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

注意,这里有个CAS操作 STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED); 判断消费线程是不是已经启动,如果已经启动就不进入这个逻辑,如果没启动就进入这个逻辑!我们第一次调用,肯定没启动,进入这个逻辑:

代码语言:javascript复制
doStartThread();
----------------------------分割线---------------------------

private void doStartThread() {
    assert thread == null;
    //创建一条线程并启动
    //这个线程又EventLoop
    executor.execute(new Runnable() {
        @Override
        public void run() {
            //保存当前线程  给线程赋值的就是这里
            thread = Thread.currentThread();
            ...........................忽略........................
            try {
                //进行实际的启动
                //io.netty.channel.nio.NioEventLoop.run
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ...........................忽略........................
            }
        }
        ...........................忽略........................
    }
      ...........................忽略........................
}

代码比较长,我们只分析主线逻辑:

代码语言:javascript复制
thread = Thread.currentThread();

首先保存了一下当前线程到成员变量,这个分支不是很重要,后面有时间进行分析!

代码语言:javascript复制
SingleThreadEventExecutor.this.run();

这个就是处理异步任务的代码,我们进入到run方法查看:

image-20210501112253211

代码语言:javascript复制
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                //存在任务就返回IO时间的数量,不存在任务就返回select阻塞等待事件发生
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    //如果不存在异步任务  就进行事件选择
                    case SelectStrategy.SELECT:
                        //下一个定时任务的截至时间  当不存在任务的时候就返回-1
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; 
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            //不存在任务就去阻塞获取IO事件
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                    default:
                }
            } catch (IOException e) {
                //替换一个选择器
                rebuildSelector0();
                //选择次数重置为0
                selectCnt = 0;
                //处理循环异常  主要处理方式就是睡眠一会让程序主动释放CPU
                handleLoopException(e);
                continue;
            }
			//本次循环次数 1
            selectCnt  ;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //这里是默认值  50
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            //不会进这个分支
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
                //当存在I/O事件的时候
            } else if (strategy > 0) {
                //记录一下当前的时间
                final long ioStartTime = System.nanoTime();
                try {
                    //处理IO事件
                    processSelectedKeys();
                } finally {
                    //计算处理IO事件耗费的事件
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //里面的时间是计算处理异步任务的时间尽量保持为1:1
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                //没有IO事件的话就处理异步任务
                ranTasks = runAllTasks(0); 
            }
			
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                 selectCnt - 1, selector);
                }
                //没有空轮询的话三次一清空
                selectCnt = 0;
                //如果空轮询的次数超过默认的512次  就处理空轮询BUG的选择器
            } else if (unexpectedSelectorWakeup(selectCnt)) { 
                //空轮询被处理后清空 轮询次数
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            ...................忽略........................
        } finally {
        	...................忽略........................    
        }
    }
}

这主线逻辑分为三个:如何解决IO事件、如何处理异步任务、如何解决空轮询BUG!!分支代码关注一下注释,这里分析下主线代码:

I. I/O事件的处理

processSelectedKeys();

代码语言:javascript复制
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

selectedKeys是我们在创建NIOEventLoop的时候,会创建一个优化后的的SelectorKeySet集合,使用数组来实现的,大家忘记的话,可以会看一下NioEventLoop的初始化源码篇!

当你没有禁用优化的时候,就会进入到if分支,我们查看if内部代码的源码:

代码语言:javascript复制
private void processSelectedKeysOptimized() {
    //开始遍历所有的主键
    for (int i = 0; i < selectedKeys.size;   i) {
        //获取事件
        final SelectionKey k = selectedKeys.keys[i];
        //将该位置的数据制空
        selectedKeys.keys[i] = null;
		//获取之间注册NioServerSocketChannel的时候,绑定的Channel对象
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            //开始进行IO事件处理
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            .........................忽略............................
        }
        .........................忽略............................
    }
}

获取事件集合中的每一个key,同时获取之前绑定的NioServerSocketChannel,然后调用processSelectedKey处理这个事件:

代码语言:javascript复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        //当key失效之后,就关闭通道
        ....................忽略....................
    }

    try {
        //获取当前事件的key 掩码
        int readyOps = k.readyOps();
        //是否包含连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            //获取包含的事件
            int ops = k.interestOps();
            //剔除OP_CONNECT事件
            ops &= ~SelectionKey.OP_CONNECT;
            //重新更新关注的事件
            k.interestOps(ops);
			//传播 connect事件
            unsafe.finishConnect();
        }
		//如果当前返回的关注事件的掩码包含 OP_WRITE的话
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            //开始向通道内刷新数据
            ch.unsafe().forceFlush();
        }
//如果当前的事件掩码包含读、新连接接入事件  或者 不关注任何事件的时候  传播read事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {	  //传播read事件 可能是新连接接入也可能有数据可读
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        //发生异常关闭通道
        unsafe.close(unsafe.voidPromise());
    }
}

大家可以看到,里面的处理基本和我们对于JDK NIO的处理一致,就是判断各种事件然后进行对应的处理!

II、异步任务的处理
代码语言:javascript复制
runAllTasks();
代码语言:javascript复制
protected boolean runAllTasks() {
   assert inEventLoop();
   boolean fetchedAll;
   boolean ranAtLeastOne = false;

   do {
       //合并任务  将定时任务的队列里面的任务拉去出来,和异步任务的队列进行合并
       fetchedAll = fetchFromScheduledTaskQueue();
       //开始执行全部的任务
       if (runAllTasksFrom(taskQueue)) {
           ranAtLeastOne = true;
       }
   } while (!fetchedAll); 

   if (ranAtLeastOne) {
       lastExecutionTime = ScheduledFutureTask.nanoTime();
   }
   afterRunningAllTasks();
   return ranAtLeastOne;
}

这里就是异步任务的被执行的地方,这里分为两个步骤:1. 合并任务 2.执行taskQueue异步任务 3.执行tailQueue异步任务!

合并任务

代码语言:javascript复制
fetchedAll = fetchFromScheduledTaskQueue();

Netty在我们学习中已经知道了两种队列,一种是taskQueue队列,一种是tailQueue队列,现在又出现了第三种队列:scheduledTaskQueue,他是一个专门存放定时任务的对队列,这里的合并任务就是将即将要执行的任务合并到taskQueue中等待执行!

这行代码执行完毕后,所有即将要执行的任务都被添加在了taskQueue队列中,等待后续的执行!

执行taskQueue异步任务

代码语言:javascript复制
//注意这里传入的是合并完成后额taskQueue
runAllTasksFrom(taskQueue)

上述代码将对应的任务全部集中到了taskQueue队列中后们这里开始消费taskQueue队列进行执行!我们可以适当的看一下源码:

代码语言:javascript复制
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    //从taskQueue队列中弹出一个任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        //执行任务  调用run方法
        safeExecute(task);
        //继续弹出任务
        task = pollTaskFrom(taskQueue);
        //如果弹出的任务为空
        if (task == null) {
            //直接返回
            return true;
        }
    }
}

执行tailQueue异步任务

代码语言:javascript复制
afterRunningAllTasks();

这里开始执行tailQueue节点的任务,可以看到,tailQueue节点的任务执行优先级低于上述两种队列!

image-20210503101059511

代码语言:javascript复制
@Override
protected void afterRunningAllTasks() {
    //注意这里传入的是 tailQueue
    runAllTasksFrom(tailTasks);
}

//继续往下看源码
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    //弹出任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        //执行任务
        safeExecute(task);
        //再次弹出任务
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            //任务执行完毕  返回true
            return true;
        }
    }
}

这里就不作过多讲解了,这里和上面的逻辑基本一致,只是执行的qeueb不是一个!

III、解决臭名昭著的JDK空轮询BUG

可能大家大家都知道,JDK NIO在事件循环判断的时候可能会出现空轮询的BUG,导致CPU100%,虽然Oracle官方宣称空轮询的BUG已经解决了,但是后续经过一些公司实际的业务上证明并没有解决,只是出现几率小了点,Netty事实上并没有解决这个空轮询BUG只是用另外一种比较巧妙的方法规避开了,我们一起学习下:

首先,我们先想一下,我们如何断定我们的程序可能发生了空轮询的BUG,学习过NIO的都知道,我们会调用一个selector.select()进行阻塞等待有完成的事件发生,当selet方法阻塞解除的时候,就证明一定有我么感兴趣的事件发生,但是当我们发现select方法解除了阻塞,但是事件数量却为0的时候,我们就认为可能出现了空轮询的BUG!

但是IO数量为0并不是一定出现了空轮询的BUG,也可能外部调用了markUp方法,所以我们不能每一次出现事件数量为0的时候都认为程序出现了空轮询BUG,所以我们就需要有一个记录它出现该类异常情况发生的次数,当发生的次数达到了我们设置的阈值,就证明它可能发生了空轮询的BUG,这个时候需要处理这个空轮询的BUG!

那么如何处理呢? 我们任务发生空轮询问题是因为(JDK官方认为,这个Linux Epoll告诉JDK有事件了,但是JDK获取事件的时候获取了一个空,所以JDK只能返回一个0)所以就发生了空轮询:

JDK官方给出的解决方案

Netty是使用的第三种,抛弃旧的选择器,重建一个新的选择器,然后替换旧的选择器,我们一起看下源码!

我们看看Netty是如何做的,我们回到io.netty.channel.nio.NioEventLoop#run源码:

我还是,为了方便讲解,把这段代码贴出来省略和空轮询无关的代码(完整代码见上):

代码语言:javascript复制
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        ........................忽略进行事件选择的代码...................
			//本次循环次数 1
            selectCnt  ;
        ....................忽略事件处理和异步任务执行的代码................
			//当处理的异步任务或者IO事件的数量大于0,证明没有发生空轮询
            if (ranTasks || strategy > 0) {
                //每隔三次打印一次日志
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                 selectCnt - 1, selector);
                }
                //没有空轮询的话清空 
                selectCnt = 0;
                //如果出现异步任务为空  IO事件为空的话就会进入到这个逻辑
            } else if (unexpectedSelectorWakeup(selectCnt)) { 
                //空轮询被处理后清空 轮询次数
                selectCnt = 0;
            }
    } catch (CancelledKeyException e) {
        ...................忽略........................
    } finally {
        ...................忽略........................    
    }
}

可以仔细的看一下 上述代码的注释,我们进入到 unexpectedSelectorWakeup(selectCnt) 方法:

代码语言:javascript复制
private boolean unexpectedSelectorWakeup(int selectCnt) {
    ..............忽略日志打印................
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        //判断异常情况的次数是不是超过了预设的512次
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        //开始重新构建一个selector
        rebuildSelector();
        return true;
    }
    return false;
}

我们读源码到这里,可以知道,当异常执行的次数超过了阈值 512次,就会调用一个 rebuildSelector方法,我们点进去看一下:

代码语言:javascript复制
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}

我们按照惯例,按照同步方法调用 rebuildSelector0();

代码语言:javascript复制
private void rebuildSelector0() {
    //获取原始的选择器
    final Selector oldSelector = selector;
    //声明一个新的选择器
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        //创建一个新的选择器,赋值给新的选择器变量
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    int nChannels = 0;
    //开始遍历旧的选择器,将旧选择器的IO事件的key,绑定到新创建的选择器上
    for (SelectionKey key: oldSelector.keys()) {
        //获取旧选择器的管道
        Object a = key.attachment();
        try {
            //如果key失效了,就跳过!
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
			//获取对应关注的事件掩码
            int interestOps = key.interestOps();
            //将旧key置为失效
            key.cancel();
            //重新将管道绑定到新的选择器上
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            //替换管道里面保存的选择器事件主键
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels   ;
        } catch (Exception e) {
            ...............省略...............
        }
    }
	//重新保存新的优化后的选择器和原始选择器  
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        //关闭旧的选择器
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            ...............省略..................
        }
    }
	...............省略..................
}

我们从上述代码可以看到,Netty处理空轮询的问题的策略是,当发现你可能发生空轮询的次数超过了512次的时候,就直接重新获取一个新的选择器,然后将旧的选择器直接替换掉,这样空轮询的BUG也就很轻易的解决了!

三、总结

  1. 每一个EventLoop都会启动一条永久运行的线程,用于处理异步任务和IO事件,我们称之为Reactor线程。
  2. 如果存在IO事件的话,会先处理IO事件!
  3. Reactor线程会先将定时任务里面的任务合并到taskqueue里面,然后执行!taskQueue执行完毕后执行tailQueue队列的任务!
  4. 如果空轮询的次数发生了512次,就认为发生了空轮询的BUG,就会抛弃原来的选择器,重建一个新的选择器,将旧选择器上的事件全部绑定到新的选择器上,然后将旧选择器删除!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!

0 人点赞