NioServerSocketChannel的绑定源码解析

2021-07-16 14:09:02 浏览数 (1)

有道无术,术尚可求也!有术无道,止于术!

前面两节课,我们着重分析了 initAndRegister方法,对通讯通道的创建、初始化以及注册到选择器上有了一个详细的介绍,回想JDK NIO的开发步骤,我们需要获取SocketChaennel、获取选择器Selector、将通道注册到选择器、绑定端口、处理事件!那么同样的Netty是基于NIO开发的,也同样少不了这几个步骤,迄今为止,我们已经学习了,Selector的创建、SocketChannel的创建、选择器的注册,今天我们要学的就是通道的绑定端口!

一、源码入口

我们回到:io.netty.bootstrap.AbstractBootstrap#doBind 方法:

代码语言:javascript复制
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
	.............................忽略..............................
    if (regFuture.isDone()) {
        ........................忽略.........................
        //进行数据绑定  通道的注册 以及事件的触发
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ........................忽略.........................
        return promise;
    }
}

这几行代码相信大家无比的熟悉,initAndRegister是做Channel的创建、初始化、注册的,我们分析完了,下面就是要分析绑定方法了!

话不多少,我们直接进入到doBind0方法里面!

代码语言:javascript复制
private static void doBind0( final ChannelFuture regFuture, final Channel channel, 
                            final SocketAddress localAddress, final ChannelPromise promise) {

        // 在触发channelRegistered()之前调用此方法。给用户处理程序一个设置的机会
        // 其channelRegistered()实现中的管道。
        channel.eventLoop().execute(() -> {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        });
    }

毋庸置疑,我们重点关注:

代码语言:javascript复制
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

继续跟进到bind方法:

image-20210430090259692

代码语言:javascript复制
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    //this.bind head
    return pipeline.bind(localAddress, promise);
}

我们到这里看到了一行奇怪的代码,似乎调用了一个通道的传播,我们继续跟下去:

代码语言:javascript复制
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

大家都知道,tail节点是我们再通道中的最尾部节点,大家通过上节课的分析可知,现在的pipeline是如下结构:

image-20210430090942394

我们查看bind方法是 ChannelOutboundInvoker接口下的 ,回想我们分析Netty的整体架构图的时候,分析过ChannelOutboundInvoker是从后向前传播的,即从tail节点向前传播,最终到Head节点结束的,但是TailContext与ServerBootstrapAcceptor都未实现bind方法,那么我们最终把位置定位到HeadContext的代码上:(注意,这里不必知道,哎pipeline中是如何传播的,下面有一章节是对pipeline的添加、寻找、注册有一个完整的源码分析,这里为了同学们更好的理解,就先不涉及这么多了!)

我们进入到:io.netty.channel.DefaultChannelPipeline.HeadContext#bind

代码语言:javascript复制
@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

image-20210430091901226

unsafe是NioMessageUnsafe类型的,父类是AbstractNioUnsafe,所以我们进入到AbstractNioUnsafe的源码:

二、源码解析

代码语言:javascript复制
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
   ...............................忽略.................................
    //false
    boolean wasActive = isActive();
    try {
        //jdk底层的绑定端口   NioServerSocketChannel
        doBind(localAddress);
    } catch (Throwable t) {
        ...............................忽略.................................
        return;
    }
    //isActive true
    if (!wasActive && isActive()) {
        //触发 Active事件
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

我们还是忽略到部分分支代码,看我们的主线代码,首先会判断通道是否是激活状态:

代码语言:javascript复制
boolean wasActive = isActive();

此时,通道并没有绑定端口号,所以此时返回的是false。

代码语言:javascript复制
doBind(localAddress);

开始调用JDK底层的逻辑进行通道的绑定,我们进入到doBind方法,你们一定要记好,我们初始化的是服务端,我们给的通道类型是NioServerSocketChannel

image-20210430092431289

代码语言:javascript复制
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    //Netty 会根据 JDK 版本的不同,分别调用 JDK 底层不同的 bind() 方法。
    // 我使用的是 JDK8,所以会调用 JDK 原生 Channel 的 bind() 方法。
    // 执行完 doBind() 之后,服务端 JDK 原生的 Channel 真正已经完成端口绑定了。
    if (PlatformDependent.javaVersion() >= 7) {
        //jdk底层的绑定
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        //jdk底层的绑定
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Netty会根据JDK版本的不同注册的时候有些微的不一样,我们以JDK8为例,会执行if分支:

代码语言:javascript复制
javaChannel().bind(localAddress, config.getBacklog());

这行代码,相信大家也是无比的熟悉,这就是JDK NIO的绑定端口的代码,我们回想下JDK NIO是如何绑定端口的:

image-20210430093046665

上图的JDK NIO的注册方式,两者代码是一致的!绑定完成后,我们回到主线代码:

代码语言:javascript复制
if (!wasActive && isActive()) {
    //触发 Active事件
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}

wasActive属性是false,因为之前还没激活,取反为true,此时通道已经绑定成功了,重新调用isActive(),返回为true,所以整体返回true,走该分支,我们暂且停一下,试想一下,这个判断的意义在哪里!

按照之前的分析,这个判断的逻辑是,绑定之前没有激活,绑定之后激活了,只有两个条件同时满足才会走这个分支,这能够保障该判断逻辑内的逻辑不会被重复调用,只会再绑定成功后调用一次!

我们进入到逻辑分支,该方法也是异步的,但是没关系,我们依旧按照同步的方式分析,有关异步,我会在下一节课完整的分析,在Netty中所有的异步都有一个相同的执行方式!

代码语言:javascript复制
pipeline.fireChannelActive();

从定义上来看又是一个管道的事件传播,我们进入看一下,从什么地方开始传播的:

代码语言:javascript复制
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

我们可以看到是从Head节点开始传播的,

代码语言:javascript复制
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

这里无论是同步还是异步,都是调用了 next.invokeChannelActive(); 我们进入到源码逻辑:

代码语言:javascript复制
private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            ..............................忽略............................
        }
    } 
    ..............................忽略............................
}

