前面我们已经深入分析Netty中的核心组件,接下来我们开始来深入理解Netty各个组件处理事件的运作流程,通过事件流程的分析,我们可以思考Netty框架是如何设计组件之间的协作来配合完成基于Reactor模式且具备可伸缩性的Web服务,由于Netty事件流程比较多且杂,上篇主要分析事件轮询器初始化,启动类初始化组件以及服务端的端口绑定事件.
事件轮询分析
EventLoopGroup初始化流程
- 入口程序代码
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- NioEventLoopGroup类图结构
- EventLoopGroup初始化源码
// NioEventLoopGroup构造器
// NioEventLoopGroup.java
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 默认使用阻塞式轮询策略
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); // channel处理不过来的时候直接丢弃
}
// MultithreadEventLoopGroup.java
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 默认线程数量为CPU*2 或者是通过 io.netty.eventLoopThreads进行配置
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 创建默认事件执行选择器(从Group中选择一个EventLoop来处理Channel的策略选择器)
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// 初始化Group的核心方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
}
// 上述可以看到初始化创建默认具备线程池的一些默认策略(线程大小/线程工厂/存储任务队列/丢弃策略)/创建默认的事件轮询选择器/默认的IO复用器提供者
- EventLoopGroup初始化的核心流程
根据上述可以知道,EventLoopGroup初始化的操作主要是初始化一组EventLoop的执行器,并创建选举EventLoop的选择器,并为每个EventLoop在销毁的时候添加监听器以便于程序能够获取当前EventLoop销毁情况,同时每个EventLoop对外提供服务都是只读模式,也就是选举EventLoop都是处于只读的稳定版本.
EventLoop的初始化流程
- EventLoop的创建流程包含在上述EventLoopGroup为每个执行器(EventLoop)进行初始化的过程,即在源代码中如下:
// MultithreadEventExecutorGroup.java
// 初始化执行器
children[i] = newChild(executor, args);
// newChild的实现子类NioEventLoopGroup
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
// NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
- EventLoop的初始化流程
- 基于上述的认知,我们来总结下EventLoopGroup,EventLoop,EventExecutor以及Thread之间的关系,首先先从源码开始分析如下:
// MultithreadEventExecutorGroup.java
// ThreadPerTaskExecutor看成线程池 - 对应默认的线程工厂类
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
// 有多少个线程就有多少个EventLoop
children = new EventExecutor[nThreads];
// 用线程池创建EventLoop
children[i] = newChild(executor, args);
// SingleThreadEventExecutor.java
// this为NioEventLoop
this.executor = ThreadExecutorMap.apply(executor, this);
// ThreadExecutorMap.java
// 创建新的执行器
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
// check not null ...
return new Executor() {
@Override
public void execute(final Runnable command) {
// ThreadPerTaskExecutor.execute -> apply
// 启动一个线程执行任务并传递事件轮询器
// FastThreadLocalThread.run()
executor.execute(apply(command, eventExecutor));
}
};
}
// 新任务Task
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
// 将EventLoop存储到FastThreadLocal(即保证FastThreadLocalThread独占持有自己的EventLoop)
setCurrentEventExecutor(eventExecutor);
try {
// 执行任务
command.run();
} finally {
// 任务执行完之后释放独占EventLoop的资源
setCurrentEventExecutor(null);
}
}
};
}
private static void setCurrentEventExecutor(EventExecutor executor) {
// 使用FastThreadLocal来存储事件轮询器,保证每个事件轮询器都会有对应的一个线程来处理
mappings.set(executor);
}
// 通过上述的流程可知,在每个EventLoop都含有一个新的Executor
// 而每一个Executor都通过默认的线程工厂创建一个FastThreadLocalThread线程来处理task任务
// 此时的Task任务为一个新的任务task
- 通过源码分析,可以得到以下简要的EventLoopGroup,Group下的线程池Executor,EventLoop与EventLoop下的Executor以及Thread之间的关系如下:
通过上述示意图可知,每个EventLoop处理任务时都会通过Group下的Executor来创建对应的线程来执行EventLoop的事件任务,并且为了保证并发安全问题,在每次处理任务之前,将会把当前的EventLoop与Thread进行绑定,也就是当前EventLoop为当前执行的线程Thread所独占持有,通过FastThreadLocal来维护两者之间的关系,一旦EventLoop事件任务处理完成之后,将解除两者的绑定.同时也可以看到处理一组事件任务的Thread将通过线程组的方式进行维护和管理.
Netty线程模型细化
可以看到上述一个EventLoop绑定一个专有的线程,由专有的线程负责处理EventLoop的事件,且一个channel都会对应着一个EventLoop来负责处理channel相关的事件,同时一个EventLoop/Thread能够处理多个Channel需要依赖于AIO或者是NIO的API才能实现,AbstractBootstrap处理服务端Channel,ServerBootstrap处理客户端Channel,而对于BIO模型而言,只能一个EventLoop/Thread处理对应一个Channel,即摘录《Netty实战》关于NIO/OIO(old IO,BIO)模型如下:
- 基于NIO/AIO的线程模型
- 基于BIO的线程模型(OIO为old IO,即使用BIO的API)
- EventLoop启动任务的执行源码
// 调用以下的方法时执行流程
// SingleThreadEventExecutor.java
eventLoop.execute(task);
// 这里的线程执行流程不弄清楚,后面的事件流程将很理解
// 根据类设计可知,execute为SingleThreadEventExecutor下的方法,结合上面的EventLoop初始化流程可知,每个EventLoop都拥有一个内置的Executor,而这个Executor用于创建FastThreadLocalThread线程来保证当前eventloop与当前线程之间的绑定关联,源码如下:
private void execute(Runnable task, boolean immediate) {
// 判断当前执行的线程是否与eventloop对应(EventLoop - Thread绑定一起)
boolean inEventLoop = inEventLoop();
// 将任务添加到队列中,如果队列满则丢弃当前任务
addTask(task);
if (!inEventLoop) {
// 启动一个线程,如果当前EventLoop持有的线程已经开启过则直接跳过,如果开启过线程,则执行doStartThread方法
startThread();
// ...
}
private void doStartThread(){
// EventLoop持有的executor来创建一个FastThreadLocalThread线程,在该线程中保证当前事件轮询器与线程处于线程安全,通过FastThreadLocal将线程与EventLoop进行关联
executor.execute(new Runnable() {
//....
});
}
- 结合EventLoop初始化对应的executor以及ThreadExecutorMap中的源码,现将一个不在当前线程的EventLoop提交任务时创建一个完整线程执行细节流程绘制如下:
也就是说,最终处理任务task都在NioEventLoop执行的run方法中体现,或者更为严格意义上来取决于我们选择的EventLoop的IO操作模式,具体是交由EventLoop的IO操作模式的run方法通过队列中获取任务来进行处理,于是根据源码中提供的任务队列与拒绝策略,对于EventLoop处理任务的流程如下(摘录自《Netty实战》):
- 与线程池不一样的是,EventLoop是与指定的线程绑定在一起,也就是一个线程处理一个EventLoop,并且在整个Web服务中EventLoop始终是由当前的专有线程负责事件的任务处理
- 当添加任务到EventLoop执行的时候,需要校验当前的线程是不是持有之前分配好的EventLoop,如果不是那么就添加到任务队列进行等待EventLoop下一次处理事件时再执行,如果队列满了,那么此时就会触发拒绝策略丢弃任务,如果是之前分配好的EventLoop那么就会直接执行任务Task.
Netty之NIO事件轮询流程
基于上述的线程任务流程分析之后,我们知道在EventLoop中最终会调用NioEventLoop下的run方法,对此,现该run方法执行的事件轮询操作流程进行分析.
- 事件轮询源码
// NioEventLoop.run()核心代码
for(;;){
// 检测当前的EventLoop的队列中是否有任务
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
// 根据服务器配置eventloop的IO处理能力比率
if (ioRatio == 100) {
// 如果IO处理比率高,则同时处理就绪事件以及当前轮询器队列中的所有任务
// 不然就分开处理
try {
if (strategy > 0) {
// 处理一系列的就绪事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// 执行所有的任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
// 处理就绪事件,处理ACCEPT/READ/WRITE事件
processSelectedKeys();
} finally {
// 在一定事件内处理队列中的任务
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 处理任务
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
}
// NioEventLoop的unsafe为NioMessageUnsafe
processSelectedKeys(){
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
// runAllTasks
runAllTasks(){
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 在当前EventLoop所在的线程执行run方法
// task.run();
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
- 事件轮询流程图
至此,关于EventLoop的事件流程分析结束,接下来我们来看启动类添加组件并完成初始化具体事件流程.
启动类初始化组件分析
SeverBootstrap初始化流程
- 入口程序代码
ServerBootstrap bootstrap = new ServerBootstrap();
- 启动类的类图结构设计
- Bootstrap初始化源码
// 在上述执行初始化流程中,会在内部完成以下组件的初始化
// ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
// The order in which child ChannelOptions are applied is important they may depend on each other for validation
// purposes.
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}
// ServerBootstrapConfig.java
public final class ServerBootstrapConfig extends AbstractBootstrapConfig<ServerBootstrap, ServerChannel> {
ServerBootstrapConfig(ServerBootstrap bootstrap) {
super(bootstrap);
}
}
// AbstractBootstrapConfig.java
public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
// 存储为子类的server bootstrap,即上述的ServerBootstrap
protected final B bootstrap;
protected AbstractBootstrapConfig(B bootstrap) {
this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
}
}
对此,结合之前组件分析,我们知道Channel是存在语义上的层次关系,我们关注ServerBootstrap与ServerBootstrapConfig, AbstractBootstrap与AbstractBootstrapConfig之间在语义层次上分别获取channel信息的区分,其类图组件如下:
EventLoopGroup添加到启动类
- 入口程序代码
bootsrtap.group(bossGroup, workerGroup);
group()
事件处理源代码
// ServerBootsrtap.java
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
// AbstractBootstrap.java
// AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>>
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
通过上述可知并结合多Reactor模式可知:
- ServerBootstrap持有childGroup,用于处理socketChannel的读写事件
- AbstractBootstrap持有parentGroup,用于处理serverChannel的accept事件
服务端Channel添加到启动类
- 入口程序
// 创建一个服务端的ServerChannel并指定其BACKLOG大小为100
bootsrtap.channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100);
channel()
事件处理的源代码
// AbstractBootstrap.java
// 通过传递的服务端Channel构造一个Channel创建工厂类,用于后续构建服务端的Channel
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// 服务端Channel的配置存储到容器Map中
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
根据上述源码可知,启动类调用channel()
方法目的是创建一个ChannelFactory工厂类,用于后续构建服务端的Channel实例,我们可以看到Netty框架此处使用工厂模式来创建Channel,目的是为了支持创建不同服务端类型的Channel而避免使用new Class()
的方式硬编码逐一分别实现,有助不同Channel类型的扩展.
将Handler添加到启动类
- 入口程序
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
handler()
源码
// 源码
// AbstractBootstrap.java
// 当前的服务端channelHandler存在于AbstractBootstrap
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
通过上述可知,在启动类传递的handler将保存到AbstractBootstrap
类中,该handler()
方法主要用于处理服务端channel的事件完成结果的处理,对于服务端而言,主要处理监听客户端连接完成的事件处理
将childHandler添加到启动类
- 入口程序
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
// 保证每一个socket channel都会对应着一个自己的channel handler
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
childHandler()
源码
// 源码
// ServerBootstrap.java
// 将上述的childHandler绑定到ServerBootstrap,为ServerBootstrap所持有
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
class ServerBootstrapAcceptor{
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
}
}
在启动类添加的childHandler
最终保存到当前的ServerBootstrap
类中,主要用于处理客户端SocketChannel连接的读写事件,在后续事件分析中,我们可以知道添加当前的childHandler将会在Acceptor中注册客户端channel时会将对应的childHandler添加到channel的责任链pipeline中,先前在组件源码分析中已经说明到,ServerChannel是作为客户端SocketChannel的语义层次上的父类,于是对于handler我们也可以理解childHandler是处理客户端读写事件的handler的处理器.
启动类初始化组件小结
通过上述的类图可以知道,ServerBootsrtap与SocketChannel进行关联,AbstractServerBootstrap与ServerSocketChannel进行关联,对于channel,ServerSocketChannel与SocketChannel是层次上的父子关系,对于Bootsrap类抑或是Config类,均通过子类获取与SocketChannel相关的信息,通过父类获取与ServerSocketChannel相关信息,层次划分明确,现将Bootstrap构造初始化操作事件流程绘制如下:
我们知道在Netty框架在处理服务端与客户端的事件是划分层次的,在语义层次上,服务端属于“父类”,客户端属于“子类”,两者之间的事件所依赖的组件也在语义上划分层次,对此,结合上述对EventLoopGroup与EventLoop的源码分析,现将启动类Bootstrap,EventLoopGroup,EventLoop,Channel以及Thread之间的关联示意图绘制如下:
端口绑定事件分析
启动类绑定端口分析
- 入口程序代码
// 入口程序
bootsrtap.bind(PORT);
bind()
核心源码摘录
// AbstractBootstrap.java
// 通过类名称可知是创建服务端的Channel并注册Channel事件实现对客户端Channel连接的监听
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// 由于注册绑定流程复杂,这里将绑定注册流程划分出来,摘录核心方法,Netty框架中使用EventLoop来处理每个channel事件,存在多线程异步执行的情况.对于异步返回的结果ChannelFuture已在Netty组件源码分析说明到,这里不再详述
// bind包括: 创建channel -> 初始化channel -> 注册channel -> channel绑定端口操作
doBind(final SocketAddress localAddress){
// 初始化并注册服务端的channel
initAndRegister();
//...
//如果注册成功,执行服务端channel的绑定操作
doBind0(regFuture, channel, localAddress, promise);
}
服务端channel创建事件
- 创建服务端channel的流程
// 源代码
// 使用channelFactory创建NioServerSocketChannel实例
channel = channelFactory.newChannel();
NioServerSocketChannel
类图结构设计
NioServerSocketChannel
初始化源码
public NioServerSocketChannel() {
// 创建java的nio下的ServerSocketChannel并传递到当前的NioServerSocketChannel构造器中
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 服务端监听Accept事件并保存,后续在进行注册的时候将会使用到OP_ACCEPT
// 1.设置channel的父类,如果当前为服务端的channel则为null
// 2.创建channelId
// 3.创建Nio的Unsafe类
// 4.创建channel的责任链pipeline,同时每个pipeline都会创建一个双端链表连接上下文对象
super(null, channel, SelectionKey.OP_ACCEPT);
// 1. 为当前的channel创建接收数据的ByteBuff分配器,即AdaptiveRecvByteBufAllocator,该分配器默认从1024kb开始创建缓冲区分配数据,最小为64kb,最多不超过65536kb
// 2. 保存java对象的ServerSocketChannel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
根据上述源码可知,创建Channel时会将与操作Channel相关的组件也一起完成初始化操作,即创建操作缓冲区数据的Unsafe以及对缓冲区数据进行读写存储的ByteBuf分配器.
服务端channel的初始化流程
- 初始化源码入口
// AbstractBootstrap调用init方法,但是当前类没有实现,交由子类ServerBootstrap去执行
init(channel);
- 初始化核心代码
// ServerBootstrap.java
init(){
// 1. 为当前的channel设置option以及attributes
// 2. 获取当前channel的责任链,为当前的责任添加初始化handler处理器
// init下初始化handler核心代码
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 获取服务端channel的handler处理类
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 在channel所在的eventloop创建一个线程来执行任务
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 在任务下为服务端的channel添加Acceptor处理器负责处理客户端channel连接进来的事件完成处理
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
通过上述初始化之后,服务端当前责任链pipeline结构为:head -> initChannelHandler -> tail
服务端channel的注册流程
- 源码入口
// 源码
// AbstractBootstrap.java
// 获取boss NioEventLoopGroup,将channel注册到当前的group下
ChannelFuture regFuture = config().group().register(channel);
- 源代码分析
// 根据NioEventLoopGroup的继承类图,可知register方法是在MultithreadEventLoopGroup下
// MultithreadEventLoopGroup.java
public ChannelFuture register(Channel channel) {
// 选举一个EventLoop来注册channel
// 在初始化Group操作的时候已经完成选择器的初始化操作,这里调用选择器来选择一个EventLoop
// 这里调用EventLoop的注册方法,在上述入口中使用NioEventLoop可知使用的register方法为SingleThreadEventLoop类下的方法,最终调用AbstractChannel下的register方法
// 方法调用走向如下:
// MultithreadEventLoopGroup.regitser() -> SingleThreadEventLoop.regitser() -> promise.channel().unsafe().register() -> unsafe(NioMessageUnsafe).regitser() -> AbstractNioUnsafe.regitser() -> AbstractUnsafe.register() -> AbstractUnsafe.register0()
return next().register(channel);
}
//AbstractChannel.java下的AbstractUnsafe
register0(promise);
// 上述注册方法的核心步骤:
// 1. 将channel注册到复用器selector上
// 2. 注册完成之后唤醒回调责任链下所有先前已加入的channelHandler类下的handlerAdd方法
// 3. 注册完成之后将结果设置在promise中
// 4. 将注册结果传递到责任链pipeline中,并执行回调channelHandler(ChannelInboundHandler)类下的channelRegistered方法,链式回调执行
// 5. 如果channel为active状态,则继续传播结果事件到channelHandler(ChannelInboundHandler)类下的channelActive方法,链式回调执行
// 6. 5步骤是在第一次进行注册的时候会执行(表示channel已经打开),如果已经注册过,那么校验会自动开始数据读取操作,客户端channel注册读取OP_READ操作, 对应服务端的Channel而言就是监听客户端socket的连接ACCEPT事件
初始化的时候,责任链为head -> initChannelHandler -> tail
当完成上述注册流程的时候,执行入站事件,会依次调用责任链,此时责任链最终为head -> handler -> acceptor -> tail
,也就是当channel完成注册的时候,才会将启动类中的handler添加到对应channel的pipeline中,否则就不申请内存创建channel对象,类似于懒加载处理,只在完成channel注册的时候(表示当前channel是要进行事件的监听操作)才会初始化一个完整的pipeline下handler,对此,注册前后的pipeline如下:
最后,在基于上述的线程执行任务细节基础之上,将服务端的初始化并注册流程示意图流程绘制如下:
执行端口绑定与监听操作
- 端口绑定源码分析
// 上述channel注册成功之后,这个时候在上面流程只会触发Active事件,这个时候没有绑定端口没有触发监听事件
// AbstractBootstrap.java
doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(
// channel所在的eventloop线程执行任务
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 注册成功将channel进行绑定操作
if (regFuture.isSuccess()) {
//
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
// AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
// DefaultChannelPipeline.java
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 在链表尾部添加绑定操作
return tail.bind(localAddress, promise);
}
// AbstractChannelHandlerContext.java
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// 搜索outboundContext上下文
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 执行责任链pipeline 出站事件,从链表尾部开始搜索,因而最后的context是headContext
// 执行headContext下的invokeBind方法,该方法还是属于当前类,对此查看下文
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
// AbstractChannelHandlerContext.java
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
// headContext的绑定方法
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
// unsafe为NioMessageUnsafe,执行该类下的bind方法(AbstractUnsafe.java中定义)
// 最后再执行channel下的doBind(localAddress);方法,即NioServerSocketChannel下的方法
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// 可以看到实现了端口的绑定操作
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
对此,基于上述源码的分析,我们绘制服务端channel的端口绑定流程如下: