高清思维导图原件(xmind/pdf/jpg
)可以关注公众号:一枝花算不算浪漫
回复netty01
即可。
前言
上一篇文章讲了NIO
相关的知识点,相比于传统IO
,NIO
已经做得很优雅了,为什么我们还要使用Netty
?
上篇文章最后留了很多坑,讲了NIO
使用的弊端,也是为了引出Netty
而设立的,这篇文章我们就来好好揭开Netty
的神秘面纱。
本篇文章的目的很简单,希望看过后你能看懂Netty
的示例代码,针对于简单的网络通信,自己也能用Netty
手写一个开发应用出来!
一个简单的Netty示例
以下是一个简单聊天室Server端的程序,代码参考自:http://www.imooc.com/read/82/article/2166
代码有点长,主要核心代码是在main()
方法中,这里代码也希望大家看懂,后面也会一步步剖析。
PS:我是用mac
系统,直接在终端输入telnet 127.0.0.1 8007
即可启动一个聊天框,如果提示找不到telnet
命令,可以通过brew
进行安装,具体步骤请自行百度。
/**
* @Description netty简易聊天室
*
* @Author 一枝花算不算浪漫
* @Date 2020/8/10 6:52 上午
*/
public final class NettyChatServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// 1. EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 2. 服务端引导器
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3. 设置线bootStrap信息
serverBootstrap.group(bossGroup, workerGroup)
// 4. 设置ServerSocketChannel的类型
.channel(NioServerSocketChannel.class)
// 5. 设置参数
.option(ChannelOption.SO_BACKLOG, 100)
// 6. 设置ServerSocketChannel对应的Handler,只能设置一个
.handler(new LoggingHandler(LogLevel.INFO))
// 7. 设置SocketChannel对应的Handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 可以添加多个子Handler
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChatNettyHandler());
}
});
// 8. 绑定端口
ChannelFuture f = serverBootstrap.bind(PORT).sync();
// 9. 等待服务端监听端口关闭,这里会阻塞主线程
f.channel().closeFuture().sync();
} finally {
// 10. 优雅地关闭两个线程池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("one conn active: " ctx.channel());
// channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的
ChatHolder.join((SocketChannel) ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
System.out.println(content);
if (content.equals("quitrn")) {
ctx.channel().close();
} else {
ChatHolder.propagate((SocketChannel) ctx.channel(), content);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("one conn inactive: " ctx.channel());
ChatHolder.quit((SocketChannel) ctx.channel());
}
}
private static class ChatHolder {
static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>();
/**
* 加入群聊
*/
static void join(SocketChannel socketChannel) {
// 有人加入就给他分配一个id
String userId = "用户" ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
send(socketChannel, "您的id为:" userId "nr");
for (SocketChannel channel : USER_MAP.keySet()) {
send(channel, userId " 加入了群聊" "nr");
}
// 将当前用户加入到map中
USER_MAP.put(socketChannel, userId);
}
/**
* 退出群聊
*/
static void quit(SocketChannel socketChannel) {
String userId = USER_MAP.get(socketChannel);
send(socketChannel, "您退出了群聊" "nr");
USER_MAP.remove(socketChannel);
for (SocketChannel channel : USER_MAP.keySet()) {
if (channel != socketChannel) {
send(channel, userId " 退出了群聊" "nr");
}
}
}
/**
* 扩散说话的内容
*/
public static void propagate(SocketChannel socketChannel, String content) {
String userId = USER_MAP.get(socketChannel);
for (SocketChannel channel : USER_MAP.keySet()) {
if (channel != socketChannel) {
send(channel, userId ": " content);
}
}
}
/**
* 发送消息
*/
static void send(SocketChannel socketChannel, String msg) {
try {
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length);
writeBuffer.writeCharSequence(msg, Charset.defaultCharset());
socketChannel.writeAndFlush(writeBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
代码有点长,执行完的效果如上图所示,下面所有内容都是围绕着如何看懂
以及如何写出
这样的代码来展开的,希望你看完 也能轻松手写Netty
服务端代码~。通过简单demo开发让大家体验了Netty
实现相比NIO
确实要简单的多,但优点不限于此,只需要知道选择Netty就对了。
Netty核心组件
对应着文章开头的思维导图,我们知道Netty
的核心组件主要有:
- Bootstrap && ServerBootstrap
- EventLoopGroup
- EventLoop
- ByteBuf
- Channel
- ChannelHandler
- ChannelFuture
- ChannelPipeline
- ChannelHandlerContext
类图如下:
Bootstrap & ServerBootstrap
一看到BootStrap
大家就应该想到启动类、引导类这样的词汇,之前分析过EurekaServer项目启动类时介绍过EurekaBootstrap
, 他的作用就是上下文初始化、配置初始化。
在Netty
中我们也有类似的类,Bootstrap
和ServerBootstrap
它们都是Netty
程序的引导类,主要用于配置各种参数,并启动整个Netty
服务,我们看下文章开头的示例代码:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChatNettyHandler());
}
});
Bootstrap
和ServerBootstrap
是针对于Client
和Server
端定义的两套启动类,区别如下:
Bootstrap
是客户端引导类,而ServerBootstrap
是服务端引导类。Bootstrap
通常使用connect()
方法连接到远程的主机和端口,作为一个TCP客户端
。ServerBootstrap
通常使用bind()
方法绑定本地的端口,等待客户端来连接。ServerBootstrap
可以处理Accept
事件,这里面childHandler
是用来处理Channel
请求的,我们可以查看chaildHandler()
方法的注解:
Bootstrap
客户端引导只需要一个EventLoopGroup
,但是一个ServerBootstrap
通常需要两个(上面的boosGroup
和workerGroup
)。
EventLoopGroup && EventLoop
EventLoopGroup
及EventLoop
这两个类名称定义的很奇怪,对于初学者来说往往无法通过名称来了解其中的含义,包括我也是这样。
EventLoopGroup
可以理解为一个线程池,对于服务端程序,我们一般会绑定两个线程池,一个用于处理 Accept
事件,一个用于处理读写事件,看下EventLoop
系列的类目录:
通过上面的类图,我们才恍然大悟,我的亲娘咧,这不就是一个线程池嘛?(名字气的犄角拐弯的真是难认)
EventLoopGroup
是EventLoop
的集合,一个EventLoopGroup
包含一个或者多个EventLoop
。我们可以将EventLoop
看做EventLoopGroup
线程池中的一个个工作线程。
至于这里为什么要用到两个线程池,具体的其实可以参考Reactor
设计模式,这里暂时不做过多的讲解。
- 一个 EventLoopGroup 包含一个或多个 EventLoop ,即 EventLoopGroup : EventLoop = 1 : n
- 一个 EventLoop 在它的生命周期内,只能与一个 Thread 绑定,即 EventLoop : Thread = 1 : 1
- 所有有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,从而保证线程安全,即 Thread : EventLoop = 1 : 1
- 一个 Channel 在它的生命周期内只能注册到一个 EventLoop 上,即 Channel : EventLoop = n : 1
- 一个 EventLoop 可被分配至一个或多个 Channel ,即 EventLoop : Channel = 1 : n
当一个连接到达时,Netty
就会创建一个 Channel
,然后从 EventLoopGroup
中分配一个 EventLoop
来给这个 Channel
绑定上,在该 Channel
的整个生命周期中都是有这个绑定的 EventLoop
来服务的。
ByteBuf
在Java NIO
中我们有 ByteBuffer
缓冲池,对于它的操作我们应该印象深刻,往Buffer
中写数据时我们需要关注写入的位置,切换成读模式时我们还要切换读写状态,不然将会出现大问题。
针对于NIO
中超级难用的Buffer
类, Netty
提供了ByteBuf
来替代。ByteBuf
声明了两个指针:一个读指针,一个写指针,使得读写操作进行分离,简化buffer
的操作流程。
另外Netty
提供了发几种ByteBuf
的实现以供我们选择,ByteBuf
可以分为:
Pooled
和Unpooled
池化和非池化- Heap 和 Direct,堆内存和堆外内存,NIO中创建Buffer也可以指定
- Safe 和 Unsafe,安全和非安全
对于这么多种创建Buffer
的方式该怎么选择呢?Netty
也为我们处理好了,我们可以直接使用(真是暖男Ntetty
):
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
ByteBuf buffer = allocator.buffer(length);
使用这种方式,Netty将最大努力的使用池化、Unsafe、对外内存的方式为我们创建buffer。
Channel
提起Channel
并不陌生,上一篇讲NIO
的三大组件提到过,最常见的就是java.nio.SocketChannel
和java.nio.ServerSocketChannel
,他们用于非阻塞的I/0操作。类似于NIO
的Channel
,Netty提供了自己的Channel
和其子类实现,用于异步I/0操作和其他相关的操作。
在 Netty
中, Channel
是一个 Socket
连接的抽象, 它为用户提供了关于底层 Socket
状态(是否是连接还是断开) 以及对 Socket
的读写等操作。每当 Netty
建立了一个连接后, 都会有一个对应的 Channel
实例。并且,有父子channel
的概念。 服务器连接监听的channel
,也叫 parent channel
。 对应于每一个 Socket
连接的channel
,也叫 child channel
。
既然channel
是 Netty 抽象出来的网络 I/O 读写相关的接口,为什么不使用 JDK NIO
原生的 Channel
而要另起炉灶呢,主要原因如下:
JDK
的SocketChannel
和ServersocketChannel
没有统一的Channel
接口供业务开发者使用,对一于用户而言,没有统一的操作视图,使用起来并不方便。JDK
的SocketChannel
和ScrversockctChannel
的主要职责就是网络 I/O 操作,由于他们是SPI
类接口,由具体的虚拟机厂家来提供,所以通过继承 SPI 功能直接实现ServersocketChannel
和SocketChannel
来扩展其工作量和重新Channel
功类是差不多的。- Netty 的
ChannelPipeline Channel
需要够跟 Netty 的整体架构融合在一起,例如 I/O 模型、基的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些JDK SocketChannel
和ServersocketChannel
都没有提供,需要重新封装。 - 自定义的
Channel
,功实现更加灵活。
基于上述 4 原因,它的设计原理比较简单, Netty 重新设计了 Channel
接口,并且给予了很多不同的实现。但是功能却比较繁杂,主要的设计理念如下:
- 在
Channel
接口层,相关联的其他操作封装起来,采用Facade
模式进行统一封装,将网络 I/O 操作、网络 I/O 统一对外提供。 Channel
接口的定义尽量大而全,统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现接口的重用。- 具体实现采用聚合而非包含的方式,将相关的功类聚合在
Channel
中,由Channel
统一负责分配和调度,功能实现更加灵活。
Channel
的实现类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个 NioServerSocketChannel
和 NioSocketChannel
。
服务端 NioServerSocketChannel
的继承关系类图如下:
客户端 NioSocketChannel
的继承关系类图如下:
后面文章源码系列会具体分析,这里就不进一步阐述分析了。
ChannelHandler
ChannelHandler
是Netty
中最常用的组件。ChannelHandler
主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelHandler
有两个核心子类 ChannelInboundHandler
和 ChannelOutboundHandler
,其中 ChannelInboundHandler
用于接收、处理入站( Inbound
)的数据和事件,而 ChannelOutboundHandler
则相反,用于接收、处理出站( Outbound
)的数据和事件。
ChannelInboundHandler
ChannelInboundHandler
处理入站数据以及各种状态变化,当Channel
状态发生改变会调用ChannelInboundHandler
中的一些生命周期方法.这些方法与Channel
的生命密切相关。
入站数据,就是进入socket
的数据。下面展示一些该接口的生命周期API
:
当某个 ChannelInboundHandler
的实现重写 channelRead()
方法时,它将负责显式地释放与池化的 ByteBuf
实例相关的内存。 Netty 为此提供了一个实用方法ReferenceCountUtil.release()
。
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
这种方式还挺繁琐的,Netty提供了一个SimpleChannelInboundHandler
,重写channelRead0()
方法,就可以在调用过程中会自动释放资源.
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// 不用调用ReferenceCountUtil.release(msg)也会释放资源
}
}
ChannelOutboundHandler
出站操作和数据将由 ChannelOutboundHandler
处理。它的方法将被 Channel
、 ChannelPipeline
以及 ChannelHandlerContext
调用。
ChannelOutboundHandler
的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如, 如果到远程节点的写入被暂停了, 那么你可以推迟冲刷操作并在稍后继续。
ChannelPromise
与ChannelFuture
: ChannelOutboundHandler
中的大部分方法都需要一个ChannelPromise
参数, 以便在操作完成时得到通知。 ChannelPromise
是ChannelFuture
的一个子类,其定义了一些可写的方法,如setSuccess()
和setFailure()
,从而使ChannelFuture
不可变。
ChannelHandlerAdapter
ChannelHandlerAdapter
顾名思义,就是handler
的适配器。你需要知道什么是适配器模式,假设有一个A接口,我们需要A的subclass
实现功能,但是B类中正好有我们需要的功能,不想复制粘贴B中的方法和属性了,那么可以写一个适配器类Adpter
继承B实现A,这样一来Adapter
是A的子类并且能直接使用B中的方法,这种模式就是适配器模式。
就比如Netty中的SslHandler
类,想使用ByteToMessageDecoder
中的方法进行解码,但是必须是ChannelHandler
子类对象才能加入到ChannelPipeline
中,通过如下签名和其实现细节(SslHandler
实现细节就不贴了)就能够作为一个handler
去处理消息了。
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
ChannelHandlerAdapter
提供了一些实用方法isSharable()
如果其对应的实现被标注为 Sharable
, 那么这个方法将返回 true
, 表示它可以被添加到多个 ChannelPipeline
中 。如果想在自己的ChannelHandler
中使用这些适配器类,只需要扩展他们,重写那些想要自定义的方法即可。
ChannelPipeline
每一个新创建的 Channel
都将会被分配一个新的 ChannelPipeline
。这项关联是永久性的; Channel
既不能附加另外一个 ChannelPipeline
,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
Netty 的 ChannelHandler
为处理器提供了基本的抽象, 目前你可以认为每个 ChannelHandler
的实例都类似于一种为了响应特定事件而被执行的回调。从应用程序开发人员的角度来看, 它充当了所有处理入站和出站数据的应用程序逻辑的拦截载体。ChannelPipeline
提供了 ChannelHandler
链的容器,并定义了用于在该链上传播入站和出站事件流的 API
。当 Channel
被创建时,它会被自动地分配到它专属的 ChannelPipeline
。
ChannelHandler
安装到 ChannelPipeline
中的过程如下所示:
- 一个
ChannelInitializer
的实现被注册到了ServerBootstrap
中 - 当
ChannelInitializer.initChannel()
方法被调用时,ChannelInitializer
将在ChannelPipeline
中安装一组自定义的ChannelHandler
ChannelInitializer
将它自己从ChannelPipeline
中移除
如上图所示:这是一个同时具有入站和出站 ChannelHandler
的 ChannelPipeline
的布局,并且印证了我们之前的关于 ChannelPipeline
主要由一系列的 ChannelHandler
所组成的说法。 ChannelPipeline
还提供了通过 ChannelPipeline
本身传播事件的方法。如果一个入站事件被触发,它将被从 ChannelPipeline
的头部开始一直被传播到 Channel Pipeline 的尾端。
你可能会说, 从事件途经 ChannelPipeline
的角度来看, ChannelPipeline
的头部和尾端取决于该事件是入站的还是出站的。然而 Netty 总是将 ChannelPipeline
的入站口(图 的左侧)作为头部,而将出站口(该图的右侧)作为尾端。
当你完成了通过调用 ChannelPipeline.add*()
方法将入站处理器( ChannelInboundHandler
)和 出 站 处 理 器 ( ChannelOutboundHandler
) 混 合 添 加 到 ChannelPipeline
之 后 , 每 一 个ChannelHandler
从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图 6-3 中的处理器( ChannelHandler
)从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler
将是1,而第一个被出站事件看到的 ChannelHandler
将是 5。
在 ChannelPipeline
传播事件时,它会测试 ChannelPipeline
中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配, ChannelPipeline
将跳过该ChannelHandler
并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。 (当然, ChannelHandler
也可以同时实现ChannelInboundHandler
接口和 ChannelOutboundHandler
接口。)
修改ChannelPipeline
修改指的是添加或删除ChannelHandler
,见代码示例:
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
// 先添加一个Handler到ChannelPipeline中
pipeline.addLast("handler1", firstHandler);
// 这个Handler放在了first,意味着放在了handler1之前
pipeline.addFirst("handler2", new SecondHandler());
// 这个Handler被放到了last,意味着在handler1之后
pipeline.addLast("handler3", new ThirdHandler());
...
// 通过名称删除
pipeline.remove("handler3");
// 通过对象删除
pipeline.remove(firstHandler);
// 名称"handler2"替换成名称"handler4",并切handler2的实例替换成了handler4的实例
pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelPipeline
的出入站API
入站API
所示:
图片上传失败...(image-6037f5-1598167949595)
出站API
所示:
ChannelPipeline
这个组件上面所讲的大致只需要记住这三点即可:
ChannelPipeline
保存了与Channel
相关联的ChannelHandler
ChannelPipeline
可以根据需要,通过添加或者删除ChannelHandler
来动态地修改ChannelPipeline
有着丰富的API
用以被调用,以响应入站和出站事件
ChannelHandlerContext
当 ChannelHandler
被添加到 ChannelPipeline
时,它将会被分配一个 ChannelHandlerContext
,它代表了 ChannelHandler
和 ChannelPipeline
之间的绑定。ChannelHandlerContext
的主要功能是管理它所关联的ChannelHandler
和在同一个 ChannelPipeline
中的其他ChannelHandler
之间的交互。
如果调用Channel
或ChannelPipeline
上的方法,会沿着整个ChannelPipeline
传播,如果调用ChannelHandlerContext
上的相同方法,则会从对应的当前ChannelHandler
进行传播。
ChannelHandlerContext API
如下表所示:
ChannelHandlerContext
和ChannelHandler
之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;- 如同在本节开头所解释的一样,相对于其他类的同名方法,
ChannelHandlerContext
的方法将产生更短的事件流, 应该尽可能地利用这个特性来获得最大的性能。
与ChannelHandler
、ChannelPipeline
的关联使用
从ChannelHandlerContext
访问channel
ChannelHandlerContext ctx = ..;
// 获取channel引用
Channel channel = ctx.channel();
// 通过channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
从ChannelHandlerContext
访问ChannelPipeline
ChannelHandlerContext ctx = ..;
// 获取ChannelHandlerContext
ChannelPipeline pipeline = ctx.pipeline();
// 通过ChannelPipeline写入缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
有时候我们不想从头传递数据,想跳过几个handler
,从某个handler
开始传递数据.我们必须获取目标handler
之前的handler
关联的ChannelHandlerContext
。
ChannelHandlerContext ctx = ..;
// 直接通过ChannelHandlerContext写数据,发送到下一个handler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext
的基本使用应该掌握了,但是你真的理解ChannelHandlerContext
,ChannelPipeline
和Channelhandler
之间的关系了吗?不理解也没关系,因为源码以后会帮你理解的更为深刻。
核心组件之间的关系
- 一个
Channel
对应一个ChannelPipeline
- 一个
ChannelPipeline
包含一条双向的ChannelHandlerContext
链 - 一个
ChannelHandlerContext
中包含一个ChannelHandler
- 一个
Channel
会绑定到一个EventLoop
上 - 一个
NioEventLoop
维护了一个Selector(
使用的是 Java 原生的 Selector) - 一个
NioEventLoop
相当于一个线程
粘包拆包问题
粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP
有消息保护边界,不会发生粘包拆包问题,而因此粘包拆包问题只发生在TCP
协议中。具体讲TCP
是个”流"协议,只有流的概念,没有包的概念,对于业务上层数据的具体含义和边界并不了解,它只会根据TCP
缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被TCP
拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP
粘包和拆包问题。
问题举例说明
下面针对客户端分别发送了两个数据表Packet1
和Packet2
给服务端的时候,TCP
粘包和拆包会出现的情况进行列举说明:
(1)第一种情况,服务端分两次正常收到两个独立数据包,即没有发生拆包和粘包的现象;
(2)第二种情况,接收端只收到一个数据包,由于TCP
是不会出现丢包的,所以这一个数据包中包含了客户端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于服务接收端来说很难处理。
(3)第三种情况,服务端分两次读取到了两个数据包,第一次读取到了完整的Packet1
和Packet2
包的部分内容,第二次读取到了Packet2
的剩余内容,这被称为TCP拆包;
(4)第四种情况,服务端分两次读取到了两个数据包,第一次读取到了部分的Packet1
内容,第二次读取到了Packet1
剩余内容和Packet2
的整包。
如果此时服务端TCP接收滑窗非常小,而数据包Packet1
和Packet2
比较大,很有可能服务端需要分多次才能将两个包接收完全,期间发生多次拆包。以上列举情况的背后原因分别如下:
- 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
- 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
- 进行
MSS
(最大报文长度)大小的TCP
分段,当TCP
报文长度-TCP
头部长度>MSS
的时候将发生拆包。 - 接收方法不及时读取套接字缓冲区数据,这将发生粘包。
如何基于Netty处理粘包、拆包问题
由于底层的TCP
无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
- 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
- 在包尾增加回车换行符进行分割,例如
FTP
协议; - 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常设计思路为消息头的第一个字段使用
int32
来表示消息的总长度; - 更复杂的应用层协议。
之前Netty示例中其实并没有考虑读半包问题,这在功能测试往往没有问题,但是一旦请求数过多或者发送大报文之后,就会存在该问题。如果代码没有考虑,往往就会出现解码错位或者错误,导致程序不能正常工作,下面看看Netty是如何根据主流的解决方案进行抽象实现来帮忙解决这一问题的。
如下表所示,Netty为了找出消息的边界,采用封帧方式:
方式 | 解码 | 编码 |
---|---|---|
固定长度 | FixedLengthFrameDecoder | 简单 |
分隔符 | DelimiterBasedFrameDecoder | 简单 |
专门的 length 字段 | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
注意到,Netty提供了对应的解码器来解决对应的问题,有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和半包问题。为什么这么说呢?下面列举一个包尾增加分隔符的例子:
代码语言:javascript复制import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: wuxiaofei
* @Date: 2020/8/15 0015 19:15
* @Version: 1.0
* @Description:入站处理器
*/
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicInteger completeCounter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的处理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept[" request
"] and the counter is:" counter.incrementAndGet());
String resp = "Hello," request ". Welcome to Netty World!"
DelimiterEchoServer.DELIMITER_SYMBOL;
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 服务端读取完成网络数据后的处理*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.fireChannelReadComplete();
System.out.println("the ReadComplete count is "
completeCounter.incrementAndGet());
}
/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
代码语言:javascript复制import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.net.InetSocketAddress;
/**
* @Author: wuxiaofei
* @Date: 2020/8/15 0015 19:17
* @Version: 1.0
* @Description:服务端
*/
public class DelimiterEchoServer {
public static final String DELIMITER_SYMBOL = "@~";
public static final int PORT = 9997;
public static void main(String[] args) throws InterruptedException {
DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
System.out.println("服务器即将启动");
delimiterEchoServer.start();
}
public void start() throws InterruptedException {
final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
/*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
System.out.println("服务器启动完成,等待客户端的连接和数据.....");
f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
} finally {
group.shutdownGracefully().sync();/*优雅关闭线程组*/
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
.getBytes());
//服务端收到数据包后经过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new DelimiterServerHandler());
}
}
}
添加到ChannelPipeline
的DelimiterBasedFrameDecoder
用于对使用分隔符结尾的消息进行自动解码,当然还有没有用到的FixedLengthFrameDecoder
用于对固定长度的消息进行自动解码等解码器。正如上门的代码使用案例,有了Netty提供的几码器可以轻松地完成对很多消息的自动解码,而且不需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。
Netty示例代码详解
相信看完上面的铺垫,你对Netty编码有了一定的了解了,下面再来整体梳理一遍吧。
1、设置EventLoopGroup
线程组(Reactor
线程组)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
上面我们说过Netty
中使用Reactor
模式,bossGroup
表示服务器连接监听线程组,专门接受 Accept
新的客户端client
连接。另一个workerGroup
表示处理每一连接的数据收发的线程组,来处理消息的读写事件。
2、服务端引导器
代码语言:javascript复制ServerBootstrap serverBootstrap = new ServerBootstrap();
集成所有配置,用来启动Netty
服务端。
3、设置ServerBootstrap
信息
serverBootstrap.group(bossGroup, workerGroup);
将两个线程组设置到ServerBootstrap
中。
4、设置ServerSocketChannel
类型
serverBootstrap.channel(NioServerSocketChannel.class);
设置通道的IO
类型,Netty
不止支持Java NIO
,也支持阻塞式IO
,例如OIO
OioServerSocketChannel.class)
5、设置参数
代码语言:javascript复制serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
通过option()
方法可以设置很多参数,这里SO_BACKLOG
标识服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows
为200,其他为128,这里设置的是100。
6、设置Handler
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
设置 ServerSocketChannel
对应的Handler
,这里只能设置一个,它会在SocketChannel
建立起来之前执行。
7、设置子Handler
代码语言:javascript复制serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChatNettyHandler());
}
});
Netty
中提供了一种可以设置多个Handler
的途径,即使用ChannelInitializer
方式。ChannelPipeline
是Netty
处理请求的责任链,这是一个ChannelHandler
的链表,而ChannelHandler
就是用来处理网络请求的内容的。
每一个channel
,都有一个处理器流水线。装配child channel
流水线,调用childHandler()
方法,传递一个ChannelInitializer
的实例。
在 child channel
创建成功,开始通道初始化的时候,在bootstrap启动器中配置的ChannelInitializer
实例就会被调用。
这个时候,才真正的执行去执行 initChannel
初始化方法,开始通道流水线装配。
流水线装配,主要是在流水线pipeline
的后面,增加负责数据读写、处理业务逻辑的handler
。
处理器 ChannelHandler
用来处理网络请求内容,有ChannelInboundHandler
和ChannelOutboundHandler
两种,ChannlPipeline
会从头到尾顺序调用ChannelInboundHandler
处理网络请求内容,从尾到头调用ChannelOutboundHandler
处理网络请求内容
8、绑定端口号
代码语言:javascript复制ChannelFuture f = serverBootstrap.bind(PORT).sync();
绑定端口号
9、等待服务端端口号关闭
代码语言:javascript复制f.channel().closeFuture().sync();
等待服务端监听端口关闭,sync()
会阻塞主线程,内部调用的是 Object
的 wait()
方法
10、关闭EventLoopGroup线程组
代码语言:javascript复制bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
总结
这篇文章主要是从一个demo
作为引子,然后介绍了Netty
的包结构、Reactor
模型、编程规范等等,目的很简单,希望你能够读懂这段demo
并写出来。
后面开始继续Netty
源码解析部分,敬请期待。
参考资料
- 《Netty in Action》书籍
- 慕课Netty专栏
- 掘金闪电侠Netty小册
- 芋道源码Netty专栏
- Githubfork from krcys