为什么使用 Netty
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能和高伸缩性的服务器和客户端。Netty 拥有高性能,吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
Netty 和 NIO
NIO 的缺点
- NIO 的类库和 API 繁杂,学习成本高,你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要熟悉 Java 多线程编程。这是因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的 NIO 程序。
- 臭名昭著的 epoll bug。它会导致 Selector 空轮询,最终导致 CPU 使用率飙升至 100%。直到 JDK1.7 版本依然没得到根本性的解决。
Netty 的优点
- Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,API使用简单,学习成本低。
- 功能强大,内置了多种解码编码器,支持多种协议。
- 性能高,对比其他主流的 NIO 框架,Netty 的性能最优。
- 社区活跃,发现 BUG 会及时修复,迭代版本周期短,不断加入新的功能。Dubbo、Elasticsearch 都采用了 Netty,质量得到验证。
Netty线程模型
模型解释
- 1.Netty 抽象出两组线程池 BossGroup 和 WorkerGroup,BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写。
- 2.BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。NioEventLoopGroup 相当于是一个事件循环线程组,这个组内含有多个事件循环线程,每一个事件循环线程都是 NioEventLoop。
- 3.每个 NioEventLoop 都有一个 Selector,用于监听注册在其上的 SocketChannel 的网络通讯。
- 4.每个 Boss NioEventLoop 线程内部循环执行的3个步骤:
- 处理 Accept 事件,与客户端建立连接,生成 NioSocketChannel。
- 将 NioSocketChannel 注册到某个 Worker NioEventLoop 上的 Selector。
- 处理任务队列的任务,即 runAllTask。
- 5.每个 Worker NioEventLoop 线程内部循环执行的3个步骤:
- 轮询注册到自己 Selector 上的所有 NioSocketChannel 的 read,write 事件。
- 处理 I/O 事件,即 read,write 事件,在对应的 NioSocketChannel 处理业务。
- runAllTasks 处理任务队列 TaskQueue 的任务,一些耗时的业务处理一般可以放入 TaskQueue 中慢慢处理,这样不影响数据在 Pipeline 中的流动处理。
- 6.每个 Worker NioEventLoop 处理 NioSocketChannel 业务时,会使用 Pipeline(管道),Pipeline 中维护了很多的 handler 处理器用来处理 NioSocketChannel 中的数据。
Netty 客户端 & 服务器开发
创建并配置服务器启动器
Bootstrap、ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类。
可以看出都是继承于AbstractBootStrap抽象类,所以大致上的配置方法都相同。
一般来说,使用Bootstrap创建启动器的步骤可分为以下几步:
group()
服务端要使用两个线程组:
- bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到 workerGroup 的 Selector 中。
- workerGroup 用于处理每一个连接发生的读写事件。
一般创建线程组直接使用以下new就完事了:
代码语言:javascript复制EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
默认的线程数是cpu核数的两倍。假设想自定义线程数,可以使用有参构造器:
代码语言:javascript复制//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为8
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
channel()
这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的 Channel 实例。
常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选:
- NioSocketChannel:异步非阻塞的客户端 TCP Socket 连接。
- NioServerSocketChannel:异步非阻塞的服务器端 TCP Socket 连接。
还有就是同步阻塞的,一般没什么人用:
- OioSocketChannel:同步阻塞的客户端 TCP Socket 连接。
- OioServerSocketChannel:同步阻塞的服务器端 TCP Socket 连接。
option() 与 childOption()
option() 设置的是服务端用于接收进来的连接,也就是 boosGroup 线程。option() 常用参数:
代码语言:javascript复制SO_BACKLOG //Socket 参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows 为200,其他为128。
childOption() 是提供给父管道接收到的连接,也就是 workerGroup 线程。childOption() 常用的参数:
代码语言:javascript复制SO_RCVBUF //Socket 参数,TCP 数据接收缓冲区大小。
TCP_NODELAY //TCP 参数,立即发送数据,默认值为 True。
SO_KEEPALIVE //Socket 参数,连接保活,默认值为 False。启用该功能时,TCP 会主动探测空闲连接的有效性。
pipeline(ChannelPipeline)
ChannelPipeline 是 Netty 处理请求的责任链,ChannelHandler 则是具体处理请求的处理器。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
处理器 Handler 分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)。
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler,通过 ChannelHandlerContext 上下文对象,就可以拿到 Channel、Pipeline 等对象,就可以进行读写等操作。
read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 Handler。两种类型的 Handler 互不干扰,相同类型的 Handler 的处理顺序是有影响的。
在 Bootstrap 中 childHandler() 方法需要初始化通道,实例化一个 ChannelInitializer,这时候需要重写 initChannel() 初始化通道的方法,装配流水线就是在这个地方进行。代码如下:
代码语言:javascript复制//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
}
});
自定义 NettyServerHandler 代码如下:
代码语言:javascript复制package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
* @author 程治玮
* @since 2021/3/25 9:31 下午
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道 channel,管道 pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " Thread.currentThread().getName());
//将 msg 转成一个 ByteBuf,类似 NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的消息是:" buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕后的处理方法
*
* @param ctx 上下文对象, 含有通道 channel,管道 pipeline
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
bind()
提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动,如果加上 sync() 方法则是同步。
代码语言:javascript复制 // sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 异步
// 给cf注册监听器,监听我们关心的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});
优雅地关闭 EventLoopGroup
代码语言:javascript复制//释放掉所有的资源,包括创建的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
服务端启动器完整代码
代码语言:javascript复制package com.chengzw.netty.base;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty 服务端
* @author 程治玮
* @since 2021/3/25 9:31 下午
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个线程组 bossGroup 和 workerGroup, 含有的子线程 NioEventLoop 的个数默认为cpu核数的两倍
// bossGroup 只是处理连接请求 ,真正的和客户端业务处理,会交给 workerGroup 完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1个线程
EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8个线程
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 使用 NioServerSocketChannel 作为服务器的通道实现,该类用于实例化新的 Channel 来接收客户端的连接
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start ...");
// 绑定一个端口并且同步, 生成了一个 ChannelFuture 异步对象,通过 isDone() 等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind 是异步操作,sync 方法是等待异步操作执行完毕
// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 异步
// 给cf注册监听器,监听我们关心的事件
/*channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
// 在这里面cf.channel().closeFuture().sync();这个语句的主要目的是,如果缺失上述代码,则main方法所在的线程,
// 即主线程会在执行完bind().sync()方法后,会进入finally 代码块,之前的启动的nettyserver也会随之关闭掉,整个程序都结束了。
// 原文的例子有英文注释:
// Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server.
// 线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver,
channelFuture.channel().closeFuture().sync();
} finally {
// 资源优雅释放
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
创建并配置客户端启动器
客户端只需要一个 NioEventLoopGroup,其余代码和服务器类似。首先自定义 NettyClientHandler 用于处理客户端 ChannelPipeline 的业务。
代码语言:javascript复制package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
* @author 程治玮
* @since 2021/3/25 9:50 下午
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 当通道有读取事件时会触发,即服务端发送数据给客户端
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" buf.toString(CharsetUtil.UTF_8));
System.out.println("服务端的地址: " ctx.channel().remoteAddress());
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
然后配置客户端启动器:
代码语言:javascript复制package com.chengzw.netty.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Netty 客户端
* @author 程治玮
* @since 2021/3/25 9:52 下午
*/
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("netty client start..");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
参考链接
- https://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-5
- https://zhuanlan.zhihu.com/p/181239748