源码分析 Netty:核心组件及启动过程分析

2021-03-23 10:04:25 浏览数 (1)

一 Netty核心组件

1.1 Channel

Channel(通道)是 NIO 基本的结构。JDK的NIO包中,有Channel接口的介绍:

A nexus for I/O operations.

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.

Channels are, in general, intended to be safe for multithreaded access as described in the specifications of the interfaces and classes that extend and implement this interface.

根据上述文档的描述,Channel是I/O操作的连接(关系),表示与能够执行一个或多个不同I/O操作(例如读取或写入)的实体(例如硬件设备、文件、网络套接字或程序组件)的开放连接。

通道是“打开”或“关闭”的。通道在创建时是开放的,一旦关闭它就会保持关闭。一旦通道关闭,对其调用I/O操作的任何尝试都将导致引发ClosedChannelException。通道是否打开可以通过调用其isOpen方法进行测试;通道通常是为了安全地进行多线程访问,如扩展和实现该接口的接口和类的规范所述。

1.2 Callback

回调。Callback已经是一种非常常见的异步实现方法,用于通知调用方操作已完成。可以简单理解为一个方法,提供给另一种方法作为引用,这样后者就可以在某个合适的时间调用前者。

1.3 Future

Future 提供了另外一种通知应用操作已经完成的方式。这个对象作为一个异步操作结果的占位符,它将在将来的某个时候完成并提供结果。

1.3.1 JDK的Future接口

JDK提供了java.util.concurrent.Future接口,Futrue是个接口。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

代码语言:javascript复制
Future代表异步计算的结果,接口提供方法用于检查计算是否已完成,等待计算完成,然后取回计算结果。计算结果只能通过get方法返回;如果有必要会堵塞直到它计算完成。可以通过cancel方法取消。附加的方法可用于判断任务是否正常完成或者被取消。一旦计算完成,那么它不能被取消。如果你想要使用Future 来取消,但是不提供一个可用的结果,你可以声明Futrue 的类型,但会返回null 作为一个基本任务的结果。FutureTask 类是Futrue类的一个实现类,实现了Runnable接口,可以被Executor 执行。

1.3.2 Future提供的方法

1.3.2.1 cancel

boolean cancel(boolean mayInterruptIfRunning) 调用该方法将试图取消对任务的执行。如果任务已经完成了、已取消、无法取消这种尝试会失败。当该方法调用时任务还没有开始,方法调用成功而且任务将不会再执行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否执行此任务的线程应该以试图停止任务被中断。此方法返回后调用isDone 方法将返回 true 。后续调用 isCancelled 总是返回第一次调用的返回值。

1.3.2.2 isCancelled

boolean isCancelled() 如果任务在完成前被取消,将返回 true。

请注意任务取消是一种主动的行为。

1.3.2.3 isDone

boolean isDone() 任务已经结束,在任务完成、任务取消、任务异常的情况下都返回 true 。

1.3.2.4 get

V get() throws InterruptedException, ExecutionException 调用此方法会在获取计算结果前等待。一但计算完毕将立刻返回结果。它还有一个重载方法 V get(long timeout, TimeUnit unit) 在单位时间内没有返回任务计算结果将超时,任务将立即结束。

1.3.2 ChannelFuture

JDK的Future提供了一个异步获取执行结果的机制,但提供的方法有限,所以Netty自定义了Future接口。另外,Netty 中所有的 I/O 操作都是异步的,因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。

ChannelFuture提供多个方法来允许一个或者多个 ChannelFutureListener 实例。这个回调方法 operationComplete() 会在操作完成时调用。事件监听者能够确认这个操作是否成功或者是错误。如果是后者,我们可以检索到产生的 Throwable。简而言之, ChannelFutureListener 提供的通知机制不需要手动检查操作是否完成的。

每个 Netty 的 outbound I/O 操作都会返回一个 ChannelFuture;这样就不会阻塞。这就是 Netty 所谓的“自底向上的异步和事件驱动”。ChannelFuture接口代码如下:

