有道无术,术尚可求也!有术无道,止于术!
前面两节课,我们着重分析了 initAndRegister方法,对通讯通道的创建、初始化以及注册到选择器上有了一个详细的介绍,回想JDK NIO的开发步骤,我们需要获取SocketChaennel、获取选择器Selector、将通道注册到选择器、绑定端口、处理事件!那么同样的Netty是基于NIO开发的,也同样少不了这几个步骤,迄今为止,我们已经学习了,Selector的创建、SocketChannel的创建、选择器的注册,今天我们要学的就是通道的绑定端口!
一、源码入口
我们回到:io.netty.bootstrap.AbstractBootstrap#doBind 方法:
代码语言:javascript复制private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
.............................忽略..............................
if (regFuture.isDone()) {
........................忽略.........................
//进行数据绑定 通道的注册 以及事件的触发
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
........................忽略.........................
return promise;
}
}
这几行代码相信大家无比的熟悉,initAndRegister是做Channel的创建、初始化、注册的,我们分析完了,下面就是要分析绑定方法了!
话不多少,我们直接进入到doBind0方法里面!
代码语言:javascript复制private static void doBind0( final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 在触发channelRegistered()之前调用此方法。给用户处理程序一个设置的机会
// 其channelRegistered()实现中的管道。
channel.eventLoop().execute(() -> {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
});
}
毋庸置疑,我们重点关注:
代码语言:javascript复制channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
继续跟进到bind方法:
image-20210430090259692
代码语言:javascript复制@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
//this.bind head
return pipeline.bind(localAddress, promise);
}
我们到这里看到了一行奇怪的代码,似乎调用了一个通道的传播,我们继续跟下去:
代码语言:javascript复制@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
大家都知道,tail节点是我们再通道中的最尾部节点,大家通过上节课的分析可知,现在的pipeline是如下结构:
image-20210430090942394
我们查看bind方法是 ChannelOutboundInvoker接口下的 ,回想我们分析Netty的整体架构图的时候,分析过ChannelOutboundInvoker是从后向前传播的,即从tail节点向前传播,最终到Head节点结束的,但是TailContext与ServerBootstrapAcceptor都未实现bind方法,那么我们最终把位置定位到HeadContext的代码上:(注意,这里不必知道,哎pipeline中是如何传播的,下面有一章节是对pipeline的添加、寻找、注册有一个完整的源码分析,这里为了同学们更好的理解,就先不涉及这么多了!)
我们进入到:io.netty.channel.DefaultChannelPipeline.HeadContext#bind
代码语言:javascript复制@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
image-20210430091901226
unsafe是NioMessageUnsafe类型的,父类是AbstractNioUnsafe,所以我们进入到AbstractNioUnsafe的源码:
二、源码解析
代码语言:javascript复制@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...............................忽略.................................
//false
boolean wasActive = isActive();
try {
//jdk底层的绑定端口 NioServerSocketChannel
doBind(localAddress);
} catch (Throwable t) {
...............................忽略.................................
return;
}
//isActive true
if (!wasActive && isActive()) {
//触发 Active事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
我们还是忽略到部分分支代码,看我们的主线代码,首先会判断通道是否是激活状态:
代码语言:javascript复制boolean wasActive = isActive();
此时,通道并没有绑定端口号,所以此时返回的是false。
代码语言:javascript复制doBind(localAddress);
开始调用JDK底层的逻辑进行通道的绑定,我们进入到doBind方法,你们一定要记好,我们初始化的是服务端,我们给的通道类型是NioServerSocketChannel
image-20210430092431289
代码语言:javascript复制@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//Netty 会根据 JDK 版本的不同,分别调用 JDK 底层不同的 bind() 方法。
// 我使用的是 JDK8,所以会调用 JDK 原生 Channel 的 bind() 方法。
// 执行完 doBind() 之后,服务端 JDK 原生的 Channel 真正已经完成端口绑定了。
if (PlatformDependent.javaVersion() >= 7) {
//jdk底层的绑定
javaChannel().bind(localAddress, config.getBacklog());
} else {
//jdk底层的绑定
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
Netty会根据JDK版本的不同注册的时候有些微的不一样,我们以JDK8为例,会执行if分支:
代码语言:javascript复制javaChannel().bind(localAddress, config.getBacklog());
这行代码,相信大家也是无比的熟悉,这就是JDK NIO的绑定端口的代码,我们回想下JDK NIO是如何绑定端口的:
image-20210430093046665
上图的JDK NIO的注册方式,两者代码是一致的!绑定完成后,我们回到主线代码:
代码语言:javascript复制if (!wasActive && isActive()) {
//触发 Active事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
wasActive属性是false,因为之前还没激活,取反为true,此时通道已经绑定成功了,重新调用isActive()
,返回为true,所以整体返回true,走该分支,我们暂且停一下,试想一下,这个判断的意义在哪里!
按照之前的分析,这个判断的逻辑是,绑定之前没有激活,绑定之后激活了,只有两个条件同时满足才会走这个分支,这能够保障该判断逻辑内的逻辑不会被重复调用,只会再绑定成功后调用一次!
我们进入到逻辑分支,该方法也是异步的,但是没关系,我们依旧按照同步的方式分析,有关异步,我会在下一节课完整的分析,在Netty中所有的异步都有一个相同的执行方式!
代码语言:javascript复制pipeline.fireChannelActive();
从定义上来看又是一个管道的事件传播,我们进入看一下,从什么地方开始传播的:
代码语言:javascript复制@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
我们可以看到是从Head节点开始传播的,
代码语言:javascript复制static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
这里无论是同步还是异步,都是调用了 next.invokeChannelActive(); 我们进入到源码逻辑:
代码语言:javascript复制private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
..............................忽略............................
}
}
..............................忽略............................
}
因为我们的handler是Head类型的,所以:
image-20210430102440554
代码语言:javascript复制@Override
public void channelActive(ChannelHandlerContext ctx) {
//传播事件
ctx.fireChannelActive();
//设置为读监听
readIfIsAutoRead();
}
一共两行代码,比较简单,第一行传播事件,从头结点往下寻找传播 ChannelActive
方法:
ctx.fireChannelActive();
有关事件的传播,我会在pipeline中详解,这里先记住,会传播一个事件,调用channelActive方法!
image-20210430102854323
因为ChannelActive是ChannelInboundHandler类的方法,Netty整体架构课分析过,ChannelInboundHandler属于正向传播,即从Head节点开始到Tail节点结束:
image-20210430090942394
代码语言:javascript复制//设置为读监听
readIfIsAutoRead();
大家回想一下,我们再注册NioServerSocketChannel的时候,关注的是0,即不关注任何事件,忘记的同学可以去上一节课注册的源码解析查看:
image-20210430104253365
但是按道理来说,以我们JDK NIO的基础,我们新服务器应该关注的是一个OP_ACCEPT
事件,所以,我们这里就要对他进行一个更改,让他关注新连接事件,我们进入到readIfIsAutoRead源码中:
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
判断的逻辑分支默认为true
image-20210430104641955
关于为什么选这个,已经前面讲了好几次,这里不做陈述,我们直接进入到read源码中:
代码语言:javascript复制@Override
public Channel read() {
pipeline.read();
return this;
}
很明显,又是一个事件传播,我们继续跟:
代码语言:javascript复制@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
很明显,该方法是从tail节点开始传播,Netty整体架构课上说过,read属于ChannelOutboundInvoker,属于倒序传播,该代码是从tail节点向上寻找,默认实现是HeadContext实现,我们进入到HeadContext:
注意,同学们有关事件传播如何传播的会很疑惑,先不要急,先按照我的逻辑走,后面学习完pipeline之后,你会对如何传播有一个及其清晰的认识,先按照我的逻辑走!
io.netty.channel.DefaultChannelPipeline.HeadContext#read
代码语言:javascript复制@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
我们进入到 unsafe.beginRead();
image-20210430105344298
代码语言:javascript复制@Override
public final void beginRead() {
....................忽略..................
try {
doBeginRead();
} catch (final Exception e) {
....................忽略..................
}
}
我们进入到 doBeginRead(); 方法中, 注意我们是服务端默认的Unsafe是 AbstractNioMessageChannel
类型的:
image-20210430105632098
代码语言:javascript复制@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
调用父类的doBeginRead方法:
io.netty.channel.nio.AbstractNioChannel#doBeginRead
代码语言:javascript复制@Override
protected void doBeginRead() throws Exception {
.............................忽略.............................
final int interestOps = selectionKey.interestOps();
//如果当前的读事件为0 且预设的事件不为0进入逻辑
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
大家还记得我们再创建NioServerSocketChannel的时候,保存的readInterestOp 是什么吗?我截图帮助大家回忆一下:
image-20210430110023427
那么if分支逻辑内就相当于:
代码语言:javascript复制selectionKey.interestOps(0 | SelectionKey.OP_ACCEPT);
然后,NioServerSokcetChannel的选择器就被绑定为关注连接事件了!
至此,服务端启动成功!!
三、总结
- 调用JDK原生的方法,给channel绑定一个端口!
- 传播channelActive事件,进行方法的回调!
- 修改NioServerSocketChannel选择器默认关注的事件从0变为SelectionKey.OP_ACCEPT,开始等待客户端新连接接入!
- 服务端启动成功!
才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!
代码语言:javascript复制