深入分析netty(一)BootStrap与ServerBootStrap

2022-10-25 16:54:56 浏览数 (3)

文章目录
  • 1.揭开BootStrap神秘面纱
    • 1.1.客户端BootStrap
      • 1.1.1 NioSocketChannel 的初始化过程
      • 1.1.2 ChannelFactory 和Channel类型的确定
      • 1.1.3Channel的实例化
      • 1.1.4关于unsafe字段的初始化
      • 1.1.5关于pi peline的初始化
      • 1.1.6关于EventLoop的初始化
      • 1.1.7Channel的注册过程
      • 1.1.8 Handler的添加过程
      • 1.1.9客户端连接分析
    • 1.2.服务端ServerBootStrap
      • 1.2.1 Channel的初始化过程
      • 1.2.2Channel类型的确定
      • 1.2. 3 NioServerSocketChannel的实例化过程
      • 1.2. 4 ChannelPipeline 初始化
      • 1.2.5 Channel的注册
      • 1.2.6关于bossGroup 与workerGroup
      • 1.2.6 Handler 的添加过程

1.揭开BootStrap神秘面纱

1.1.客户端BootStrap

Bootstrap是Netty提供的一个便利的工厂类,可以通过它来完成客户端或服务端的Netty初始化。

先来看一个例子,从客户端和服务端分别分析Netty的程序是如何启动的。首先,从客户端的代码片段开始。

代码语言:javascript复制
import com.xiepanpan.chat.client.handler.ChatClientHandler;
import com.xiepanpan.chat.protocol.IMDecoder;
import com.xiepanpan.chat.protocol.IMEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author: xiepanpan
 * @Date: 2020/8/16 0016
 * @Description: 聊天客户端
 */
public class ChatClient {

    private ChatClientHandler chatClientHandler;
    private String host;
    private int port;

    public ChatClient(String nickName) {
        this.chatClientHandler = new ChatClientHandler(nickName);
    }

    public static void main(String[] args) {
        new ChatClient("xp").connect("127.0.0.1",80);
    }

    private void connect(String host, int port) {
        this.host = host;
        this.port = port;

        EventLoopGroup workerGroup = new NioEventLoopGroup();;

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new IMDecoder());
                    socketChannel.pipeline().addLast(new IMEncoder());
                    socketChannel.pipeline().addLast(chatClientHandler);
                }
            });
            ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

上面的客户端代码虽然简单,但是展示了Netty客户端初始化时所需的所有内容。

(1)EventLoopGroup:不论是服务端还是客户端,都必须指定EventLoopGroup。在这个例子中,指定了NioEventLoopGroup,表示一个NIO的EventLoopGroup。

(2)ChannelType:指定Channel的类型。因为是客户端,所以使用了NioSocketChannel。

(3)Handler:设置处理数据的Handler。

下面继续看一下客户端启动Bootstrap后都做了哪些工作?

1.1.1 NioSocketChannel 的初始化过程

在Netty中,Channel 是一个Socket 的抽象,它为用户提供了关于Socket状态(是否是连 接还是断开)以及对Socket 的读写等操作.每当Netty 建立了一个连接后,都会有一个对应 的Channel实例. Ni oSocketChannel的类层次结构如下:

![image-20200817164624884]

接下来我们着重分析一下Channel的初始化过程。

1.1.2 ChannelFactory 和Channel类型的确定

除了TCP协议以外,Netty 还支持很多其他的连接协议,并且每种协议还有NI0(非阻塞I0) 和0I0(0ld-I0, 即传统的阻塞I0)版本的区别.不同协议不同的阻塞类型的连接都有不同的Channel类型与之对应下面是一些常用的Channel 类型: NioSocketChannel,代表异步的客户端TCP Socket 连接. NioServerSocketChannel,异步的服务器端TCP Socket 连接. NioDatagramChannel,异步的UDP连接 NioSctpChannel,异步的客户端Sctp连接. NioSctpServerChannel,异步的Sctp 服务器端连接. 0ioSocketChannel,同步的客户端TCP Socket连接. 0ioServerSocketChannel,同步的服务器端TCP Socket 连接. 0ioDatagramChannel,同步的UDP连接 0ioSctpChannel,同步的Sctp服务器端连接. 0ioSctpServerChannel,同步的客户端TCP Socket 连接. 那么我们是如何设置所需要的Channel的类型的呢?答案是channel() 方法的调用. 回想一下我们在客户端连接代码的初始化Bootstrap 中,会调用channel() 方法,传入NioSocketChannel. class,这个方法其实就是初始化了一个ReflectiveChannelFactory:

