前言
在前面的文章第三篇《Netty组件之Channel注册》分析了channel是如何注册到Selector上的。第五篇《Netty之客户端连接调用》,分析了建立连接的过程。本文将梳理如下内容:
1.就绪事件如何轮询的?bossGroup和workGroup都轮询什么感兴趣的事件?
2.bossGroup的职责是什么?又是如何将客户端新建连接Channel传递到workGroup的?
3.workGroup的职责是什么?如何回调到我们自己加入的childHandler中的?
一、准备工作
1.示例代码
本文将以这段示例为入口进行分析,示例中设置了bossGroup、workerGroup以及childHandler为HttpUploadServerInitializer。
代码语言:javascript复制EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.handler(new LoggingHandler(LogLevel.INFO));
b.childHandler(new HttpUploadServerInitializer(sslCtx));
Channel ch = b.bind(PORT).sync().channel();
2.原理文章
在阅读本文前,可以先阅读下之前的两篇文章。系统五种I/O模型 和 Reactor线程模型。
二、就绪事件轮询
接着第三篇《Netty组件之Channel注册》channel注册到Selector,返回selectionKey。其中包含isReadable、isWritable、isConnectable、isAcceptable等通道的就绪状态。一起看下Netty是如何轮询这些就绪事件的。
1.注册selectionKey集合
将Channel注册到Selector时再往后看下implRegister()方法以及实现AbstractPollSelectorImpl。
代码语言:javascript复制protected abstract void implRegister(SelectionKeyImpl ski)
代码语言:javascript复制protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (closed)
throw new ClosedSelectorException();
// Check to see if the array is large enough
if (channelArray.length == totalChannels) {
// Make a larger array
int newSize = pollWrapper.totalChannels * 2;
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
// Copy over
for (int i=channelOffset; i<totalChannels; i )
temp[i] = channelArray[i];
channelArray = temp;
// Grow the NativeObject poll array
pollWrapper.grow(newSize);
}
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.addEntry(ski.channel);
totalChannels ;
keys.add(ski); // 注解@1
}
}
代码语言:javascript复制protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
publicKeys = Collections.unmodifiableSet(keys); // 注解@2
publicSelectedKeys = Util.ungrowableSet(selectedKeys); // 注解@3
}
}
注解@1:将注册channel返回的selectionKey放入到了HashSetkeys中。
注解@2:SetpublicKeys 也就是selectionKey的集合。
小结:SelectorImpl类中有这么一个结合publicKeys存储了selectionKey,而就绪事件的轮询需要依靠轮询selectionKey。
2.就绪selectionKey集合
当执行select()方法时,以AbstractPollSelectorImpl为例,会执行到updateSelectedKeys()方法。
代码语言:javascript复制/**
* Copy the information in the pollfd structs into the opss
* of the corresponding Channels. Add the ready keys to the
* ready queue.
*/
protected int updateSelectedKeys() {
int numKeysUpdated = 0;
// Skip zeroth entry; it is for interrupts only
for (int i=channelOffset; i<totalChannels; i ) {
// 获取通道就绪操作类型(可读、可写、错误等)
int rOps = pollWrapper.getReventOps(i);
if (rOps != 0) {
SelectionKeyImpl sk = channelArray[i];
pollWrapper.putReventOps(i, 0);
if (selectedKeys.contains(sk)) {
// 将ReventOps就绪的操作类型转换到SelectionKeyImpl
if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
numKeysUpdated ;
}
} else {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk); // 注解@4
numKeysUpdated ;
}
}
}
}
return numKeysUpdated;
}
注解@3:由SelectorImpl方法可以看出publicSelectedKeys即为selectedKeys。
注解@4:将就绪的key放入了selectedKeys集合中。
小结:SelectorImpl中的publicSelectedKeys存放了就绪selectedKey。
3.轮询就绪selectionKey集合
代码语言:javascript复制private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() { // 注解@5
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
注解@5:顺着示例b.bind(PORT)进入到doBind0()方法。channel.eventLoop().execute(...),此处的eventLoop()即NioEventLoop。也就是后续的轮询事件在该NioEventLoop线程中进行。SingleThreadEventExecutor是NioEventLoop的父类,实际执行到SingleThreadEventExecutor的execute方法。
代码语言:javascript复制@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
代码语言:javascript复制private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task); // 注解@6
if (!inEventLoop) {
startThread(); // 注解@7
// ...
}
// ...
}
代码语言:javascript复制private void doStartThread() {
// ...
executor.execute(new Runnable() {
@Override
public void run() {
//...
SingleThreadEventExecutor.this.run(); // 注解@8
//...
});
}
注解@6:将Runnable放入Queue中。
注解@7:此处启动了一个线程默认为ThreadPerTaskExecutor。
注解@8:具体执行逻辑由NioEventLoop#run()来执行。
代码语言:javascript复制protected void run() {
int selectCnt = 0;
for (;;) { // 注解@9
// ...
try {
// ...
processSelectedKeys(); // @10
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks(); // @11
}
// ...
}
}
注解@9:一个死循环在不断轮询就绪事件
注解@10:处理就绪事件
注解@11:处理放入Queue中的Runnable任务
代码语言:javascript复制private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys()); // 注解@12
}
}
注解@12:selector.selectedKeys()即为SelectorImpl的publicSelectedKeys,即获取了就绪事件集合。
小结:就绪事件的轮询SingleThreadEventExecutor#run方法负责,不断轮询就绪事件集合publicSelectedKeys,来判断是否有就绪事件。
三、事件处理
代码语言:javascript复制private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// ...
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 注解@13
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 注解@14
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 注解@15
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
注解@13:处理连接事件
注解@14:处理可写事件
注解@15:处理可读事件和处理客户端连接事件
小结:在以上的事件处理中,bossGroup主要处理客户端连接OP_ACCEPT事件。当有新的客户端的连接时触发unsafe.read()执行。具体为NioMessageUnsafe#read()方法。
代码语言:javascript复制public void read() {
// ...
try {
try {
do {
int localRead = doReadMessages(readBuf); // 注解@16
// ...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); // 注解@17
}
// ...
} finally {
// ...
}
}
注解@16:见下面doReadMessages逻辑。
注解@17:pipeline.fireChannelRead触发了pipeline链表向下执行。
代码语言:javascript复制protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel()); // 注解@18
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch)); // 注解@19
return 1;
}
} catch (Throwable t) {
// ...
}
return 0;
}
代码语言:javascript复制protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
注解@18:获取新建立的连接通道SocketChannel。
注解@19:将连接通道SocketChannel转换为NioSocketChannel,建立时注册的OP_READ读事件。并将新建立的连接通道放入了List中。
再回到注解@17,pipeline.fireChannelRead触发了链表向下传递。具体传递到那个Handler中需要翻前面初始化代码。
代码语言:javascript复制void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) { // 注解@20
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor( // 注解21
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
注解@20:在Server初始化时回调
注解@21:Server初始化后在pipeline加入了ServerBootstrapAcceptor,同时注意参数currentChildGroup和currentChildHandler,分别对应示例中传入的childGroup和childHandler。
接着看下ServerBootstrapAcceptor的channelRead方法。
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg; // 注解@22
child.pipeline().addLast(childHandler); // 注解@23
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() { // 注解@24
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
注解@22:注意此处有“注解@17”步回调回来,传入的msg为新建立连接的NioSocketChannel,再强调下该通道注册了可读事件。
注解@23:把childHandler加入到新建立连接Channel的pipeline,也就是示例中的HttpUploadServerInitializer。HttpUploadServerInitializer的初始化(channel注册后回调)又加入了HttpRequestDecoder、HttpResponseEncoder、HttpContentCompressor、HttpUploadServerHandler到该channel的pipeline。
代码语言:javascript复制public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression.
pipeline.addLast(new HttpContentCompressor());
pipeline.addLast(new HttpUploadServerHandler());
}
注解@24:将childGroup绑定到该NioSocketChannel,并将该NioSocketChannel注册到Selector。
小结:当轮询到有新的客户端连接建立时,将该新建的通道NioSocketChannel通过pipeline转发给ServerBootstrapAcceptor,这个过程由线程池bossGroup分配的线程负责。
当ServerBootstrapAcceptor收到新建立的通道NioSocketChannel时与workGroup分配的线程绑定,并将用户添加的childHandler加入到该channel的pipeline,注册OP_READ读事件到Selector。该过程由线程池workGroup分配的线程负责。
当有数据可读时由线程池workGroup分配的线程处理,并一路通过pipeline回调到我们自己加入的ChannelHandler 见注解@23.
例如:示例中加入的HttpUploadServerHandler,其父类为SimpleChannelInboundHandler。当监听到有可读事件时,通过pipeline一路回调到SimpleChannelInboundHandler的channelRead(),进而调用到HttpUploadServerHandler的channelRead0()方法,从而处理我们的逻辑。
作者丨梁勇 来源丨瓜农老梁 欢迎关注公众号「瓜农老梁」
「瓜农老梁 学习同行」