代码语言:javascript复制
public interface ChannelFuture extends Future<Void> {
    Channel channel();

    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);

    ChannelFuture addListeners(GenericFutureListener... var1);

    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);

    ChannelFuture removeListeners(GenericFutureListener... var1);

    ChannelFuture sync() throws InterruptedException;

    ChannelFuture syncUninterruptibly();

    ChannelFuture await() throws InterruptedException;

    ChannelFuture awaitUninterruptibly();

    boolean isVoid();
}

1.4 Event与Handler

事件机制的重要组成部分,通过不同的事件来通知我们更改的状态或操作的状态。这使我们能够根据发生的事件触发适当的“行为”。行为可能包括日志记录、数据转换、流控制、应用程序逻辑等。

Netty是一个网络框架,事件很清晰的跟入站(inbound)或出站(outbound)的数据流相关。因为一些事件可能触发传入的数据或状态的变化包括:

  • 活动或非活动连接
  • 数据的读取
  • 用户事件
  • 错误

出站事件是由于在未来操作将触发一个动作。这些包括:

  • 打开或关闭一个连接到远程
  • 写或冲刷数据到 socket

每个事件都可以分配给用户实现处理程序类的方法。这说明了事件驱动的范例可直接转换为应用程序构建块。

上图是Netty的事流图(图片来自w3c School)。

Netty 的 ChannelHandler 是各种处理程序的基本抽象。想象下,每个处理器实例就是一个回调,用于执行对各种事件的响应。

在此基础之上,Netty 也提供了一组丰富的预定义的处理程序方便开箱即用。比如,各种协议的编解码器包括 HTTP 和 SSL/TLS。在内部,ChannelHandler 使用事件和 future 本身,创建具有 Netty 特性抽象的消费者。

1.5 Eventloop

EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。下图说明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系。

1.6 Netty异步模型

1.6.1 Future和Callback

Netty 的异步编程模型是建立在 Future 和 Callback 的概念上的。

拦截操作和转换入站/出站数据,只需要开发者提供回调或利用 Future 操作返回。这使得链操作简单、高效,促进编写可重用的、通用的代码。Netty 的一个主要设计目标是促进“关注点分离”,即把业务逻辑从网络基础设施程序中分离出来。

1.6.2 Selector, Event 和 Eventloop

Netty 通过触发事件从应用程序中抽象出 Selector,从而避免手写调度代码。EventLoop 分配给每个 Channel 来处理所有的事件,包括

  • 注册感兴趣的事件
  • 调度事件到 ChannelHandler
  • 安排进一步行动

EventLoop 本身是由一个线程驱动,它给一个 Channel 处理所有的 I/O 事件,并且在 EventLoop 的生命周期内不会改变(参考上篇介绍的串行化设计概念)。这个简单而强大的线程模型消除可能对 ChannelHandler同步是否正确和合理性的关注,这样就可以专注于提供正确的回调逻辑。

二 Netty启动过程分析

2.1 一个Netty Server示例

代码语言:javascript复制
public class HttpServer {

	public static int DEFAULT_PORT = 8080;

	public static void main(String[] args) throws Exception {
		int port;

		try {
			port = Integer.parseInt(args[0]);
		} catch (RuntimeException ex) {
			port = DEFAULT_PORT;
		}

		// 多线程事件循环器
		EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
		EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker
		
		try {
			ServerBootstrap b = new ServerBootstrap(); // 引导程序
			b.group(bossGroup, workerGroup) // 设置EventLoopGroup
			.channel(NioServerSocketChannel.class) // 指明新的Channel的类型
			.childHandler(new HttpServerChannelInitializer()) // 指定ChannelHandler
			.option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
			.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项
 
			// 绑定端口,开始接收进来的连接
			ChannelFuture f = b.bind(port).sync(); 

			System.out.println("HttpServer已启动,端口:"   port);

			// 等待服务器 socket 关闭 。
			// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
			f.channel().closeFuture().sync();
		} finally {
			// 优雅关闭
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}

	}
}

这里还有一个依赖,HttpServerChannelInitializer,代码如下:

代码语言:javascript复制
public class HttpServerChannelInitializer extends ChannelInitializer<SocketChannel> {