代码语言:javascript复制
	public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }	

而ReflectiveChannelFactory 实现了ChanneIFactory 接口,它提供了唯一的方法,即newChannel. ChannelFactory, 顾名思义,就是产生Channel的工厂类. 进入到ReflectiveChanne lFactory. newChannel中,我们看到其实现代码如下:

代码语言:javascript复制
@Override
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class "   clazz, t);
    }
}

根据上面代码的提示,我们就可以确定: 1、Bootstrap 中的ChannelFactory 的实现是ReflectiveChannelFactory 2、生成的Channel 的具体类型是NioSocketChannel.

Channel的实例化过程,其实就是调用的. ChannelFactory. newChannel方法,而实例化的Channel的具体的类型又是和在初始化Bootstrap 时传入的channel() 方法的参数相关.因此对于我们这个例子中的客户端的Bootstrap而言,生成的的Channel 实例就是NioSocketChannel.

1.1.3Channel的实例化

前面我们已经知道了如何确定一个Channel 的类型,并且了解到Channel 是通过工厂方法ChannelFactory. newChannel()来实例化的,那么ChannelFactory. newChannel()方法在哪里调用呢?继续跟踪,我们发现其调用链是: Bootstrap. connect->Bootstrap . doResolveAndConnect->AbstractBootstrap.initAndRegister 在AbstractBootstrap. initAndRegister中调用了channelFactory () . newChannel()来获取一个新的NioSocketChannel实例,其源码如下:

代码语言:javascript复制
ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).sync();

进入connect方法 doResolveAndConnect initAndRegister

代码语言:javascript复制
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

在.newChannel中,通过类对象的newInstance来获取一个新Channel 实例,因而会调用NioSocketChannel的默认构造器. NioSocketChannel默认构造器代码如下:

代码语言:javascript复制
public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

这里的代码比较关键,我们看到,在这个构造器中,会调用newSocket 来打开一个新的Java NioSocketChannel

代码语言:javascript复制
public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}
代码语言:javascript复制
private static SocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

进入this方法

代码语言:javascript复制
public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}
代码语言:javascript复制
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

接着调用父类,即AbstractNioByteChannel的构造器

代码语言:javascript复制
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

并传入参数parent 为null, ch为刚才使用newSocket 创建的Java NI0 SocketChannel,因此生成的NioSocketChannel 的parent channel是空的. 接着会继续调用父类AbstractNioChannel的构造器,并传入了参数readInterest0p =SelectionKey.OP_READ

代码语言:javascript复制
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

然后调用父类的AbstractChannel的构造器

代码语言:javascript复制
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

到这里,一个完整的NioSocketChannel 就初始化完成了,我们可以稍微总结一下构造一个NioSocketChannel所需要做的工作: 1、调用NioSocketChannel. newSocket (DEFAULT_ SELECTOR_ PROVIDER) 打开一个新的Java NI0 SocketChannel 2、AbstractChannel (Channel parent) 中 初始化AbstractChannel 的属性:parent属性置为null unsafe通过newUnsafe() 实例化一个unsafe对象,它的类型是AbstractNioByteChannel. NioByteUnsafe内部类 pipeline是new Defaul tChannelPipeline(this)新创建的实例.这里体现了:Each channel has its own pipeline and it is created automatically when a new channel is created.

3、AbstractNioChannel 中的属性: SelectableChannel ch设置为Java SocketChannel,即NioSocketChannel. newSocket返回的Java NIO SocketChannel. readInterestOp被设置为Belect ionKey. OP_ READ SelectableChannel ch 被配置为非阻塞的ch. configureBlocking(false) 4、NioSocketChannel中的属性: SocketChanne lConfig config=new NioSocketChannelConfig(this,socket. socket ())

1.1.4关于unsafe字段的初始化

我们简单地提到了,在实例化NioSocketChannel的过程中,会在父类AbstractChannel的构造器中,调用newUnsafe()来获取一个unsafe实例.那么unsafe是怎么初始化的呢?它的作用是什么? 其实unsafe特别关键,它封装了对Java 底层Socket 的操作,因此实际上是沟通Netty 上层和Java底层的重要的桥梁. Unsafe的方法其实都会对应到相关的Java 底层的Socket的操作. 回到AbstraptChannel 的构造方法中,在这里调用了newUnsafe() 获取一个新的unsafe对象,而newUnsafe 方法在NioSocketChannel中被重写了:

代码语言:javascript复制
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioByteUnsafe();
}

