bossGroup和workGroup是分开的,bossGroup负责accept请求,而workGroup负责read/write事件,bossGroup accept之后转交给workGroup具体是怎么实现的呢。
BossGroup
NioEventLoop#processSelectedKey()方法中如下List-1,当bossGroup收到accept事件后,调用unsafe.read()
List-1
代码语言:javascript复制// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
AbstractNioMessageChannel.NioMessageUnsafe.read(),如下List-2,1处获取JDK channel,2处pipeline.fireChannelRead(readBuf.get(i))会调用ServerBootstrapAcceptor.channelRead()。
List-2
代码语言:javascript复制private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//1
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ) {
readPending = false;
//2
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
NioServerSocketChannel.doReadMessages()如下获取JDK channel,之后封装后NioSocketChannel,放入到buf中
List-3
代码语言:javascript复制@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
bossGroup的pipeline,是head->ServerBootstrapAcceptor->tail,ServerBootstrapAcceptor.channelRead()中,childGroup.register(child),这里的child是netty封装后的JDK channel,转交给childGroup,如下List-5
List-4
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
WorkGroup
MultithreadEventLoopGroup,如下所示的方法中,next()从众多的child group中选出一个来处理这个新的channel,默认是轮询。
List-5
代码语言:javascript复制@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
List-5中register()只是在这个channel上注册read事件监听,待这个channel上有read事件,即有数据可读时就会进行read。
List-6中bootstrap设置了childHandler,initializer里对pipeline添加了channelHandler,workGroup从channel中读取数据后,会顺着channel handdler链表进行处理,也就是我们设置的这些channel handler会收到对应的数据。
List-6
代码语言:javascript复制NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
...
.childHandler(new ServerInitializer(...));
...
public class TrpcServerInitializer extends ChannelInitializer<SocketChannel> {
private Map<String, Object> handlerMap;
private List<TrpcFilter> filters;
public TrpcServerInitializer(Map<String, Object> handlerMap, List<TrpcFilter> filters) {
this.handlerMap = handlerMap;
this.filters = filters;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(30, 30, PingPongRequest.BEAT_INTERVAL, TimeUnit.SECONDS));
pipeline.addLast(new TrpcDecoder(TrpcRequest.class));
pipeline.addLast(new TrpcEncoder(TrpcResponse.class));
pipeline.addLast(new TrpcServerHandler(handlerMap, filters));
}
workGroup从bossGroup获取channel,之后注册read事件,之后如List-1,如果有read事件,则会调用List-2中的方法,1处从JDK channel中获取byte数据写入ByteBBuf,2处pipeline.fireChannelRead(),将读取的Byte数据转交给我们自定义的channelHandler,比如ByteToMessageDecoder中就可以进行反序列化byte,之后的业务ChannelHandler才能处理业务请求
List-7
代码语言:javascript复制protected class NioByteUnsafe extends AbstractNioUnsafe {
...
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//1
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//2
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();