Netty源码分析
一.reator模型
netty是reator模型的实现,我们先来看下reator模型
单线程reator
我用一个现实生活的例子来解释上面的图,我相信你一定能看懂
假如你新开了一家火锅店,由于前期资金比较短缺,你没有足够的资金去请店员,只有你和你老婆两个人(夫妻店),你为了让你老婆轻松一点,你让你老婆啥事不用做,只负责听顾客的需求,然后转发给你做(其实你老婆在这里就是充当selector)。简化一下你需要做哪些事情
1.接待顾客
2.为顾客提供服务
3.其他的工作,比如打扫卫生啥的
那正常的工作流程如下:
你老婆坐在门口,当看到有人进店,她立马吆喝让你去接待顾客,然后你把顾客安排到了一个桌子旁边。此时可以理解为一个连接已经建立了,然后你跟顾客说你有啥吩咐直接跟我老婆讲,因为你老婆会将顾客的需求告诉你,然后你处理。
然后顾客点菜,点好了告诉你老婆点好菜了,可以准备锅底和菜了,这时你老婆就会将顾客提的需求转给你,比如:1号桌已经点好菜了,你去准备下吧,此时你就将锅底、菜什么的都给顾客送过去。这样不知不觉的你就完成了一次客户端和服务端的一次请求
接下来…
新顾客来了…你老婆让你接待
1号桌需要加水…你老婆让你处理
…
还有一些比如定时任务需要处理,比如晚上收工后还要打扫卫生等
上面的场景我们就可以理解成单线程的reator模型,什么事情都需要你一个人去做,如果你正在忙着给1号桌加水,此时来了新顾客,你肯定无法及时处理…,我们发现这样你只能开个小店,生意火爆了,你就处理不过来。我们需要演变成下面的下面的多线程模型
多线程reator
由于你的辛勤劳动,你们辛苦的赚了两年钱,你想把店开大一点,多赚点钱,这是肯定靠你一个人是不行的,于是你请了几个服务员,假设你请了A,B,CD,E五个服务员,你的安排如下:
A,B专门负责接待;C,D,E负责处理顾客的需求,此时你是老板,你啥也不用干了,haha
那正常的运行流程应该是这样的
你老婆的工作内容不变,依然是充当selector的作用
当新顾客来了,你老婆就通知A,B两人中的一个人,让她负责接待(比如:小A来客人了,你接待一下),然后A就把顾客带到餐桌,此时我们可以理解为一个连接已经建立了(客户端和服务端建立连接),A的事情做完了,她回到原来的位置等待接待其他的新顾客
然后1号桌子菜点完了,然后告诉你老婆说我菜点完了,这时你老婆就从C,D,E三个里面安排一个去给他上菜、上锅(这里要注意了,netty以后会着一个人处理这个桌子的所有需求了,避免上下文切换,比如你老婆让C处理了这个上菜,上火锅的需求,那么以后这个桌子要加水什么的都是这个C去处理,一个C可以处理多个桌子的需求,比如处理1-10号桌子)
然后新的顾客来了,你老婆通知B处理…
11号桌子要加水,你老婆通知D处理…
…
上面的这个流程可以帮我们理解多线程reator模型
A,B就是acceptor线程里的线程
C,D,E就是nio处理IO和业务的线程
从此你的火锅店越来越大…
二.netty线程模型
上面的例子中简单的描述过,当一个餐桌(已经有顾客的餐桌–连接)被分配一个服务员后,后面这个餐桌的所有的需求都是这个服务员操作。换句话来说在netty中当客户端和服务端建立连接后就会分配一个EventLoop(暂时可以理解为与一个线程绑定),后续这个channel(建立的连接)的所有的操作都是这个eventLoop中的线程执行—这样能避免线程的上下文切换带来的性能的消耗以及消息的顺序性处理等,后面我们会在源码中详细看到这一点。
三.channel、pipeline、context、handler的关系
channel、pipeline、context、handler的关系如下图
这个关系图我们可以从下面的源码中看出来
代码语言:javascript复制//AbstractChannel是NioSocketChannel和NioServerSocketChannel的父类
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
代码语言:javascript复制protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//默认会给我们添加head和tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
四.netty源码分析
netty的源码比较对称,客户端和服务端大同小异,这里仅分析服务端的主流程代码,netty的版本:4.1.45.Final
从实例的代码入手,其实就是从ServerBootstrap入手
先贴一段客户端和服务端netty的使用代码,都是类似
服务端:
代码语言:javascript复制EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
final RegisterCentorHandler registerCentorHandler = new RegisterCentorHandler();
serverBootstrap.group(boss,worker)
.option(ChannelOption.TCP_NODELAY,true)
.option(ChannelOption.SO_REUSEADDR,true)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(65535,
0,2,0,
2));
ch.pipeline().addLast("frameEncoder",
new LengthFieldPrepender(2));
/*反序列化,将字节数组转换为消息实体*/
ch.pipeline().addLast("MessageDecoder",new KryoDecoder());
/*序列化,将消息实体转换为字节数组准备进行网络传输*/
ch.pipeline().addLast("MessageEncoder",
new KryoEncoder());
//将反序列化后的实体类交给业务处理
ch.pipeline().addLast(registerCentorHandler);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(Configuration.REGISTER_CENTOR_PORT);
channelFuture.syncUninterruptibly();
客户端:
代码语言:javascript复制EventLoopGroup boss = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
final DiscoveryHandler discoveryHandler = new DiscoveryHandler();
bootstrap.group(boss)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(65535,
0,2,0,
2));
ch.pipeline().addLast("frameEncoder",
new LengthFieldPrepender(2));
/*反序列化,将字节数组转换为消息实体*/
ch.pipeline().addLast("MessageDecoder",new KryoDecoder());
/*序列化,将消息实体转换为字节数组准备进行网络传输*/
ch.pipeline().addLast("MessageEncoder", new KryoEncoder());
//从注册中心获取服务
ch.pipeline().addLast(discoveryHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect(Configuration.HOST, Configuration.REGISTER_CENTOR_PORT);
boolean ret = channelFuture.awaitUninterruptibly(3000, TimeUnit.MINUTES);
开始分析
1.ServerBootstrap#group()
代码语言:javascript复制public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
这个代码没什么好说的了,就是设置变量,包括后面的ServerBootstrap.handler().childHandler()等等,大家记住这里会将传进去的对象存储起来
我们直接从服务端的bind方法开始,这才是重点
代码语言:javascript复制public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
//直接看这个方法
return doBind(localAddress);
}
绑定ip地址和端口,网络编程都是这样的套路,上面SocketAddress上面已经设置了
代码语言:javascript复制private ChannelFuture doBind(final SocketAddress localAddress) {
//1.重点方法
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
//2.重点方法
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//设置回调,当channel初始化和注册成功,做哪些操作,失败又做哪些操作,暂时不是研究的重点
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
重点分析两个方法
initAndRegister():初始化channel并且注册
doBind0():绑定
2.initAndRegister()
代码语言:javascript复制final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1.创建channel
channel = channelFactory.newChannel();
//2.初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//3.注册
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
(1)创建channel
服务端设置channel的代码
serverBootstrap.channel(NioServerSocketChannel.class),我们看下是怎么创建channel的
代码语言:javascript复制public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
//....省略了部分调用过程
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
//到这里来,还是给这个channelFactory赋值
this.channelFactory = channelFactory;
return self();
}
我们回到initAndRegister方法注释1
代码语言:javascript复制@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " constructor.getDeclaringClass(), t);
}
}
可以看到其实就是通过反射创建channel,记住这里服务端创建的channel是NioServerSocketChannel
我们接着往下面看注释2的初始化方法
(2)init(channel)
这个方法是模板设计模式,父类是AbstractBootstrap,两个子类分别是ServerBootstrap和Bootstrap分别用于处理服务端和客户端,我这里看到了就顺便跟大家提一下,所以这里我们直接看ServerBoostrap#init()
代码语言:javascript复制//这里传过来的channel正是我们刚才上面创建的NioServerSocketChannel
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
//1.一个channel对应一个pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//2.往pipeline中添加handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
//往pipeline上面添加handler
pipeline.addLast(handler);
}
//3.非常重要,稍后会详细讲解
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
上面这个方法就比较重要了,不过也不难,听我慢慢分析
(1).channel和pipeline
channel,pipeline,context,handler之间的关系,我们在之前已经详细的看过了,
一个channel里面有一个pipeline流水线
一个pipeline上面用双线链表连接了很多handler,但是handler被context包装起来了,所以pipeline上面连接的是context
当有事件需要处理时就会在区分入站和出站事件,在pipeline上的handler上面处理
这里需要注意一下,此时的channel是NioServerSocketChannel这个是专门用来建立连接的,后面连接后会再次建立一个channel(NioSocketChannel)希望大家这里要注意,防止被后面的handler和childHandler搞迷糊
NioServerSocketChannel设置的处理器handler是通过serverBootstrap.handler()设置的
NioSocketChannel用于处理IO的channel的handler是通过serverBootstrap.childHandler()设置
只有服务端需要区分,因为服务端有两种不同的channel;客户端不需要,客户端只有NioSocketChannel
(2)增加handler
pipeline.addLast(),ChannelInitializer是用于我们辅助增加handler的抽象类,它在某个时机会调用到里面的initChannel()方法,这个时机就是当channel注册完成后(这一块我们先跳过,后面我们再讲),我们现在假设channel已经注册完成了,会调用到initChannel(),我们来看里面的逻辑
1).将我们通过serverBootstrap.handler()设置的handler加到pipeline中
我们可以根据自己的业务逻辑判断是否需要增加用于连接的channel上的handler
2).增加一个非常重要的handler:ServerBootstrapAcceptor
我们重点看增加的的这个ServerBootstrapAcceptor处理器,当有客户端连接时,这个handler会处理,我们看看它是怎么处理的
另外这个为什么是提交任务的方式执行,而不是直接调用呢(后面会分析)
对于服务端来说,连接事件是一个入站处理器,所以我们看channelRead()方法
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) {
//1.上游创建的channel是NioSocketChannel
final Channel child = (Channel) msg;
//2.NioSocketChannel对应的pipelie设置childHandler
child.pipeline().addLast(childHandler);
//设置其他属性
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
//channel注册
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
如果调用到这个方法,说明有客户端发起connect事件到服务端,服务端的ServerBootstrapAcceptor这个handler专门用于处理客户端连接的业务逻辑,处理的方式就是获取子Channel(NioSocketChannel),这个channel会在上游建立好(后面再讲),然后设置子channel处理的handler以及参数等,并且完成注册,注册的方式和父Channel(NioServerSocketChannel)一样,下面说道注册的时候再讲。
(2)注册
上面我们看到了NioServerSocketChannel的初始化,现在我们来看注册config().group().register(channel);
group()返回的NioEventLoopGroup
代码语言:javascript复制@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
next()方法返回的是一个EventLoop(NioEventLoop),这里我们就要回想上面我们说的线程模型了,一个channel会被一个EventLoop终生处理,这个next()就是从EventLoopGroup选择一个EventLoop,大家可自行查看,接下来我们看register()方法
为了便于理解,我们先看下NioEventLoop的类关系图
我们重点看NioEventLoop是有单线程池的功能,先有个印象吧
代码语言:javascript复制public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
代码语言:javascript复制public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
接着会调用到AbstractChannel内部类AbstractUnsafe的register()方法,unsafe后面有时间再分析,先理解它是涉及到底层操作
代码语言:javascript复制public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " eventLoop.getClass().getName()));
return;
}
//1.讲channel和eventLoop绑定,一个channel有一个eventloop终身处理事件
AbstractChannel.this.eventLoop = eventLoop;
//2.下面这个if else比较重要,下面会详细分析,这也是netty线程模型
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
上面代码完成了channel和eventLoop的绑定;另外,一个eventLoop里面是有一个线程的,eventLoop.inEventLoop()这个方法就是判断当前线程是不是这个channel对应的eventloop的线程,如果是则直接注册,否则将它放到eventloop的同步队列中,稍后有eventloop的线程执行。为什么这么做呢?就是我们上面分析的netty的线程模型,一个channel永远有一个thread执行,而这个thread和eventloop的绑定的,避免上下文切换带来的性能损耗,以及编码的复杂性,还有事件处理的顺序性。
接着我们继续看注册的逻辑
代码语言:javascript复制private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//1.注册
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
//2.有点重要,会触发ChannelInitializer.channelRegistered()方法
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
//3.向pipeline中传递channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
//4.注册read感兴趣的事件
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
先继续看注册
代码语言:javascript复制protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
这里就是将对应的java的channel注册到seletor上,但是这里并没有设置感兴趣的事情(0表示对任何事情不感兴趣),我们来看看它在哪里注册了accept感兴趣的事件的,我们回到上面的注释4的那个代码beginRead()
代码语言:javascript复制public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
//
}
}
代码语言:javascript复制protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
//其实就是在这里注册的,重点是这readInterestOp,它是实例变量
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
首先这个方法是在AbstractNioChannel,具体的类是NioServerSocketChannel,我们来看这个类的构造方法
代码语言:javascript复制public NioServerSocketChannel(ServerSocketChannel channel) {
//就是在这个地方将readInterestOp赋值SelectionKey.OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
到这里我相信大家都看明白了…
pipeline.invokeHandlerAddedIfNeeded()最终会调用到ChannelInitializer.handlerAdded()
代码语言:javascript复制public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
这里刚好和我们前面呼应,我们前面说在某个时机会调用到ChannelInitializer.init()方法,正是这个地方。
后面的active()和read()方法都会想pipeline中的handler一次传递
现在还一下上面欠下的债,我们从上面的代码经常可以看到,通过eventloop.execute(new Runnable())…这种方式执行的,我们来看下是如何执行的
首先来到NioEventLoop#execute()
代码语言:javascript复制public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
代码语言:javascript复制private void execute(Runnable task, boolean immediate) {
//判断是不是eventloop的内部的线程
boolean inEventLoop = inEventLoop();
//往同步队列中加入队列,为什么要用同步队列?大家思考下,不是只有一个线程执行么?
addTask(task);
if (!inEventLoop) {
//开启线程执行
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
代码语言:javascript复制private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//重点看这个方法
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
代码语言:javascript复制private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//重点看这个方法
SingleThreadEventExecutor.this.run();
success = true;
}
//省略了很多的代码.....
}
});
}
最终会执行SingleThreadEventExecutor#run(),这个方法非常重要,如果知道原生的NIO编程的话,看这一段也很简单,这其实就是任务开始的地方
代码语言:javascript复制@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//1.查看是否有任务,有任务就得到需要处理任务的策略,没有任务就调用select策略,其实就是后面调用select()阻塞监听,和nio一样
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
//没有任务
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//调用select方法
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
//省略代码....
}
//下面是处理IO事件(也就是selector监听到的),以及处理其他任务的调度了,感兴趣的自己看吧,因为只有一个线程处理所有的这些事情
selectCnt ;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
// Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
最后来看下processSelectedKey()这个方法
代码语言:javascript复制private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
上面这个方法就不多解释了,跟我们写原生的NIO差不多
五.netty开发注意事项
netty框架已经帮我们封装好了,所以我们使用起来是非常简单的,客户端和服务端都是固定的写法,我们如果想使用一个netty框架开发,我们其实大部分只需要编写自己的handler,那我们在写handler时需要注意什么呢?
1.切记要释放内存
当我们编写handler时,我们需要释放内存(bytebuf)。不然会内存泄漏。但是netty的tailHandler会帮我们自动释放内存
所以我们的做法
(1)要么让事件一直传递到tailHandler
我们可以简单使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter当我们编写完了自己的业务逻辑后,可以调用super.read()让其传到tailHandler
也可以使用SimpleChannelInboundHandler这个类的channelRead的finally会调用ReferenceCountUtil.release()释放内存,我们只需要实现channelRead0()方法
(2)要么自己手动释放
netty已经提供好了一个工具类,让我们调用释放
ReferenceCountUtil.release()
2.handler截断
当我们不需要将事件往后面的handler传递的时候,我们需要注意下面两个事情
(1)如果我们使用的是ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter就不用调用他的super方法
(2)一定要记得手动释放内存
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/111194.html原文链接:https://javaforall.cn