NioSocketChannel. newUnsafe方法会返回一个NioSocketChannelUnsafe实例.从这里我们就可以确定了,在实例化的NioSocketChannel 中的unsafe字段,其实是一个NioSocketChannelUnsafe的实例.

1.1.5关于pi peline的初始化

上面我们分析了一个Channel (在这 个例子中是NioSocketChannel) 的大体初始化过程,但是我们漏掉了一个关键的部分,即ChannelPipeline 的初始化. 根据Each channel has its own pipeline and it is created automatically when a new channel is created.,我们知道,在实例化一个Channel 时,必然伴随着实例化一个ChannelPipeline. 而我们确实在AbstractChannel 的构造器看到了pipeline字段被初始化为DefaultChannelPipeline 的实例. 那么我们就来看一下,Defaul tChannelPipeline构造器做了哪些工作吧:

代码语言:javascript复制
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

我们调用Defaul tChannelPipeline的构造器,传入了一个channel, 而这个channel 其实就是我们实例化的NioSocketChannel, Defaul tChannelPipeline会将这个NioSocketChannel对象保存在channel字段中. DefaultChannelPipeline 中,还有两个特殊的字段,即head和tail, 而这两个字段是一个双向链表的头和尾.其实在Defaul tChannelPipeline中, 维护了一个以AbstractChanne lHandlerContext为节点的双向链表,这个链表是Netty 实现Pipeline 机制的关键.关于Defaul tChannelPipeline中的双向链表以及它所起的作用,这一节我们暂时不做详细分析。 先看看HeadContext的继承层次结构如下所示:

TailContext的继承层次结构如下所示:

我们可以看到,链表中head是一个 Channe lOutboundHandler,而tail 则是一个Channel InboundHandler. 接着看一下HeadContext的构造器:

代码语言:javascript复制
HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound = false,outbound = true. Tai lContext的构造器与HeadContext的相反,它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound = true, outbound = false. 即header是一个outboundHandler, 而tail 是一个inboundHandler,关于这一点,大家要特别注意,因为在分析到Netty Pipeline 时,我们会反复用到inbound 和outbound 这两个属性.

1.1.6关于EventLoop的初始化

回到最开始的ChatClient. java代码中,我们在一开始 就实例化了一个NioEventLoopGroup对象,因此我们就从它的构造器中追踪一下EventLoop 的初始化过程. 首先来看一下NioEventLoopGroup的类继承层次:

NioEventLoop有几个重载的构造器,不过内容都没有什么大的区别,最终都是调用的父类Multi threadEventLoopGroup构造器:

代码语言:javascript复制
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

其中有一点有意思的地方是,如果我们传入的线程数nThreads 是0,那么Netty 会为我们设置默认的线程数DEFAULT_EVENT_LOOP_THREADS,而这个默认的线程数是怎么确定的呢? 其实很简单,在静态代码块中,会首先确定DEFAULT_EVENT_LOOP_THREADS 的值:

代码语言:javascript复制
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

Netty会首先从系统属性中获取"io. netty. eventLoopThreads"的值,如果我们没有设置它的话,那么就返回默认值:处理器核心数* 2. 回到MultithreadEventLoopGroup 构造器中,这个构造器会继续调用父类MultithreadEventExecutorGroup的构造器:

代码语言:javascript复制
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i   ) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j   ) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j   ) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

根据代码,我们就很清楚MultithreadEventExecutorGroup中的处理逻辑了: 1、创建一个大小为nThreads 的SingleThreadEventExecutor 数组 2、根据nThreads 的大小,创建不同的Chooser, 即如果nThreads是2的幂,则使用Power0fTwoEventExecutorChooser,反之使用GenericEventExecutorChooser. 不论使用哪个Chooser,它们的功能都是一样的,即从children 数组中选出一个合适的EventExecutor 实例. 3、调用newChhild 方法初始化children 数组. 根据上面的代码,我们知道,MultithreadEventExecutorGroup 内部维护了一个EventExecutor数组,Netty 的EventLoopGroup 的实现机制其实就建立在Multi threadEventExecutorGroup之上.每当Netty 需要一个EventLoop 时,会调用next()方法获取一个可用的EventLoop.

上面代码的最后一部分是newChild 方法,这个是一个抽象方法,它的任务是实例化EventLoop对象.我们跟踪一下它的代码,可以发现,这个方法在NioEventLoopGroup类中实现了,其内容很简单:

代码语言:javascript复制
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