	public HttpServerChannelInitializer() {
	}
	
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ch.pipeline().addLast("codec", new HttpServerCodec());
		ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
		ch.pipeline().addLast("serverHandler", new HttpServerHandler());
	}

}

HttpServerHandler:

代码语言:javascript复制
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
		this.readRequest(msg);

		String sendMsg;
		String uri = msg.uri();

		switch (uri) {
		case "/":
			sendMsg = "<h3>Netty HTTP Server</h3><p>Welcome to <a href="https://waylau.com">waylau.com</a>!</p>";
			break;
		case "/hi":
			sendMsg = "<h3>Netty HTTP Server</h3><p>Hello Word!</p>";
			break;
		case "/love":
			sendMsg = "<h3>Netty HTTP Server</h3><p>I Love You!</p>";
			break;
		default:
			sendMsg = "<h3>Netty HTTP Server</h3><p>I was lost!</p>";
			break;
		}

		this.writeResponse(ctx, sendMsg);
	}

	private void readRequest(FullHttpRequest msg) {
    //请求行信息
		System.out.println(msg.method()   " "   msg.uri()   " "   msg.protocolVersion());

		//请求头信息
		for (String name : msg.headers().names()) {
			System.out.println(name   ": "   msg.headers().get(name));

		}

		//消息体
		System.out.println(msg.content().toString(CharsetUtil.UTF_8));

	}

	private void writeResponse(ChannelHandlerContext ctx, String msg) {
		ByteBuf bf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);

		FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, bf);
		res.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.length());
		ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
	}

我们可以直接执行这段代码,启动一个HTTP服务,端口作为参数输入,或者直接使用默认的8080端口。

2.2 配置分析

2.2.1 引导(BootStrap)

引导一个应用程序是指对它进行配置,并使它运行起来的过程——尽管该过程的具体细节可能并不如它的定义那样简单,尤其是对于一个网络应用程序来说。上面示例中的ServerBootstrap就是一个“引导”类,可以看出,ServerBootstrap是负责串联章节一中介绍过的Channel、EventLoopGroup、ChannelHandler、ChannelFuture 以及 参数配置等组件的。

引导类的层次结构:

2.2.2 引导过程

步骤如下:

1、设置 EventLoopGroup:提供了用于处理 Channel 事件的 EventLoop,在示例中我们定义了两个EventLoopGroup,boss和work;

2、channel(NioServerSocketChannel.class):指定要使用的Channel实现;

3、childHandler(new HttpServerChannelInitializer()):设置用于处理已被接受的子 Channel 的 I/O 及数

据的 ChannelInbound-Handler;

4、b.bind(port):通过配置好的ServerBootstrap的实例绑定该Channel

通过图片看引导过程如下:

2.2.3 ChannelInitializer

在Server启动时,在childHandler()方法中设置了一个自定义的HttpServerChannelInitializer()。在代码中,可以看到在SockerChannel中设置了以下三个内容:

代码语言:javascript复制
ch.pipeline().addLast("codec", new HttpServerCodec());ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));ch.pipeline().addLast("serverHandler", new HttpServerHandler());
2.2.3.1 codec

编解码的处理,HttpServerCodec是netty针对http编解码的处理类,但是这些只能处理像http get的请求,也就是数据带在url问号后面的http请求;

2.2.3.2 aggregator

用POST方式请求服务器的时候,对应的参数信息是保存在message body中的,如果只是单纯的用HttpServerCodec是无法完全的解析Http POST请求的,因为HttpServerCodec只能获取uri中参数,所以需要加上HttpObjectAggregator。它把HttpMessage和HttpContent聚合成为一个FullHttpRquest或者FullHttpRsponse,大致结构如下图所示:

2.2.3.3 serverHandler

示例使用了自定义的HttpServerHandler,在channelRead0方法中针对请求的uri进行处理,在生成响应(writeResponse)时使用对应的信息。

三 总结

本篇从实例出发,了解Netty核心组件的概念、作用及串联过程。从概念到设计原理,再到深入了解实现细节,从而能够清晰地掌握Netty的技术细节甚至存在的问题,才能最终更好地支持我们实际的各项业务。

0 人点赞