Netty做什么?第一个Netty服务如何写?Netty的IO和Reactor模型?Netty组件是什么?ByteBuf是什么?
一 概述
1.1 初步了解Netty
- Netty是什么?
Netty是一个高性能的、异步的、基于事件驱动的网络应用型框架。
- 为什么使用netty?
a Netty是基于Java的NIO实现的,对各种API进行统一封装。
b 基于事件模型,我们可以在对应事件编码自己业务。让开发者聚焦业务。
c 高度可定制线程模型,单线程,一个或者多个线程池。
d Netty只依赖JDK底层api。
e 在通信方面,减少不必要内存拷贝,提高性能。
f 在安全方面,完整的SSL/TLS和StartTLS。
- Netty相对于NIO有什么优势?
a 对NIO中的API进行封装,使用简单。
b 写出高质量的NIO程序,需要多线程和网络编程的知识积累。
c NIO可靠性差,如:客户端从连、网络闪退、半包读写、失败缓存等问题。
d NIO会导致Selector空轮询,最终导致CPU100%,jdk1.7仍然会有这个问题,只是发送概率变低了。
1.2 Netty的架构是什么?
- 核心(Core)
a 可扩展的事件模型。
b 统一的通信api(无论是http还是socket都使用同意的api)。
c 零拷贝机制与字节缓冲区。
- 传输服务(Transport Services)
a 支持socket和datagram(数据报)。
b 支持http协议。
c In-VM Pipe(管道协议)。
- 协议支持(Protocol Support)
a http 以及 websocket。
b SSL 安全套接字协议⽀持。
c Google Protobuf (序列化框架)。
d ⽀持zlib、gzip压缩。
e ⽀持⼤⽂件的传输。
f RTSP(实时流传输协议,是TCP/IP协议体系中的⼀个应⽤层协议)。
g ⽀持⼆进制协议并且提供了完整的单元测试。
二 Netty中的模型
IO模型和Reactor模型
2.1 常见IO模型
2.1.1 BIO模型
对上图分析
代码语言:javascript复制a 客户端的并发数和服务端的线程数是一样多,随着并发量增加服务端线程数增加,服务端性能下降。
b 当连接创建后,该线程没有操作,会进行堵塞,不会释放线程。极大浪费服务器资源。
2.1.2 NIO模型
- 对上图组件解释
a Buffer:是缓冲区、底层通过数组实现。在NIO中所有的读写操作都是基于Buffer的。Java基本类型除了boolean都有缓冲区对象。
b Channel:通常叫为通道,用于连接客户端和服务端。是双向的,可以读也可以写。
c Selector:通常叫多路复用器,用于找出注册其上的发生读和写的channel。原理如下:
Selector不断轮询其上面的Channel,Channel发生读或者写,就会被Slector挑选出来。然后通过SelectionKey获取就绪Channel集合,进行IO读写操作。
- 对上图进行分析
一个线程处理多个通道,避免多线程之间上下问切换造成系统开销。通道只有事件的时候,才进行读写操作。
2.1.3 AIO模型
- 为什么要引入AIO模型
在NIO中,Selector多路复⽤器在做轮询时,如果没有事件发⽣,也会进⾏阻塞。
如何能把这个阻塞也优化掉呢?那么AIO就在这样的背景下诞⽣了。
- AIO简介
叫异步IO,该异步依赖操作系统底层异步IO。
- AIO的基本流程
用户线程通过系统调用,告知kernel内核启动某个IO操作,用户线程返回。kernel内核在
整个IO操作(包括数据准备、数据复制)完成后,通知⽤户程序,⽤户执⾏后续的业务操作。
4. AIO模型存在问题
代码语言:javascript复制a 完成事件的注册和传递,需要操作系统底层提供大量支持。
b Windows 系统下通过 IOCP 实现了真正的异步 I/O。但是目前高并发系统都是部署到Linux上。
c Linux系统支持AIO模型不稳定。该系统下异步都是以NIO实现的。
2.2 常见Reactor模型
2.2.1 Reactor模型概述
- Reactor模型是什么?
是一种并发编程模型,是一种思想,具有指导意义。
- 常见的Reactor模型
单线程模型、多线程模型、主从多线程模型。netty非常友好支持前面三种模型,一般采用主从架构方式。
- Reactor模型三种角色
a Reactor 监听和分配事件
b Acceptor 处理客户端新连接,并分配请求到处理链中。
c Handler 将自身和事件绑定,执行读写操作(完成channel的读入,执行业务逻辑并将结果写道channel)。
2.2.1 单线程模型
上图说明
代码语言:javascript复制a 一个线程完成业务处理。
b Reactor相当于一个多路复用器,用于监听事件,并把发生的事件传递给Handler或者Acceptor。
c 如果是建立连接事件,Reactor传递给Acceptor。如果是读写事件,Reactor传给Handler。
- 优点
a 优点
结构简单、单线程完成,没有进程通信问题。对一些业务场景简单,对性能要求不高的应用场景。
- 缺点
a 发挥不了服务器多核优势
b 客户端连接过多导致客户端连接多,Reactor线程负载过重。导致客户端连接超时,
最终导致大量信息积压。性能低。
c 单点故障后,导致系统通信故障。
2.2.2 多线程模型
上图说明
代码语言:javascript复制相对于单线程而言,不同点在于,Handler只负责用户响应和事件分发。真正业务逻辑在work线程池中处理。
- 存在问题
a 多线程数据共享比较复杂。如子线程完成业务处理后,把结果传递给主线程Reactor,
就会涉及数据的互斥和保护机制。
b Reactor承担所有的监听和响应。如果百万客户端连接,获取服务端进行客户端握手安
全认证,认证本身就很消耗性能。
2.2.3 主从多线程模型
上图说明
代码语言:javascript复制a Reactor分成两个部分,MainReactor负责监听server socket,用来处理网络io建立,
将建立的socketChannel指定注册给SubReactor。
b SubReactor建立和socket数据交互和事件业务处理。
1 优点
代码语言:javascript复制a 响应快 不必为单个同步事件所阻塞,虽然Reactor本身是同步的
b 可扩展性强 通过扩展SubReactor充分利用CPU资源
c 可复用性高 该模型和具体事件处理逻辑无关,具有很高复用性。
2.2.4 Netty模型(主要是主从多线程模型)
上图解释
代码语言:javascript复制a 在netty模型中,负责处理新连接的是BossGroup,负责其他事件的是WorkGroup。
(Group代表线程池的概念)
b NioEventLoop表示一个不断循环处理任务的线程,用于监听绑定在上面的读/写事件。
c PipeLine里面放着一个个ChannelHandler,ChannelHandler用于业务处理。
三 第一个Netty服务
3.1 服务端
- 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.haopt.iot</groupId>
<artifactId>first-netty</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
- 服务端-MyRPCServer
package com.haopt.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyRPCServer {
public void start(int port) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// 工作线程,线程数默认是:cpu核数*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker) //设置线程组
.channel(NioServerSocketChannel.class) //配置server通道
.childHandler(new MyChannelInitializer()); //worker线程的处理器
//ByteBuf 的分配要设置为非池化,否则不能切换到堆缓冲区模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("服务器启动完成,端口为:" port);
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
- 服务端-ChannelHandler
package com.haopt.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class MyChannelHandler extends ChannelInboundHandlerAdapter {
/**
* 获取客户端发来的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("客户端发来数据:" msgStr);
//向客户端发送数据
ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 测试用例
package com.haopt.netty.myrpc;
import com.haopt.netty.server.MyRPCServer;
import org.junit.Test;
public class TestServer {
@Test
public void testServer() throws Exception{
MyRPCServer myRPCServer = new MyRPCServer();
myRPCServer.start(5566);
}
}
3.2 客户端
- 客户端-client
package com.haopt.netty.client;
import com.haopt.netty.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyRPCClient {
public void start(String host, int port) throws Exception {
//定义⼯作线程组
EventLoopGroup worker = new NioEventLoopGroup();
try {
//注意:client使⽤的是Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class) //注意:client使⽤的是NioSocketChannel
.handler(new MyClientHandler());
//连接到远程服务
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
- 客户端-(ClientHandler)
package com.haopt.netty.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息:"
msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务端发送数据
String msg = "hello";
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
四 Netty核心组件
4.1 Channel
- 初识Channel
a 可以理解为socket连接,客户端和服务端连接的时候会创建一个channel。
负责基本的IO操作,例如:bind()、connect()、read()、write()。
b Netty的Channel接口所提供的API,大大减少了Socket类复杂性
- 常见Channel(不同的协议和阻塞类型的连接会有不同的Channel类型与之对应)
a NioSocketChannel,NIO的客户端 TCP Socket 连接。
b NioServerSocketChannel,NIO的服务器端 TCP Socket 连接。
c NioDatagramChannel, UDP 连接。
d NioSctpChannel,客户端 Sctp 连接。
e NioSctpServerChannel,Sctp 服务器端连接,这些通道涵盖了UDP和TCP⽹络IO以及⽂件IO。
4.2 EventLoopGroup、EventLoop
- 概述
有了Channel连接服务,连接之间消息流动。服务器发出消息称为出站,服务器接受消息称为入站。
那么消息出站和入站就产生了事件例如:连接已激活;数据读取;用户事件;异常事件;打开连接;
关闭连接等等。有了事件,有了事件就需要机制来监控和协调事件,这个机制就是EventLoop。
- 初识EventLoopGroup、EventLoop
对上图解释
代码语言:javascript复制a 一个EventLoopGroup包含一个或者多个EventLoop
b 一个EventLoop在生命周期内之和一个Thread绑定
c EventLoop上所有的IO事件在它专有的Thread上被处理。
d Channel在它生命周期只注册于一个Event Loop
e 一个Event Loop可能被分配给一个或者多个Channel
3 代码实现
代码语言:javascript复制// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
4.3 ChannelHandler
- 初识ChannelHandler
对于数据的出站和入栈的业务逻辑都是在ChannelHandler中。
- 对于出站和入站对应的ChannelHandler
ChannelInboundHandler ⼊站事件处理器
ChannelOutBoundHandler 出站事件处理器
3 开发中常用的ChannelHandler(ChannelInboundHandlerAdapter、SimpleChannelInboundHandler) a 源码
b SimpleChannelInboundHandler的源码(是ChannelInboundHandlerAdapter子类)
注意:两者的区别在于,前者不会释放消息数据的引⽤,⽽后者会释放消息数据的引⽤。
4.4 ChannelPipeline
- 初识ChannelPipeline
将ChannelHandler串起来。一个Channel包含一个ChannelPipeline,而ChannelPipeline维护者一个ChannelHandler列表。
ChannelHandler与Channel和ChannelPipeline之间的映射关系,由ChannelHandlerContext进⾏维护。
如上图解释
代码语言:javascript复制ChannelHandler按照加⼊的顺序会组成⼀个双向链表,⼊站事件从链表的head往后传递到最后⼀个ChannelHandler。
出站事件从链表的tail向前传递,直到最后⼀个ChannelHandler,两种类型的ChannelHandler相互不会影响。
4.5 Bootstrap
- 初识Bootstrap
是引导作用,配置整个netty程序,将各个组件串起来,最后绑定接口,启动服务。
- Bootstrap两种类型(Bootstrap、ServerBootstrap)
客户端只需要一个EventLoopGroup,服务端需要两个EventLoopGroup。
上图解释
代码语言:javascript复制与ServerChannel相关联的EventLoopGroup 将分配⼀个负责为传⼊连接请求创建 Channel 的EventLoop。
⼀旦连接被接受,第⼆个 EventLoopGroup 就会给它的 Channel 分配⼀个 EventLoop。
4.6 Future
- 初识
操作完成时通知应用程序的方式。这个对象可以看做异步操作执行结果占位符,它在将来某个时刻完成,并提供对其结果的访问。
- ChannelFuture的由来
JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,
只允许⼿动检查对应的操作是否已经完成,或者⼀直阻塞直到它完成。这是⾮常
繁琐的,所以 Netty 提供了它⾃⼰的实现--ChannelFuture,⽤于在执⾏异步
操作的时候使⽤。
- Netty为什么完全是异步?
a ChannelFuture提供了⼏种额外的⽅法,这些⽅法使得我们能够注册⼀个或者多个 ChannelFutureListener实例。
b 监听器的回调⽅法operationComplete(),将会在对应的操作完成时被调⽤。
然后监听器可以判断该操作是成功地完成了还是出错了。
c 每个 Netty 的出站 I/O 操作都将返回⼀个 ChannelFuture,也就是说,
它们都不会阻塞。所以说,Netty完全是异步和事件驱动的。
4.7 组件小结
上图解释
代码语言:javascript复制将组件串起来
五 缓存区-ByteBuf
5.1 ByteBuf概述
- 初识ByteBuf
JavaNIO提供了缓存容器(ByteBuffer),但是使用复杂。因此netty引入缓存ButeBuf,
一串字节数组构成。
- ByteBuf两个索引(readerIndex,writerIndex)
a readerIndex 将会根据读取的字节数递增
b writerIndex 也会根据写⼊的字节数进⾏递增
注意:如果readerIndex超过了writerIndex的时候,Netty会抛出IndexOutOf-BoundsException异常。
5.2 ByteBuf基本使用
- 读取
package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf01 {
public static void main(String[] args) {
//构造
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",
CharsetUtil.UTF_8);
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
while (byteBuf.isReadable()){ //⽅法⼀:内部通过移动readerIndex进⾏读取
System.out.println((char)byteBuf.readByte());
}
//⽅法⼆:通过下标直接读取
for (int i = 0; i < byteBuf.readableBytes(); i ) {
System.out.println((char)byteBuf.getByte(i));
}
//⽅法三:转化为byte[]进⾏读取
byte[] bytes = byteBuf.array();
for (byte b : bytes) {
System.out.println((char)b);
}
}
}
- 写入
package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf02 {
public static void main(String[] args) {
//构造空的字节缓冲区,初始⼤⼩为10,最⼤为20
ByteBuf byteBuf = Unpooled.buffer(10,20);
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
for (int i = 0; i < 5; i ) {
byteBuf.writeInt(i); //写⼊int类型,⼀个int占4个字节
}
System.out.println("ok");
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
while (byteBuf.isReadable()){
System.out.println(byteBuf.readInt());
}
}
}
- 丢弃已读字节
package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf03 {
public static void main(String[] args) {
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
while (byteBuf.isReadable()){
System.out.println((char)byteBuf.readByte());
}
byteBuf.discardReadBytes(); //丢弃已读的字节空间
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
}
}
- clear()
package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf04 {
public static void main(String[] args) {
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
byteBuf.clear(); //重置readerIndex 、 writerIndex 为0
System.out.println("byteBuf的容量为:" byteBuf.capacity());
System.out.println("byteBuf的可读容量为:" byteBuf.readableBytes());
System.out.println("byteBuf的可写容量为:" byteBuf.writableBytes());
}
}
5.3 ByteBuf 使⽤模式
5.3.1 根据存放缓冲区,分为三类
- 堆缓存区
内存的分配和回收速度⽐较快,可以被JVM⾃动回收,缺点是,如果进⾏socket的IO读写,需要额外做⼀次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有⼀定程度的下降。
由于在堆上被 JVM 管理,在不被使⽤时可以快速释放。可以通过 ByteBuf.array() 来获取 byte[] 数
据。
- 直接缓存区
⾮堆内存,它在对外进⾏内存分配,相⽐堆内存,它的分配和回收速度会慢⼀些,但是
将它写⼊或从Socket Channel中读取时,由于减少了⼀次内存拷⻉,速度⽐堆内存块。
- 复合缓存区
顾名思义就是将上述两类缓冲区聚合在⼀起。Netty 提供了⼀个 CompsiteByteBuf,
可以将堆缓冲区和直接缓冲区的数据放在⼀起,让使⽤更加⽅便。
5.3.2 缓存区选择
代码语言:javascript复制//默认使⽤的是DirectByteBuf,如果需要使⽤HeapByteBuf模式,则需要进⾏系统参数的设置.
//netty中IO操作都是基于Unsafe完成的
System.setProperty("io.netty.noUnsafe", "true");
//ByteBuf的分配要设置为⾮池化,否则不能切换到堆缓冲器模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);
5.3.3 ByteBuf的分配
- Netty 提供了两种 ByteBufAllocator的实现
PooledByteBufAllocator,实现了ByteBuf的对象的池化,提⾼性能减少并最⼤限度地减少内存碎⽚。
UnpooledByteBufAllocator,没有实现对象的池化,每次会⽣成新的对象实例。
- 代码实现
//通过ChannelHandlerContext获取ByteBufAllocator实例
ctx.alloc();
//通过channel也可以获取
channel.alloc();
//Netty默认使⽤了PooledByteBufAllocator
//可以在引导类中设置⾮池化模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);
//或通过系统参数设置
System.setProperty("io.netty.allocator.type", "pooled");
System.setProperty("io.netty.allocator.type", "unpooled");
5.5 ByteBuf的释放
ByteBuf如果采⽤的是堆缓冲区模式的话,可以由GC回收,但是如果采⽤的是直接缓冲区,就不受GC的 管理,就得⼿动释放,否则会发⽣内存泄露。
5.5.1 ByteBuf的手动释放
- 实现逻辑
⼿动释放,就是在使⽤完成后,调⽤ReferenceCountUtil.release(byteBuf); 进⾏释放。
通过release⽅法减去 byteBuf 的使⽤计数,Netty 会⾃动回收 byteBuf。
代码
代码语言:javascript复制/**
* 获取客户端发来的数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("客户端发来数据:" msgStr);
//释放资源
ReferenceCountUtil.release(byteBuf);
}
注意:⼿动释放可以达到⽬的,但是这种⽅式会⽐较繁琐,如果⼀旦忘记释放就可能会造成内存泄露。
5.5.1 ByteBuf的自动释放
⾃动释放有三种⽅式,分别是:⼊站的TailHandler、继承SimpleChannelInboundHandler、 HeadHandler的出站释放。
- TailHandler
Netty的ChannelPipleline的流⽔线的末端是TailHandler,默认情况下如果每个⼊站
处理器Handler都把消息往下传,TailHandler会释放掉ReferenceCounted类型的消息。
代码语言:javascript复制/**
* 获取客户端发来的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("客户端发来数据:" msgStr);
//向客户端发送数据
ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
ctx.fireChannelRead(msg); //将ByteBuf向下传递
}
在DefaultChannelPipeline中的TailContext内部类会在最后执⾏:
代码语言:javascript复制@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
//最后会执⾏
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the
pipeline. " "Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg); //释放资源
}
}
- SimpleChannelInboundHandler
当ChannelHandler继承了SimpleChannelInboundHandler后,在SimpleChannelInboundHandler的
channelRead()⽅法中,将会进⾏资源的释放,我们的业务代码也需要写⼊到channelRead0()中。
代码语言:javascript复制//SimpleChannelInboundHandler中的channelRead()
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg); //在这⾥释放
}
}
}
使用:
代码语言:javascript复制package com.haopt.myrpc.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息:"
msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务端发送数据
String msg = "hello";
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- HeadHandler
出站处理流程中,申请分配到的 ByteBuf,通过 HeadHandler 完成⾃动释放。
在出站流程开始的时候,通过调⽤ ctx.writeAndFlush(msg),Bytebuf 缓冲
区开始进⼊出站处理的 pipeline 流⽔线 。在每⼀个出站Handler中的处理完
成后,最后消息会来到出站的最后⼀棒 HeadHandler,再经过⼀轮复杂的调⽤,
在flush完成后终将被release掉。
代码语言:javascript复制package com.haopt.myrpc.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
Exception {
System.out.println("接收到服务端的消息:"
msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务端发送数据
String msg = "hello";
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
5.6 ByteBuf小结
代码语言:javascript复制a ⼊站处理流程中,如果对原消息不做处理,调⽤ ctx.fireChannelRead(msg) 把
原消息往下传,由流⽔线最后⼀棒 TailHandler 完成⾃动释放。
b 如果截断了⼊站处理流⽔线,则可以继承 SimpleChannelInboundHandler ,
完成⼊站ByteBuf ⾃动释放。
c 出站处理过程中,申请分配到的 ByteBuf,通过 HeadHandler 完成⾃动释放。
d ⼊站处理中,如果将原消息转化为新的消息并调⽤ ctx.fireChannelRead(newMsg)
往下传,那必须把原消息release掉。
e ⼊站处理中,如果已经不再调⽤ ctx.fireChannelRead(msg) 传递任何消息,
也没有继承SimpleChannelInboundHandler 完成⾃动释放,那更要把原消息
release掉。
更多Java相关内容。请关注微信公众号 花花与Java