其实就是实例化一个NioEventLoop对象,然后返回它. 最后总结一下整个EventLoopGroup 的初始化过程吧: 1、EventLoopGroup(其实是Multi threadEventExecutorGroup) 内部维护一个类型为EventExecutor children 数组,其大小是nThreads, 这样就构成了一个线程池 2、如果我们在实例化NioEventLoopGroup 时,如果指定线程池大小,则nThreads就是指定的值,反之是处理器核心数* 2 3、MultithreadEventExecutorGroup 中会调用newChild 抽象方法来初始化children数组 4、抽象方法newChild 是在NioEventLoopGroup 中实现的,它返回一个NioEventLoop实例. 5、NioEventLoop 属性: SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过Se l ectorProvider. provider()获取一个SelectorProvider Selector selector 属性: NioEventLoop 构造器中通过调用通过selector =provider. openSelector()获取一个selector 对象.

1.1.7Channel的注册过程

在前面的分析中,我们提到,channel 会在Bootstrap. ini tAndRegister中进行初始化,但是这个方法还会将初始化好的Channel 注册到EventGroup 中,接下来我们就来分析一下Channel注册的过程. 回顾一下AbstractBootstrap. ini tAndRegister方法:

代码语言:javascript复制
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

当Channel初始化后,会紧接着调用group.register() 方法来注册Channel, 我们继续跟踪的话,会发现其调用链如下: AbstractBootstrap. ini tAndRegi ster->Multi threadEventLoopGroup. register->SingleThreadEventLoop. register- > AbstractChanne I$AbstractUnsafe. register

通过跟踪调用链,最终我们发现是调用到了unsafe 的register 方法,那么接下来我们就仔细看一下AbstractChannel $AbstractUnsafe. register方法中到底做了什么:

首先,将eventLoop赋值给Channel 的eventLoop 属性,而我们知道这个eventLoop对象其实是MultithreadEventLoopGroup. next()方法获取的,根据我们前面的小节中,我们可以确定next() 方法返回的eventLoop 对象是NioEventLoop实例.

register方法接着调用了register0 方法:

代码语言:javascript复制
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

register0又调用了AbstractNioChannel. doRegister: .

代码语言:javascript复制
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

javaChannel()这个方法在前面我们已经知道了,它返回的是一个Java NIO SocketChannel,这里我们将这个SocketChannel注册到与eventLoop 关联的selector 上了. 我们总结一下Channel 的注册过程: 1、首先在AbstractBootstrap. ini tAndRegister 中,通过group (). regi ster (channel),调用MultithreadEventLoopGroup. register方法 2、在MultithreadEventLoopGroup. register 中,通过next() 获取一个可用的SingleThreadEventLoop,然后调用它的register 3、在SingleThreadEventLoop. register 中,通过channel. unsafe (). register (this,promise)来获取channel 的unsafe() 底层操作对象,然后调用它的register. 4、在AbstractUnsafe. register方法中,调用register0 方法注册Channel 5、在AbstractUnsafe. register0中,调用AbstractNi oChanne l. doRegister方法 6、AbstractNi oChannel. doRegister方法通过javaChannel().register(eventLoop().selector, 0, this) 将Channol对应的Iava NIO.SockerChannel注册到一个eventLoop 的Selector 中,并且将当前Channel 作为attachment. 总的来说,Channel 注册过程所做的工作就是将Channel与对应的EventLoop 关联,因此这也体现了,在Netty 中,每个Channel 都会关联一个特定的EventLoop, 并且这个Channel中的所有I0操作都是在这个EventLoop 中执行的;当关联好Channel 和EventLoop 后,会 继续调用底层的Java NI0 SocketChannel 的register方法,将底层的Java NI0 SocketChannel注册到指定的selector 中.通过这两步,就完成了Netty Channel 的注册过程.

1.1.8 Handler的添加过程

Netty的一个强大和灵活之处就是基于Pipeline 的自定义handler机制.基于此,我们可以像添加插件一样自由组合各种各样的handler 来完成业务逻辑.例如我们需要处理HTTP数据,那么就可以在pipeline 前添加一个Http的编解码的Handler, 然后接着添加我们自己的业务逻辑的handler, 这样网络上的数据流就向通过一个管道一样,从不同的handler 中流过并进行编解码,最终在到达我们自定义的handler中. 既然说到这里,有些同学肯定会好奇,既然这个pipeline 机制是这么的强大,那么它是怎么实现的呢?在这里我还不打算详细讲解,在这一小节中,我们从简单的入手,展示一下我们自定义的handler是如何以及何时添加到ChannelPipeline中的.

