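Netty Study Notes, Part 3
As we saw earlier, the most important step when a Netty server starts is the bind operation. It not only starts the event loop's run() method, which loops forever, but also adds tasks to the task queue, which runAllTasks then executes.
First, take a look at Netty's architecture diagram (image source: 即时通讯网):
Reactor thread model diagram (image source: 即时通讯网):
Below is the call flow traced through the source code: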
AbstractBootstrap#bind(int inetPort)-> AbstractBootstrap#bind(SocketAddress localAddress)->
AbstractBootstrap#doBind(final SocketAddress localAddress)->AbstractBootstrap#doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) (note this method: it performs the bind first, then adds a listener)->
ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)->
Important: AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)->
DefaultChannelPipeline#bind(SocketAddress localAddress, ChannelPromise promise)->
AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)->
AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)
->ChannelOutboundHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
Enter LoggingHandler
->Important: LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) calls ctx.bind(localAddress, promise)
-> AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)#next.invokeBind(localAddress, promise)
->AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)#bind(this, localAddress, promise)
->ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)
->Important: DefaultChannelPipeline.HeadContext#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#unsafe.bind(localAddress, promise)
->Important: AbstractChannel.AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
->javaChannel()->NioServerSocketChannel#doBind(SocketAddress localAddress)
->AbstractChannel#invokeLater(Runnable task)# eventLoop().execute(task)
->SingleThreadEventExecutor#execute(Runnable task)
#execute(Runnable task, boolean immediate)#addTask(Runnable task)#wakeup(inEventLoop)#safeSetSuccess(promise)
->AbstractChannel#safeSetSuccess(ChannelPromise promise)
->at this point the result propagates back up: AbstractChannelHandlerContext->LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
->DefaultChannelPromise#addListener(GenericFutureListener<? extends Future<? super Void>> listener)
->DefaultPromise#addListener(GenericFutureListener<? extends Future<? super V>> listener)#isDone()#notifyListeners()
#executor()
->AbstractBootstrap#executor()#super.executor()
->DefaultChannelPromise#channel().eventLoop()
->AbstractNioChannel#eventLoop()
->AbstractChannel#eventLoop()
->notifyListenersNow()
->FastThreadLocalRunnable#run()
->ThreadExecutorMap#apply(final Runnable command, final EventExecutor eventExecutor)#command.run()
->SingleThreadEventExecutor#doStartThread()#SingleThreadEventExecutor.this.run()
->Important: NioEventLoop#run()#runAllTasks(long timeoutNanos)#runTasks #task = pollTask()#pollTaskFrom(Queue<Runnable> taskQueue)
->NioEventLoop#safeExecute(Runnable task)#fireChannelActive() Important: #invokeChannelActive() calls ((ChannelInboundHandler) handler()).channelActive(this);
With the trace done, recall how Netty operates: the server is started first. At this point the server is running, but connections still have to be established.
The key steps of the bind operation:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// Important: fire the channelActive event
pipeline.fireChannelActive();
}
});
}
// Key point: completing the promise here lets the call return step by step back through the pipeline
safeSetSuccess(promise);
}
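The `wasActive` / `isActive()` check above can be tried out with the JDK alone. The following is a minimal sketch, not Netty code: the class `BindActiveDemo` and its methods are hypothetical names, and the `isActive` helper assumes that a server channel counts as active when it is open and bound, which is how `NioServerSocketChannel#isActive()` is understood to behave.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; it only uses JDK NIO, not Netty.
final class BindActiveDemo {
    // Assumption: "active" for a server channel means open and bound.
    static boolean isActive(ServerSocketChannel ch) {
        return ch.isOpen() && ch.socket().isBound();
    }

    // Returns true if the bind flipped the channel from inactive to active,
    // which is the condition under which bind(...) schedules fireChannelActive().
    static boolean bindAndCheck() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            boolean wasActive = isActive(ch);
            // Port 0 asks the OS for any free port.
            ch.bind(new InetSocketAddress("127.0.0.1", 0));
            return !wasActive && isActive(ch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("became active: " + bindAndCheck());
    }
}
```

Because the channel only becomes active as a result of the bind, the channelActive event is fired once per successful bind and not on every call.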
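Here we can see that EventLoop is a central class: most of the work is done in NioEventLoop. An EventLoop is essentially an infinite loop, and each iteration does the following:
Conditionally wait for NIO events
Process the NIO events
Run the tasks in the task queue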
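These three steps can be modeled with a toy loop before reading the real implementation. This is only a sketch built on the JDK: `ToyEventLoop` and its methods are hypothetical names, and it leaves out everything Netty adds on top (ioRatio, scheduled tasks, selector rebuilding, shutdown handling).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical toy model of an event loop; not Netty's implementation.
final class ToyEventLoop {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    ToyEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Queue a task and wake the selector so a blocked select() returns.
    void execute(Runnable task) {
        taskQueue.add(task);
        selector.wakeup();
    }

    // Stop the loop by queuing a task that clears the running flag.
    void shutdown() {
        execute(() -> running = false);
    }

    void run() {
        try {
            while (running) {
                // 1. Wait for NIO events only when no task is pending.
                if (taskQueue.isEmpty()) {
                    selector.select(1000);
                } else {
                    selector.selectNow();
                }
                // 2. Process the NIO events (here: run a Runnable attachment).
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    Object attachment = key.attachment();
                    if (attachment instanceof Runnable) {
                        ((Runnable) attachment).run();
                    }
                }
                // 3. Run the tasks in the task queue.
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Tasks submitted with `execute` run in submission order on whichever thread calls `run()`, which mirrors the idea that all work for a channel happens on its single event loop thread.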
@Override
protected void run() {
int selectCnt = 0;
// Spin loop
for (;;) {
try {
int strategy;
try {
// Calculate the strategy
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
// Dispatch on the strategy
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// Important: run the queued tasks
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
// The runAllTasks operation
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
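Two small pieces of arithmetic in the listings above are easy to misread: the task time budget `ioTime * (100 - ioRatio) / ioRatio` and the `(runTasks & 0x3F) == 0` check. The sketch below isolates both so they can be tried with concrete numbers. The class and method names are hypothetical; only the two expressions are taken from the source.

```java
// Hypothetical helper class isolating two expressions from the event loop code.
final class TaskBudget {
    // Time allowed for tasks, given the time just spent on I/O and an
    // ioRatio percentage between 1 and 99 (ioRatio == 100 takes another branch).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // True when the low six bits of the counter are zero, i.e. every 64th task.
    static boolean shouldCheckDeadline(long runTasks) {
        return (runTasks & 0x3F) == 0;
    }

    public static void main(String[] args) {
        System.out.println(taskBudgetNanos(1_000, 50));
        System.out.println(taskBudgetNanos(1_000, 20));
        System.out.println(shouldCheckDeadline(64));
    }
}
```

With an ioRatio of 50 the tasks get as much time as the I/O took; with 20 they get four times as much. Checking the deadline only every 64 tasks keeps the cost of nanoTime() low, at the price of possibly overrunning the budget by up to 63 tasks.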
Accepting requests: processing the multiplexer's selected keys
NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)#unsafe.read()->AbstractNioMessageChannel#read()
-> NioServerSocketChannel#doReadMessages(List<Object> buf)#
SocketChannel ch = SocketUtils.accept(javaChannel())#SocketUtils#accept(final ServerSocketChannel serverSocketChannel)#
ServerSocketChannel#accept()->Sun's ServerSocketChannelImpl#accept(), which accepts the connection
->Note the pipeline.fireChannelRead(readBuf.get(i)) call in NioMessageUnsafe#read(), which runs after doReadMessages(List<Object> buf) and hands each accepted channel to the pipeline
Eventually, AbstractNioChannel#doBeginRead is called.
Let's walk through it. Enter http://localhost:8007/ in a browser; the client sends a request, and a breakpoint will stop in unsafe.read():
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// Important
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
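The bit operations in processSelectedKey are plain integer masks over the `SelectionKey` constants. Here is a small sketch of the two that matter most in this walkthrough; the class and method names are hypothetical, while the expressions themselves are copied from the method above.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper class; the two expressions come from processSelectedKey.
final class ReadyOpsDemo {
    // Remove OP_CONNECT from an interest set, leaving the other bits untouched.
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    // unsafe.read() is called for OP_READ, for OP_ACCEPT, and also for
    // readyOps == 0 (the workaround for a possible JDK spin loop).
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
        System.out.println(clearConnect(ops) == SelectionKey.OP_READ);
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT));
    }
}
```

Because OP_READ and OP_ACCEPT share one branch, the same `unsafe.read()` call serves both a server channel accepting connections and a client channel reading bytes; which one happens depends on the Unsafe implementation behind the channel.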
Step into NioMessageUnsafe#read(); this method is important:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
Which in turn enters:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
public SocketChannel accept() throws IOException {
Object var1 = this.lock;
synchronized(this.lock) {
if (!this.isOpen()) {
throw new ClosedChannelException();
} else if (!this.isBound()) {
throw new NotYetBoundException();
} else {
SocketChannelImpl var2 = null;
int var3 = 0;
FileDescriptor var4 = new FileDescriptor();
InetSocketAddress[] var5 = new InetSocketAddress[1];
InetSocketAddress var6;
try {
this.begin();
if (!this.isOpen()) {
var6 = null;
return var6;
}
this.thread = NativeThread.current();
do {
var3 = this.accept(this.fd, var4, var5);
} while(var3 == -3 && this.isOpen());
} finally {
this.thread = 0L;
this.end(var3 > 0);
assert IOStatus.check(var3);
}
if (var3 < 1) {
return null;
} else {
IOUtil.configureBlocking(var4, true);
var6 = var5[0];
var2 = new SocketChannelImpl(this.provider(), var4, var6);
SecurityManager var7 = System.getSecurityManager();
if (var7 != null) {
try {
var7.checkAccept(var6.getAddress().getHostAddress(), var6.getPort());
} catch (SecurityException var13) {
var2.close();
throw var13;
}
}
return var2;
}
}
}
}
At this point the JDK's ServerSocketChannel is called, which ultimately ends up in accept() of Sun's ServerSocketChannelImpl.
Next, let's analyze:
for (int i = 0; i < size; i ++) {
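The accept path can be reproduced with the JDK alone, without Netty. The sketch below is a hypothetical demo (`AcceptDemo` is not part of any library): it binds a non-blocking server channel on the loopback interface, registers it for OP_ACCEPT, connects a client, and accepts the connection once the selector reports it. It also shows why doReadMessages checks `ch != null`: a non-blocking accept with no pending connection returns null.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class using only JDK NIO.
final class AcceptDemo {
    // Returns true if exactly the expected accept sequence was observed.
    static boolean acceptOne() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: a non-blocking accept returns null.
            if (server.accept() != null) {
                return false;
            }

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Wait (up to five seconds) until the connection is ready to accept.
                if (selector.select(5000) == 0) {
                    return false;
                }
                SelectionKey key = selector.selectedKeys().iterator().next();
                if (!key.isAcceptable()) {
                    return false;
                }
                try (SocketChannel accepted = server.accept()) {
                    return accepted != null;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptOne());
    }
}
```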
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
This calls:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
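The static invokeChannelRead above follows a pattern that recurs throughout Netty: run the handler directly if the caller is already on the event loop thread, otherwise hand the call over to that thread. The following is a rough sketch of the pattern with a plain single-thread executor. `InLoopDispatch` and its members are hypothetical names, and Netty's own executors track their thread differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the "inEventLoop or execute" dispatch pattern.
final class InLoopDispatch {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    InLoopDispatch() {
        // Remember the single worker thread so inEventLoop() can compare against it.
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise submit as a task.
    void dispatch(Runnable handlerCall) {
        if (inEventLoop()) {
            handlerCall.run();
        } else {
            executor.execute(handlerCall);
        }
    }
}
```

The benefit of this design is that handler code never needs locks for channel state: whichever thread triggers an event, the handler itself always runs on the one loop thread.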
This invokes channelRead (in ServerBootstrapAcceptor):
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// Perform the register operation
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
Enter AbstractChannel's register:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
This leads to register0, which is on the way to the doBeginRead we want to look at:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// Perform the registration
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
// Important
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
Enter beginRead, which delegates to doBeginRead:
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
Enter the doBeginRead method:
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// Add the read interest op to the key's interest set
selectionKey.interestOps(interestOps | readInterestOp);
}
}
In the JDK's SelectionKeyImpl we can see:
public SelectionKey interestOps(int var1) {
this.ensureValid();
return this.nioInterestOps(var1);
}
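This completes the doBeginRead operation.
In short: startup centers on the bind operation; once the server is up, the accept operation follows, and then the read operation.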
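What doBeginRead does to the key can be observed with a real `SelectionKey`. The sketch below is a hypothetical JDK-only demo. It assumes, as Netty is understood to do in its registration step, that the channel is first registered with an interest set of 0, and it uses OP_ACCEPT as the `readInterestOp` of a server channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class using only JDK NIO.
final class InterestOpsDemo {
    // Returns {interestOps before, interestOps after} the doBeginRead-style update.
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            // Assumption: register with no interest at first, so nothing is selected yet.
            SelectionKey key = ch.register(selector, 0);
            int before = key.interestOps();

            // Same logic as doBeginRead: only add the bit if it is not set.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenBeginRead();
        System.out.println(ops[0] + " -> " + ops[1]);
    }
}
```

Only after this update does the selector start reporting accept events for the channel, which is why registration alone is not enough for the server to receive connections.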