因为我们的handler是Head类型的,所以:

image-20210430102440554

代码语言:javascript复制
@Override
public void channelActive(ChannelHandlerContext ctx) {
    //传播事件
    ctx.fireChannelActive();
    //设置为读监听
    readIfIsAutoRead();
}

一共两行代码,比较简单,第一行传播事件,从头结点往下寻找传播 ChannelActive方法:

代码语言:javascript复制
ctx.fireChannelActive();

有关事件的传播,我会在pipeline中详解,这里先记住,会传播一个事件,调用channelActive方法!

image-20210430102854323

因为ChannelActive是ChannelInboundHandler类的方法,Netty整体架构课分析过,ChannelInboundHandler属于正向传播,即从Head节点开始到Tail节点结束:

image-20210430090942394

代码语言:javascript复制
//设置为读监听
readIfIsAutoRead();

大家回想一下,我们再注册NioServerSocketChannel的时候,关注的是0,即不关注任何事件,忘记的同学可以去上一节课注册的源码解析查看:

image-20210430104253365

但是按道理来说,以我们JDK NIO的基础,我们新服务器应该关注的是一个OP_ACCEPT事件,所以,我们这里就要对他进行一个更改,让他关注新连接事件,我们进入到readIfIsAutoRead源码中:

代码语言:javascript复制
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

判断的逻辑分支默认为true

image-20210430104641955

关于为什么选这个,已经前面讲了好几次,这里不做陈述,我们直接进入到read源码中:

代码语言:javascript复制
@Override
public Channel read() {
    pipeline.read();
    return this;
}

很明显,又是一个事件传播,我们继续跟:

代码语言:javascript复制
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

很明显,该方法是从tail节点开始传播,Netty整体架构课上说过,read属于ChannelOutboundInvoker,属于倒序传播,该代码是从tail节点向上寻找,默认实现是HeadContext实现,我们进入到HeadContext:

注意,同学们有关事件传播如何传播的会很疑惑,先不要急,先按照我的逻辑走,后面学习完pipeline之后,你会对如何传播有一个及其清晰的认识,先按照我的逻辑走!

io.netty.channel.DefaultChannelPipeline.HeadContext#read

代码语言:javascript复制
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

我们进入到 unsafe.beginRead();

image-20210430105344298

代码语言:javascript复制
@Override
public final void beginRead() {
   ....................忽略..................
    try {
        doBeginRead();
    } catch (final Exception e) {
        ....................忽略..................
    }
}

我们进入到 doBeginRead(); 方法中, 注意我们是服务端默认的Unsafe是 AbstractNioMessageChannel类型的:

image-20210430105632098

代码语言:javascript复制
@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    super.doBeginRead();
}

调用父类的doBeginRead方法:

io.netty.channel.nio.AbstractNioChannel#doBeginRead

代码语言:javascript复制
@Override
protected void doBeginRead() throws Exception {
    .............................忽略.............................
    final int interestOps = selectionKey.interestOps();
    //如果当前的读事件为0 且预设的事件不为0进入逻辑
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

大家还记得我们再创建NioServerSocketChannel的时候,保存的readInterestOp 是什么吗?我截图帮助大家回忆一下:

image-20210430110023427

那么if分支逻辑内就相当于:

代码语言:javascript复制
selectionKey.interestOps(0 | SelectionKey.OP_ACCEPT);

然后,NioServerSokcetChannel的选择器就被绑定为关注连接事件了!

至此,服务端启动成功!!

三、总结

  1. 调用JDK原生的方法,给channel绑定一个端口!
  2. 传播channelActive事件,进行方法的回调!
  3. 修改NioServerSocketChannel选择器默认关注的事件从0变为SelectionKey.OP_ACCEPT,开始等待客户端新连接接入!
  4. 服务端启动成功!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!

代码语言:javascript复制

0 人点赞