代码语言:javascript复制
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new IMDecoder());
        socketChannel.pipeline().addLast(new IMEncoder());
        socketChannel.pipeline().addLast(chatClientHandler);
    }
});

这个代码片段就是实现了handler 的添加功能.我们看 到,Bootstrap. handler方法接收一个ChannelHandler,而我们传递的是一个派生于ChannelInitializer的匿名类,它正好也实现了ChannelHandler 接口.我们来看一下,ChannelInitializer类内到底有什么玄机:

ChannelInitializer是一个抽象类,它有一个抽象的方法initChannel,我们正是实现了这个方法,并在这个方法中添加的自定义的handler 的.那么initChannel 是哪里被调用的呢? 答案是ChannelInitializer. channelRegistered方法中.

代码语言:javascript复制
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    if (initChannel(ctx)) {
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
}

我们来关注一下channelRegistered方法.从上面的源码中,我们可以看到,在channelRegistered方法中,会调用initChannel方法,将自定义的handler 添加到ChannelPipeline中,然后调用ctx. pipeline(). remove(this)将自己从ChannelPipeline中删除…上面的分析过程,

代码语言:javascript复制
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

这段代码用的模板方法模式

可以用如下图片展示: 一开始,ChannelPipeline 中只有三个handler, head, tail 和我们添加的ChannelInitializer.

接着initChannel方法调用后,添加自定义的handler:

最后将ChannelInitializer删除:

分析到这里,我们已经简单了解了自定义的handler 是如何添加到ChannelPipeline 中的,在后面的我们再进行深入的探讨.

1.1.9客户端连接分析

经过.上面的各种分析后,我们大致了解了Netty初始化时,所做的工作,那么接下来我们就直奔主题,分析一下客户端是如何发起_TCP 连接的。

首先,客户端通过调用Bootstrap 的connect 方法进行连接. 在connect中,会进行一些参数检查后,最终调用的是doConnect 方法,其实现如下:

代码语言:javascript复制
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

在doConnect 中,会在event loop 线程中调用Channel的connect方法,而这个Channel的具体类型是什么呢?我们在前面已经分析过了,这里channel 的类型就是NioSocketChannel. 进行跟踪到channel. connect中,我们发现它调用的是DefaultChannelPipeline. connect, 而,pipeline的connect 代码如下:

代码语言:javascript复制
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

而tail 字段,我们已经分析过了,是一个TailContext 的实例,而TailContext 又是AbstractChanne lHandlerContext的子类,并且没有实现connect 方法,因此这里调用的其实是AbstractChannelHandlerContext. connect,我们看一下这个方法的实现:

代码语言:javascript复制
@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

上面的代码中有一个关键的地方,即final AbstractChannelHandlerContext next =findContextOutbound(),这里调用findContextOutbound 方法,从DefaultChannelPipeline内的双向链表的tail开始,不断向前寻找第一个outbound 为true 的 AbstractChannelHandlerContext,然后调用它的invokeConnect 方法,其代码如下:

代码语言:javascript复制
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

前面我们提到,在DefaultChannelPipeline时构造器中,会实例化两个对象: head和tail,并形成了双向链表的头和尾. head是 HeadContext的实例,它实现了Channe l0utboundHandler接口,并且它的outbound字段为true. 因此在findContextOutbound中,找到的AbstractChannelHandlerContext 对象其实就是head. 进而在invokeConnect方法中,我们向上转换为Channel0utboundHandler 就是没问题的了.

而又因为HeadContext 重写了connect 方法,因此实际上调用的是HeadContext. connect. 我们接着跟踪到HeadContext. connect,其代码如下:

代码语言:javascript复制
@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

这个connect 方法很简单,仅仅调用了unsale 的connect 方法.而unsafe 又是什么呢? 回顾一下HeadContext 的构造器,我们发现unsafe 是pipeline. channel (). unsafe() 返回的是Channel 的unsafe 字段,在这里,我们已经知道了,其实是AbstractNioByteChannel. NioByteUnsafe-内部类.兜兜转转了一大圈,我们找到了创建Socket连接的关键代码. 进行跟踪NioByteUnsafe -> AbstractNioUnsafe. connect:

AbstractNioUnsafe. connect的实现如上代码所示,在这个connect方法中,调用了doConnect方法,注意,这个方法并不是AbstractNioUnsafe的方法,而是AbstractNioChannel的抽象方法. doConnect 方法是在Ni oSocketChannel中实现的,因此进入到NioSocketChannel. doConnect中:

代码语言:javascript复制
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

