庖丁解牛:NIO核心概念与机制详解 04 _ 分散和聚集

2023-11-21 12:54:02 浏览数 (1)


Pre

庖丁解牛:NIO核心概念与机制详解 01

庖丁解牛:NIO核心概念与机制详解 02 _ 缓冲区的细节实现

庖丁解牛:NIO核心概念与机制详解 03 _ 缓冲区分配、包装和分片


概述

分散/聚集 I/O 是使用多个而不是单个缓冲区来保存数据的读写方法。

  • 一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中
  • 同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据

分散/聚集 I/O 对于将数据流划分为单独的部分很有用,这有助于实现复杂的数据格式。


分散/聚集 I/O

通道可以有选择地实现两个新的接口: ScatteringByteChannelGatheringByteChannel

一个 ScatteringByteChannel 是一个具有两个附加读方法的通道:

代码语言:javascript复制
long read( ByteBuffer[] dsts );
long read( ByteBuffer[] dsts, int offset, int length );

这些 long read() 方法很像标准的 read 方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。

在 分散读取 中,通道依次填充每个缓冲区。填满一个缓冲区后,它就开始填充下一个。在某种意义上,缓冲区数组就像一个大缓冲区。


分散/聚集的应用

分散/聚集 I/O 对于将数据划分为几个部分很有用。

例如,

  • 可能在编写一个使用消息对象的网络应用程序,每一个消息被划分为固定长度的头部和固定长度的正文。
  • 可以创建一个刚好可以容纳头部的缓冲区和另一个刚好可以容难正文的缓冲区。当你将它们放入一个数组中并使用分散读取来向它们读入消息时,头部和正文将整齐地划分到这两个缓冲区中。

我们从缓冲区所得到的方便性对于缓冲区数组同样有效。因为每一个缓冲区都跟踪自己还可以接受多少数据,所以分散读取会自动找到有空间接受数据的第一个缓冲区。在这个缓冲区填满后,它就会移动到下一个缓冲区。


聚集写入

聚集写入 类似于分散读取,只不过是用来写入。它也有接受缓冲区数组的方法:

代码语言:javascript复制
long write( ByteBuffer[] srcs );
long write( ByteBuffer[] srcs, int offset, int length );

聚集写对于把一组单独的缓冲区中组成单个数据流很有用。为了与上面的消息例子保持一致,我们可以使用聚集写入来自动将网络消息的各个部分组装为单个数据流,以便跨越网络传输消息。


Code

代码语言:javascript复制
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;

public class UseScatterGather
{
  static private final int firstHeaderLength = 2;
  static private final int secondHeaderLength = 4;
  static private final int bodyLength = 6;

  static public void main( String args[] ) throws Exception {
    if (args.length!=1) {
      System.err.println( "Usage: java UseScatterGather port" );
      System.exit( 1 );
    }

    int port = Integer.parseInt( args[0] );

    ServerSocketChannel ssc = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress( port );
    ssc.socket().bind( address );

    int messageLength =  firstHeaderLength   secondHeaderLength   bodyLength;

    ByteBuffer buffers[] = new ByteBuffer[3];
    buffers[0] = ByteBuffer.allocate( firstHeaderLength );
    buffers[1] = ByteBuffer.allocate( secondHeaderLength );
    buffers[2] = ByteBuffer.allocate( bodyLength );

    SocketChannel sc = ssc.accept();

    while (true) {

      // Scatter-read into buffers
      int bytesRead = 0;
      while (bytesRead < messageLength) {
        long r = sc.read( buffers );
        bytesRead  = r;

        System.out.println( "r " r );
        for (int i=0; i<buffers.length;   i) {
          ByteBuffer bb = buffers[i];
          System.out.println( "b " i " " bb.position() " " bb.limit() );
        }
      }

      // Process message here

      // Flip buffers
      for (int i=0; i<buffers.length;   i) {
        ByteBuffer bb = buffers[i];
        bb.flip();
      }

      // Scatter-write back out
      long bytesWritten = 0;
      while (bytesWritten<messageLength) {
        long r = sc.write( buffers );
        bytesWritten  = r;
      }

      // Clear buffers
      for (int i=0; i<buffers.length;   i) {
        ByteBuffer bb = buffers[i];
        bb.clear();
      }

      System.out.println( bytesRead " " bytesWritten " " messageLength );
    }
  }
}

0 人点赞