大家好,又见面了,我是你们的朋友全栈君。
正如之前所说,网络传输的基本单位是字节。Java NIO 提供了ByteBuffer作为它的容器,但是这个类使用起来比较复杂和麻烦。Netty提供了一个更好的实现:ByteBuf。
ByteBuf的API
Netty为数据处理提供的API通过抽象类ByteBuf
和接口ByteBufHolder
暴露出来。
下面列出ByteBuf API的优点:
- 可扩展到用户定义的buffer类型中
- 通过内置的复合buffer类型实现透明的零拷贝(zero-copy)
- 容量可以根据需要扩展
- 切换读写模式不需要调用
ByteBUffer.flip()
方法 - 读写采用不同的索引
- 支持方法链接调用
- 支持引用计数
- 支持池技术(比如:线程池、数据库连接池)
ByteBuf类—Netty的数据容器
因为所有的网络通信都涉及到字节序列的移动,一个有效而易用的数据结构是非常必要的。Netty的ByteBuf实现达到并超过这些需求。下面了解一下如何通过索引来简化对获取它持有数据的操作。
工作原理
ByteBuf维护两个不同的索引:读索引和写索引。当你从ByteBuf中读,它的readerIndex
增加了读取的字节数;同理,当你向ByteBuf中写,writerIndex
增加。下图显示了一个空的ByteBuf的布局和状态:
为了理解这些索引的关系,考虑一下如果你读数据时readerIndex
已经和writerIndex
一样会发生什么。在这个点,你已经读完了可读的数据。尝试继续往下读会引发一个IndexOutOfBoundsException
,就像引发数组越界那样。
ByteBuf中名称以read或write开头的方法会推进相应的索引,而以set或get开头的不会。
可以指定ByteBuf最大的容量,默认是Integer.MAX_VALUE
。
ByteBuf的使用模式
为了了解它的使用模式,我们得首先记住上图所展示的内容—一个数组以及两个索引来控制读和写。
堆缓冲区(HEAP BUFFER)
最常使用的ByteBuf模式将数据保存到JVM的堆中。被称为支持数组(backing array),这个模式提供了在没有使用池技术的情况下快速分配和释放(在堆缓冲区中)。这种方法是非常适合于来处理内置数据(legacy data,直译遗留数据,个人觉得这里翻译为内置数据更好理解一些,如果不合适,以后再改)的,如下所示:
代码语言:javascript复制ByteBuf heapBuf = ...;
if (heapBuf.hasArray()){
//检查是否有支持数组
byte[] array = heapBuf.array(); //得到支持数组
int offset = heapBuf.arrayOffset() heapBuf.readerIndex();//计算第一个字节的偏移量
int length = heapBuf.readableBytes();//计算可读字节数
handleArray(array, offset, length); //调用你的方法来处理这个array
}
当hasArray()返回false时尝试访问支持数组会抛出UnsupportedOperationException。这个用法与JDK的ByteBuffer类似
直接缓冲区(DIRECT BUFFER)
我们认为创建对象时内存总是从堆中分配?但并非总是如此。在JDK1.4中引入的NIO的ByteBuffer类允许JVM 通过本地(native)方法调用分配内存,其目的是通过免去中间交换的内存拷贝, 提升IO处理速度。
直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。这就解释了为什么直接缓冲区数据对网络数据传输来说是一种非常理想的方式。如果你的数据是存放在堆中分配的缓冲区,那么实际上,在通过 socket 发送数据之前,JVM需要将先数据复制到直接缓冲区。
这种方式的主要缺点是对于分配和释放内存空间来说比堆缓冲区消耗更大。如果你要处理内置数据的代码时可能会遇到另一个缺点:因为数据没有被分配到堆上,你可能需要做一个拷贝,如下所示:
代码语言:javascript复制ByteBuf directBuf = ...
if (!directBuf.hasArray()) {
//false表示为这是直接缓冲
int length = directBuf.readableBytes();//得到可读字节数
byte[] array = new byte[length]; //分配一个具有length大小的数组
directBuf.getBytes(directBuf.readerIndex(), array); //将缓冲区中的数据拷贝到这个数组中
handleArray(array, 0, length); //下一步处理
}
明显这要比使用支持数组的方式需要更多的工作,所以你如果提前知道数据会以一个数组的方式存取,推荐你使用堆内存。
复合缓冲区(COMPOSITE BUFFER)
第三种也是最后一种模式使用一个复合缓冲区,为多个ByteBuf提供一个组合的视图。你可以添加或者删除ByteBuf实例,一种JDK ByteBuffer中完全没有的实现。
Netty通过ByteBuf的子类-CompositeByteBuf
来实现这种模式,提供了将多个buffer虚拟成一个合并的Buffer的技术。
注意:CompositeByteBuf中的ByteBuf实例可能同时包含堆缓冲区的和直接缓冲区的。如果CompositeByteBuf只含有一个实例,调用hasArray()方法会返回这个实例的hasArray()方法的值;否则总是返回false。
让我们考虑一条由两部分组成的消息,header和body,通过HTTP传输。这两部分由不同的应用程序模块产生,这个应用有为多条消息重用同一个body的选项。在这种情况下,会为每条消息创建一个新的header。
因为你不想为每条消息的两个buffer都重新分配空间,CompositeByteBuf
就完美适合这种情况。
下图显示了这个消息的布局:
下面先介绍如何通过JDK的ByteBuffer来实现这个需求:
代码语言:javascript复制//通过一个数组来存储这条消息
ByteBuffer [] message = new ByteBuffer[]{
header,body};
//使用副本来合并这两个部分
ByteBuffer message2 =ByteBuffer.allocate(header.remaining() body.remaining());
message2.put(header);
message2.put(body);
message2.flip()
这种分配和拷贝的方式显然是低效且不合适的。下面介绍如何通过CompositeByteBuf
来实现:
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...; //直接缓冲或堆缓冲都可
ByteBuf bodyBuf = ...; // 直接缓冲或堆缓冲都可
messageBuf.addComponents(headerBuf, bodyBuf);//将ByteBuf实例添加到CompositeByteBuf中
.....
messageBuf.removeComponent(0); //删除header
for (ByteBuf buf : messageBuf) {
//遍历messageBuf中的ByteBuf
System.out.println(buf.toString());
}
CompositeByteBuf
可能不允许访问支持数组,所以访问CompositeByteBuf
中的数据的方式类似于直接缓冲区模式,如下所示:
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes();//得到可读的字节数
byte[] array = new byte[length];//分配一个字节数组
compBuf.getBytes(compBuf.readerIndex(), array);//将数据读到这个字节数组中
handleArray(array, 0, array.length);
Netty通过CompositeByteBuf
来优化socket的IO操作,尽可能的消除JDK buffer实现中的性能和内存使用中的不足。尽管这些优化被封装到Netty的核心代码中,但你应该意识到这些优化的影响。
字节级别的操作
除了基本的读写操作,ByteBuf提供了大量的修改它数据的方法。下面我们会讨论最重要的一些。
随机访问索引
ByteBuf 使用从0开始的索引,第一个字节的索引是 0,最后一个字节的索引是 ByteBuf的capacity()- 1。下面的代码显示了迭代ByteBuf的内容有多简单:
代码语言:javascript复制ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i ) {
byte b = buffer.getByte(i);
System.out.println((char) b);
}
顺序访问索引
尽管ByteBuf有读写索引,而JDK的ByteBuffer只有一个索引,这就是为什么你需要使用flip()
方法来切换读写模式。下图显示了ByteBuf被两个索引分成了三个区域:
第一个是已经读取过的字节,因此可以被丢弃;第二个要可读的字节,也就是ByteBuf的内容;第三个是可添加新的字节的区域。
代码语言:javascript复制 ------------------- ------------------ ------------------
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
------------------- ------------------ ------------------
| | | |
0 <= readerIndex <= writerIndex <= capacity
这和源码中示例图类似。
可丢弃字节(Discardable bytes)
上图中标记为”discardable bytes”的部分中的字节已经被读取过,通过调用discardReadBytes()
方法它们能被丢弃同时它们的空间能被回收。这个部分中的初始大小为0,保存在readerIndex
中,随着读操作(read*
方法)的执行而增加。
下图显示了调用discardReadBytes()
后的结果。你能发现可丢弃字节部分的空间已经变得可用,并分配到可写空间中去了。注意,在调用discardReadBytes()
后无法保证可写部分的内容是什么样的。
你可能会经常调用discardReadBytes()
方法来增大可写部分的空间,请注意这很有可能会导致内存复制,因为可读的字节不得不移动到buffer的开端处。建议只有在真正需要的时候才这么做,比如当内存快要溢出(premium)的时候。
可读字节(Readable bytes)
可读字节部分存储了实际的内容。一个新分配、wrap、复制的buffer的readerIndex
是0。任何名称以read或skip开头的操作会检索或跳过当前的readerIndex
,然后增加读取了的字节数量。
看一下下面这个方法:
代码语言:javascript复制 /** * 将这个buffer(this,调用这个方法的buffer对象)的数据传输到这个特定的dst。以this对象的当前 * readerIndex开始直到dst变得不可写,然后增加传输的字节数量到this对象的readerIndex上。 * * @throws IndexOutOfBoundsException 如果 dst.writableBytes > this.readableBytes */
public abstract ByteBuf readBytes(ByteBuf dst);
如果尝试从一个已经没有可读数据的buffer中读取数据会引发一个IndexOutOfBoundsException。 下面的代码显示了如何读取所有可读的数据:
代码语言:javascript复制ByteBuf buffer = ...;
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}
可写字节(Writable bytes)
可读字节部分是一个有未定义内容的可写内存区域。新分配的buffer的writerIndex
是0。任何名称以write开头的方法会从当前的writerIndex
开始写数据,增加刚才写的字节数量。
如果尝试写的字节大小超过了这个buffer的容量,会引发IndexOutOfBoundException。
下面显示了如何正确地向buffer中写数据:
代码语言:javascript复制ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}
索引管理(Index management)
JDK的InputStream
定义了mark(int readlimit)和reset()两个方法,这些方法用来标注这个stream的当前索引到一个特定的位置,然后可以相应的将stream重置(reset())到刚才标注的位置。
类似地,你能通过调用markReaderIndex()
, markWriterIndex()
, resetReaderIndex()
和 resetWriterIndex()
来设置和复位ByteBuf的readerIndex
和writerIndex
。除了没有readlimit
参数来指定什么时候标记失效,这些很像InputStream
的调用。
你也能通过调用readerIndex(int)
或writerIndex(int)
方法来将索引移到一个特定的位置。尝试设置索引到一个无效的位置也会导致IndexOutOfBoundsException。
可以通过调用clear()
方法将readerIndex
和writerIndex
都设为0。注意这并不会导致任何内存释放。
假设在调用clear()
方法之前一个ByteBuf实例有3个部分,如下:
调用之后如下:
这一部分的大小和这个ByteBuf的容量一样大,因此所有的空间都是可写的。
调用clear()
的开销没有discardReadBytes()
那么大,因为它不需要任何内存复制。
搜索操作(Search operations)
有几种方法可以检测特定值的索引。最简单的是indexOf()
方法,更复杂的方式是调用以ByteBufProcessor
接口作为参数的方法。这个接口只定义了一个方法:
boolean process(byte value)
ByteBufProcessor
中还定义了很多目标常量,假如你的应用与Flash的socket有交互的话,它有以null为结尾的内容,调用forEachByte(ByteBufProcessor.FIND_NUL)
来处理Flash中的数据还是很简单高效的,因为在处理过程中只做了很少的一些边界检查。
通过ByteBufProcessor
来查找r
int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);
衍生(Derived)Buffer
衍生Buffer为ByteBuf提供一个以特定方式呈现内容的视图。这些视图通过如下方法创建:
- duplicate()
- slice()
- slice(int,int)
- Unpooled.unmodifiableBuffer(…)
- order(ByteOrder)
- readSlice(int)
每个方法返回一个新的具有自己的读、写和标记索引的ByteBuf实例。新实例和源实例(调用者写方法的实例)之间共享内部存储。这让一个衍生Buffer可以低消耗的创建,但也意味着你修改了衍生Buffer的内容会改变源Buffer的内容(因为是共享的),反之亦然。
复制ByteBuf 如果你想真正的复制一个buffer,可通过copy()或copy(int,int)方法。
下面的代码展示了slice(int,int)
方法的用法:
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);//创建一个ByteBuf
ByteBuf sliced = buf.slice(0, 14);//创建这个ByteBuf的一个切片
System.out.println(sliced.toString(utf8));//输出 Netty in Actio
buf.setByte(0, (byte)'J');
assert buf.getByte(0) == sliced.getByte(0);//成功
System.out.println(sliced.toString(utf8));//输出 Jetty in Actio
下面看copy()方法与slice()方法的不同:
代码语言:javascript复制Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);//创建一个ByteBuf
ByteBuf copy = buf.copy(0, 14);//创建这个ByteBuf的一个切片
System.out.println(copy.toString(utf8));//输出 Netty in Actio
buf.setByte(0, (byte)'J');
assert buf.getByte(0) != copy.getByte(0);//成功
这两个例子展示了修改一个切片和一个拷贝对原来ByteBuf的影响。只要可能,推荐使用slice()方法来避免内存复制。
读写操作
Netty中有两种读写操作:
- get()和set()操作以一个指定的索引开始但不会修改这个索引
- read()和write()操作以给定的索引开始并根据访问的数据大小而修改索引
下表列出了最常用的get*
方法:
名称 | 描述 |
---|---|
setBoolean(int, boolean) | 设置Boolean值到给定索引处 |
setByte(int index, int value) | 设置byte值到给定索引处 |
setMedium(int index, int value) | 设置24位中整型(24-bit medium)值到给定索引处 |
setInt(int index, int value) | 设置int值到给定索引处 |
setLong(int index, int value) | 设置long值到给定索引处 |
setShort(int index, int value) | 设置short值到给定索引处 |
下面的代码描述了get()和set()方法的使用,从中可以看到它们不会修改读写索引:
代码语言:javascript复制Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
System.out.println((char)buf.getByte(0));//'N'
int readerIndex = buf.readerIndex();//存储当前的读索引
int writerIndex = buf.writerIndex();//存储当前的写索引
buf.setByte(0, (byte)'B');//更新值
System.out.println((char)buf.getByte(0));//'B'
assert readerIndex == buf.readerIndex();//成功
assert writerIndex == buf.writerIndex();//成功
下面来学习read*
操作,这些方法作用于当前的读写索引。它们通过将ByteBuf看成stream的方式来从中读数据,下表显示了常用的方法:
名称 | 描述 |
---|---|
readBoolean() | 返回当前readerIndex处的Boolean值,然后将readerIndex加1 |
readByte() | 返回当前readerIndex处的byte值,然后将readerIndex加1 |
readUnsignedByte() | 返回当前readerIndex处的无符号byte值并作为一个short类型,然后将readerIndex加1 |
readMedium() | 返回当前readerIndex处的24位中整型值,然后将readerIndex加3 |
readUnsignedMedium() | 返回当前readerIndex处的无符号24位中整型值,然后将readerIndex加3 |
readInt() | 返回当前readerIndex处的int值,然后将readerIndex加4 |
readUnsignedInt() | 返回当前readerIndex处的无符号int值,然后将readerIndex加4 |
readLong() | 返回当前readerIndex处的long值,然后将readerIndex加8 |
readShort() | 返回当前readerIndex处的short值,然后将readerIndex加2 |
readUnsignedShort() | 返回当前readerIndex处的无符号short值,然后将readerIndex加2 |
readBytes(ByteBuf or byte[]destination,int dstIndex [,int length]) | 将当前ByteBuf(从当前readerIndex开始)的数据传输到目标ByteBuf(从dstIndex开始复制)。并增加传输数据的大小(字节数量)到当前ByteBuf的readerIndex中 |
几乎每个read*
方法都有响应的write*
方法,注意下表列出的这些方法的参数要要写入的值,而不是索引值。
名称 | 描述 |
---|---|
writeBoolean(boolean) | 将Boolean值写入到当前writerIndex,然后将writerIndex加1 |
writeByte(int) | 将byte值写入到当前writerIndex,然后将writerIndex加1 |
writeMedium(int) | 将中整型值写入到当前writerIndex,然后将writerIndex加3 |
writeInt(int) | 将int值写入到当前writerIndex,然后将writerIndex加4 |
writeLong(long) | 将long值写入到当前writerIndex,然后将writerIndex加8 |
writeShort(int) | 将short值写入到当前writerIndex,然后将writerIndex加2 |
writeBytes(source ByteBuf or byte[] [,int srcIndex,int length]) | 将数据从特定的ByteBuf或byte数组传输到当前ByteBuf(从它的writerIndex开始写)中。如果提供了srcIndex和length,那么就会从srcIndex开始拷贝,并拷贝length大小的字节到当前ByteBuf中。并增加传输数据的大小(字节数量)到当前ByteBuf的writerIndex中 |
下面展示这些方法怎么使用:
代码语言:javascript复制Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
System.out.println((char)buf.readByte());//N
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
buf.writeByte((byte)'?');
System.out.println(buf.toString(utf8));//etty in Action rocks!?
assert readerIndex == buf.readerIndex();//成功
assert writerIndex != buf.writerIndex();//成功
更多的操作
下表列出了其他有用的操作:
名称 | 描述 |
---|---|
isReadable() | 如果至少还有一个字节可被读的话返回true |
isWritable() | 如果至少还有一个位可写入的话返回true |
readableBytes() | 返回可读的字节数量 |
writableBytes() | 返回可写的字节数量 |
capacity() | 返回当前ByteBuf能存储的字节数量 |
maxCapacity() | 返回ByteBuf最多能存储多少字节数量 |
hasArray() | 返回Byte是否有支持数组 |
array() | 如果有支持数组,返回;否则抛UnsupportedOperationException |
ByteBufHolder接口
我们经常会遇到除了需要存储实际的数据内容还需要存储各种各样的属性值的情况,比如HTTP响应,除了以字节表示的内容还会有状态码、cookie等等。
Netty提供了ByteBufHolder接口来满足这种用例。ByteBufHolder也支持Netty的高级特性,如buffer池,可以从buffer池中取一个可用的ByteBuf,如果需要还可以自动释放。
ByteBufHolder只有几个方法来访问底层的ByteBuf和引用计数,下表中列出了这些方法:
名称 | 描述 |
---|---|
content() | 返回这个ByteBufHolder持有的ByteBuf |
copy() | 返回这个ByteBufHolder的一个深复制,包括一个它含有的ByteBuf数据的一个拷贝(非共享的) |
duplicate() | 返回这个ByteBufHolder的一个浅复制,包括一个它含有的ByteBuf数据的一个拷贝(共享的) |
如果你想实现一个“消息对象”有效负载存储在ByteBuf,使用ByteBufHolder是一个好主意。
ByteBuf内存分配(ByteBuf allocation)
这一节我们会介绍管理ByteBuf实例的方法
请求式:ByteBufAllocator接口
为了减少分配和释放内存的总开销,Netty通过ByteBufAllocator实现ByteBuf池,ByteBufAllocator负责分配ByteBuf实例。
下表列出了ByteBufAllocator提供的方法:
名称 | 描述 |
---|---|
buffer() | 返回一个基于直接缓冲区或堆缓冲区的ByteBuf |
heapBuffer() | 返回一个基于堆缓冲区的ByteBuf |
directBuffer() | 返回一个基于直接缓冲区的ByteBuf |
ioBuffer() | 返回一个适用于socket上的IO操作的ByteBuf(一般是直接缓冲区的) |
你可以从一个Channel或与ChannelHandler绑定的ChannelHandlerContext中获得一个ByteBufAllocator的引用。
下面展示了这种方式的用法:
代码语言:javascript复制Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
Netty为ByteBufAllocator提供了两种实现: PooledByteBufAllocator和UnpooledByteBufAllocator。前者通过ByteBuf的实例放入池中来提高性能同时减少内存碎片。后者每次返回一个新建的实例而没有通过池来缓存。
尽管Netty默认使用PooledByteBufAllocator,但是可以在启动你应用的时候通过 ChannelConfig 的 API来修改。
Unpooled buffers
可能你无法获得ByteBufAllocator的引用,在这种情况下,Netty提供了一个叫做Unpooled
的工具类,它提供了静态的方法来创建不从池中获取的(unpooled)新建的ByteBuf实例。
下表列出了最重要的一些方法:
名称 | 描述 |
---|---|
buffer() | 返回一个基于堆缓冲区的ByteBuf |
directBuffer() | 返回一个基于直接缓冲区的ByteBuf |
wrappedBuffer() | 返回一个包装了(wrap)给定数据的ByteBuf |
copiedBuffer() | 返回一个持有给定数据的一个拷贝的ByteBuf |
Unpooled
类使在非网络编程项目中也可以用到ByteBuf。
ByteBufUtil类
ByteBufUtil提供了用于操纵ByteBuf的静态帮助方法。因为这API是通用的,与池无关,因此这些方法已经在分配内存的类之外实现了。
这些静态方法中最重要的可能是hexdump()
了,它能以十六进制的形式打印一个ByteBuf实例的内容。这在日志记录中十分有用;另一个有用的方法是equals(ByteBuf,ByteBuf)
方法,它返回这两个ByteBuf是否相等(以一定的规则判断,比如可读字节数大小、a[aStartIndex : aStartIndex length] == b[bStartIndex : bStartIndex length]
其中 a[i:k]表示a[i]、a[i 1]、a[i 2]…a[k-1])。
引用计数
引用计数是一种优化内存使用和性能的技术,当对象不再被引用时,释放对象所持有的资源。Netty从4.0版本开始为ByteBuf和ByteBufHolder引入了引用计数,它们都实现了ReferenceCounted
接口。
引用计数背后的原理不是很复杂,主要是跟踪有多少个活跃引用指向了某个对象。只要某个对象的引用计数大于0,可以保证这个对象不会被释放。当某个对象的引用计数变成0时,这个对象将会被释放。
引用计数对于池技术的实现至关重要,比如PooledByteBufAllocator
。
在接下来的两段代码中,将会看到相关例子:
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
ByteBuf buffer = allocator.directBuffer();
assert buffer.refCnt() == 1;//检测buffer的引用计数是否为1
代码语言:javascript复制ByteBuf buffer = ...;
//减少这个对象的引用计数,如果减少到0,这个对象将会被释放
//,并且这个方法返回true。
boolean released = buffer.release();
试图访问被释放的引用计数对象将会导致一个IllegalReferenceCountException。 注意,一个特定的类可以自己定义它减少引用计数的方式,比如可以一次性将引用计数设为0。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/192660.html原文链接:https://javaforall.cn