我们终于看到的最关键的部分了,庆祝一下! 上面的代码不用多说,首先是获取Java NIO SocketChannel,从NioSocketChannel. newSocket返回的SocketChannel 对象;然后是调用SocketChannel. connect方法完成Java NIO层面上的Socket 的连接. 最后,上面的代码流程可以用如下时序图直观地展示:

1.2.服务端ServerBootStrap

在分析客户端的代码时,我们已经对Bootstrap 启动Netty 有了一个大致的认识,那么接下来分析服务器端时,就会相对简单一些了. 首先还是来看一下服务器端的启动代码:

代码语言:javascript复制
public class ChatServer {

    private static Logger logger = Logger.getLogger(ChatServer.class);

    private int port = 80;

    public static void main(String[] args) {
        new ChatServer().start();
    }

    private void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();


        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();

                            //解析自定义协议
                            pipeline.addLast(new IMDecoder());
                            pipeline.addLast(new IMEncoder());
                            pipeline.addLast(new SocketHandler());

                            //解析http请求
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(64*1024));
                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new HttpHandler());

                            //解析websocket请求
                            pipeline.addLast(new WebSocketServerProtocolHandler("/im"));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            logger.info("服务已启动,监听端口" this.port);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

和客户端的代码相比,没有很大的差别,基本上也是进行了如下几个部分的初始化: 1、EventLoopGroup:不论是服务器端还是客户端,都必须指定EventLoopGroup. 在这个例子中,指定了NioEventLoopGroup, 表示一个NIO的EventLoopGroup,不过服务器端需要指定两个EventLoopGroup,一个是bossGroup, 用于处理客户端的连接请求;另一个是 workerGroup,用于处理与各个客户端连接的I0操作. 2、ChannelType:指定Channel 的类型.因为是服务器端,因此使用了NioServerSocketChannel. 3、Handler: 设置数据的处理器.

1.2.1 Channel的初始化过程

我们在分析客户端的Channel 初始化过程时,已经提到,Channel 是对Java底层Socket连接的抽象,并且知道了客户端的Channel 的具体类型是NioSocketChannel,那么自然的,服务器端的Channel 类型就是NioServerSocketChannel了 . 那么接下来我们按照分析客户端的流程对服务器端的代码也同样地分析一遍,这样也方便我们对比一下服务器端和客户端有哪些不一样的地方.

1.2.2Channel类型的确定

同样的分析套路,我们已经知道了,在客户端中,Channel 的类型其实是在初始化时,通过Bootstrap. channel()方法设置的,服务器端自然也不例外. 在服务器端,我们调用了ServerBootstarap. channel (Ni oServerSocketChannel. class),传递了一个NioServerSocketChannel Class 对象.这样的话,按照和分析客户端代码一样的流程,我们就可以确定,NioServerSocketChannel的实例化是通过ReflectiveChannelFactory工厂类来完成的,而ReflectiveChannelFactory中的clazz 字段被设置为了NioServerSocketChannel. class,因此当调用 ReflectiveChanne lFactory. newChannel()时:

代码语言:javascript复制
@Override
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class "   clazz, t);
    }
}

就获取到了一个NioServerSocketChannel的实例. 最后我们也来总结一下: 1、ServerBootstrap 中的ChannelFactory 的实现是ReflectiveChanne lFactory 2、生成的Channel 的具体类型是NioServerSocketChannel. Channel的实例化过程,其实就是调用的Channe lFactory. newChannel方法,而实例化的Channel的具体的类型又是和在初始化ServerBootstrap时传入的channel() 方法的参数相关.因此对于我们这个例子中的服务器端的ServerBootstrap 而言,生成的的Channel 实例就是NioServerSocketChannel.

1.2. 3 NioServerSocketChannel的实例化过程

首先还是来看一下NioServerSocketChannel的实例化过程.下面是NioServerSocketChannel的类层次结构图:

首先,我们来看一下它的默认的构造器.和NioSocketChannel 类似,构造器都是调用了newSocket 来打开一个Java的NI0 Socket, 不过需要注意的是,客户端的newSocket 调用的是openSocketChannel,而服务器端的newSocket 调用的是openServerSocketChannel. 顾名思义,一个是客户端的Java SocketChannel,一个是服务器端的Java ServerSocketChannel.

代码语言:javascript复制
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
代码语言:javascript复制
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

接下来会调用重载的构造器:

代码语言:javascript复制
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

这个构造其中,调用父类构造器时,传入的参数是“SelectionKey. 0P_ ACCEPT. 作为对比,我们回想一下,在客户端的Channel 初始化时,传入的参数是SelectionKey. 0P_ READ. 我前面讲过,Java NI0是一种Reactor模式,我们通过selector来实现I/0的多路复用复用.在一开始时,服务器端需要监听客户端的连接请求,因此在这里我们设置了SelectionKey. OP_ ACCEPT, 即通知selector 我们对客户端的连 接请求感兴趣. 接着和客户端的分析一下,会逐级地调用父类的构造器NioServerSocketChannel ->AbstractNi oMessageChannql -> AbstractNioChannel -> AbstractChannel.

同样的,在AbstractChannel 中会实例化一个 unsafe和pipeline:

代码语言:javascript复制
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

不过,这里有一点需要注意的是,客户端的unsafe 是一个AbstractNioByteChannel#NioByteUnsafe 的 实例,而在服务器端时,因为AbstractNioMessageChannel重写了newUnsafe方法:

代码语言:javascript复制
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

因此在服务器端,unsafe 字段其实是一个AbstractNIoMessageChannel. AbstractNioUnsafe的实例. 我们来总结一下,在NioServerSocketChannsl实例化过程中,所需要做的工作: 1、调用NioServerSocketChanne 1. newSocket (DEFAULT_ SELECTOR_ _PROVIDER) 打开一个新的Java NI0ServerSocke tChannel 2、AbstractChannel (Channel parent) 中初始化AbstractChannel 的属性:

parent属性置为null unsafe通过newUnsafe()实例化一个unsafe对象,它的类型是AbstractNioMessageChanne 1 #AbstractNioUnsafe内部类 pipeline是new Defaul tChannelPipeline(this)新创建的实例. 3、AbstractNioChannel 中的属性: Selectabl eChannel ch被设置为Java ServerSocketChannel,即NioServerSocketChanne l #newSocket返回的Java NIO ServerSocketChannel.readInterestOp被设置为SelectionKey. 0P_ ACCEPT SelectableChannel ch被配置为非阻塞的ch. confi gureBlocking (false) 4、Ni oServerSocketChannel中的属性: ServerSocketChannelConfig config=new NioServerSocketChanne lConfig(this,javaChannel (). socket ())

1.2. 4 ChannelPipeline 初始化

服务器端和客户端的ChannelPipeline 的初始化一致,因此就不再单独分析了。

1.2.5 Channel的注册

服务器端和客户端的Channel 的注册过程一致,因此就不再单独分析了.

1.2.6关于bossGroup 与workerGroup

在客户端的时候,我们只提供了一个EventLoopGroup对象,而在服务器端的初始化时,我们设置了两个EventLoopGroup,一个是bossGroup, 另一个是workerGroup. 那么这两个EventLoopGroup 都是干什么用的呢?其实呢,bossGroup是用于服务端的accept 的,即用于处理客户端的连接请求,我们可以把Netty比作一个饭店,bossGroup 就像一个像一个前台接待,当客户来到饭店吃时,接待员就会引导顾客就坐,为顾客端茶送水等.而workerGroup, 其实就是实际上干活的啦,它们负责客户端连接通道的I0操作:当接待员招待好顾客后,就可以稍做休息,而此时后厨里的厨师们(workerGroup)就开始忙碌地准备饭菜了. 关于bossGroup与workerGroup 的关系,我们可以用如下图来展示:

首先,服务器端bossGroup不断地监听是否有客户端的连接,当发现有一个新的客户端连接到来时, bossGroup就会为此连接初始化各项资源,然后从workerGroup中选出一个EventLoop 绑定到此客户端 连接中,那么接下来的服务器与客户端的交互过程就全部在此分配的EventLoop 中了. 口说无凭,我们还是以源码说话吧. 首先在ServerBootstrap初始化时,调用了b. group (bossGroup,workerGroup) 设置了两个 EventLoopGroup,我们跟踪进去看一下

代码语言:javascript复制
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

显然,这个方法初始化了两个字段,一个是group 三parentGroup,它 是在super. group (parentGroup)中初始化的,另一个是childGroup = childGroup.接着我们启动程序调用了b.bind 方法来监听一个本地端口. bind 方法会触发如下的调用链: AbstractBootstrap. bind -> AbstractBootstrap. doBind ->AbstractBootstrap. initAndRegister

源码看到到这里为止,AbstractBootstrap. initAndRegister已经是我们的老朋友了,我们在分析客户端程序时,和它打过很多交到了,现在再来回顾一下这个方法吧:

代码语言:javascript复制
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

这里group()方法返回的是上面我们提到的bossGroup,而这里的channel 我们也已经分析过了,它是一个NioServerSocketChannsl 实例,因此我们可以知道,group(). register (channel)将bossGroup和NioServerSocketChannsl关联起来了. 那么workerGroup是在哪里与NioServerSocketChannel 关联的呢? 我们继续看init (channel)方法:

代码语言:javascript复制
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init方法在ServerBootstrap中重写了,从上面的代码片段中我们看到,它为pipeline 中添加了一个ChannelInitializer,而这个ChannelInitializer 中添加了一个关键的ServerBootstrapAcceptorhandler.关于handler 的添加与初始化的过程,我们留待下一小节中分析,我们现在关注一下ServerBootstrapAcceptor类. Server BootstrapAcceptor中重写了channelRead 方法,其主要代码如下:

代码语言:javascript复制
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: "   e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: "   child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

ServerBootstrapAcceptor中的childGroup 是构造此对象是传入的currentChildGroup, 即我们的workerGroup,而Channel 是一个NioSocketChannel的实例,因此这里的childGroup. register就是将workerGroup 中的某个EventLoop 和Ni oSocketChannel 关联了。既然这样, 那么现在的问题是,ServerBootstrapAcceptor. channelRead方法是怎么被调用的呢?其实当一个client 连接到server 时, Java底层的NIO ServerSocketChannel会有一个SelectionKey. 0P_ ACCEPT 就绪事件,接着就会调用到NioServerSocketChannel.doReadMessages:

代码语言:javascript复制
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

在doReadMessages中,通过javaChannel (). accept(获取到客户端新连接的SocketChannel,接着就实例化一个NioSocketChannel, 并且传入NioServerSocketChannel对象(即this), 由此可知,我们创建的这个NioSocketChannel的父Channel 就是NioServerSocketChannel实例. 接下来就经由Netty 的ChannelPipeline机制,将读取事件逐级发送到各个handler 中,于是就会触发前面我们提到的ServerBoot strapAcceptor. channelRead方法啦.

1.2.6 Handler 的添加过程

服务器端的handler的添加过程和客户端的有点区别,和EventLoopGroup一样,服务器端的handler也有两个,一个是通过handler() 方法设置handler 字段,另一个是通过chi ldHandler()设置childHandler字段.通过前面的bossGroup 和workerGroup 的分析,其实我们在这里可以大胆地猜测: handler字段与accept 过程有关,即这个handler 负责处理客户端的连接请求;而childHandler 就是负责和客户端的连接的I0交互. 那么实际上是不是这样的呢?来,我们继续通过代码证明. 在关于bossGroup与workerGroup 小节中,我们提到,ServerBootstrap 重写了init 方法,在这个方法中添加了handler:

上面代码的initChannel 方法中,首先通过handler() 方法获取一个handler, 如果获取的handler不为空,则添加到pipeline 中.然后接着,添加了一个ServerBootstrapAcceptor 实例.那么这里handler()方法返回的是哪个对象呢?其实它返回的是handler 字段,而这个字段就是我们在服务器端的启动代码中设置的: b. group(bossGroup, workerGroup ) 那么这个时候,pipeline 中的handler 情况如下:

根据我们原来分析客户端的经验,我们指定,当channel 绑定到eventLoop 后(在这里是NioServerSocketChannel绑定到bossGroup) 中时,会在pipeline 中发出fireChannelRegistered事件,接着就会触发ChannelInitializer. initChannel方法的调用. 因此在绑定完成后,此时的pipeline 的内如下:

前面我们在分析bossSGroup和workerGroup时,.已经知道了在ServerBootstrapAcceptor. channelRead中会为新建的Channel 设置handler 并注册到一个eventLoop中

后续的步骤就没有什么好说的了,当这木客户端连接Channel注册后,就会触发ChannelInitial izer. ini tChannel方法的调用。

最后我们来总结一下服务器端的handler 与childHandler 的区别与联系: 1、在服务器NioServerSocketChannel 的pipeline 中添加的是handler 与ServerBootstrapAcceptor. 2、当有新的客户端连接请求时,ServerBootstrapAcceptor. channelRead 中负责新建此连接的NioSocketChannel 并添加chi ldHandler到Ni oSocketChannel对应的pipeline 中,并将此channel绑定到workerGroup 中的某个eventLoop 中. 3、handler 是在accept 阶段起作用,它处理客户端的连接请求. 4、childHandler 是在客户端连接建立以后起作用,它负责客户端连接的I0交互.

0 人点赞