JavaIO流:NIO梳理

2022-12-02 08:29:11 浏览数 (1)

NIO 也叫 Non-Blocking IO 是同步非阻塞的 IO 模型。线程发起 IO 请求后,立即返回。同步指的是必须等待 IO 缓冲区内的数据就绪,而非阻塞指的是,用户线程不原地等待 IO 缓冲区,可以先做一些其他操作,但是要定时轮询检查 IO 缓冲区数据是否就绪。 ~ 本篇内容包括:Java NIO 介绍、Java NIO 核心组件、NIO 代码示例。


文章目录
  • 一、Java NIO 介绍
  • 二、Java NIO 核心组件
    • 1、Buffer(缓冲区)
    • 2、Channel(通道)
    • 3、案例:往本地文件中写数据
    • 4、案例:往本地文件中读数据
    • 5、案例:文件拷贝
    • 6、Selector(选择器)
    • 7、SelectionKey
    • 8、ServerSocketChannel
    • 9、SocketChannel

  • 三、NIO 代码示例

一、Java NIO 介绍

NIO 也叫 Non-Blocking IO 是同步非阻塞的 IO 模型。线程发起 IO 请求后,立即返回。同步指的是必须等待 IO 缓冲区内的数据就绪,而非阻塞指的是,用户线程不原地等待 IO 缓冲区,可以先做一些其他操作,但是要定时轮询检查 IO 缓冲区数据是否就绪。

Java 中的 NIO 是 new IO的意思。其实是 NIO 加上 IO 多路复用技术。普通的 NIO 是线程轮询查看一个 IO 缓冲区是否就绪,而 Java 中的 new IO 指的是线程轮询地去查看一堆 IO 缓冲区中哪些就绪,这是一种 IO 多路复用的思想。IO多路复用模型中,将检查 IO 数据是否就绪的任务,交给系统级别的 select 或 epoll 模型,由系统进行监控,减轻用户线程负担。

NIO 与原来的 IO 有同样的作用和目的,但是使用的方式完全不同,NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文件的读写操作。NIO 可以理解为非阻塞 IO,传统的 IO 的 read 和 write 只能阻塞执行,线程在读写 IO 期间不能干其他事情,比如调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 中可以配置 socket 为非阻塞模式。

NIO主要有 buffer、channel、selector 三种技术的整合,通过零拷贝的 buffer 取得数据,每一个客户端通过 channel 在 selector(多路复用器)上进行注册。服务端不断轮询 channel 来获取客户端的信息。channel 上有 connect、accept(阻塞)、read(可读)、write(可写)四种状态标识。根据标识来进行后续操作。所以一个服务端可接收无限多的 channel。不需要新开一个线程。大大提升了性能。

NIO 通信模型图:


二、Java NIO 核心组件

NIO 有三大核心部分:Channel(通道) ,Buffer( 缓冲区),Selector(选择器)

1、Buffer(缓冲区)

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。相比较直接对数组的操作,Buffer API 更加容易操作和管理。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer

在一般的 Java IO 操作中,我们以流式的方式,顺序的从一个 Stream 中读取一个或者多个字节,直至读取所有字节。因为它没有缓存区,所以我们就不能随意改变读取指针的位置。

我们在从 Channel 中读取数据到 Buffer 中,这样 Buffer 中就有了数据后,我们就可以对这些数据进行操作了。并且不同于一般的 Java IO 操作那样是顺序操作,NIO 中我们可以随意的读取任意位置的数据,这样大大增加了处理过程中的灵活性。

在NIO中,Buffer是一个顶层父类,它是一个抽象类,常用的Buffer子类有:

  • ByteBuffer,存储字节数据到缓冲区
  • ShortBuffer,存储字符串数据到缓冲区
  • CharBuffer,存储字符数据到缓冲区
  • IntBuffer,存储整数数据到缓冲区
  • LongBuffer,存储长整型数据到缓冲区
  • DoubleBuffer,存储小数到缓冲区
  • FloatBuffer,存储小数到缓冲区

对于Java中的基本数据类型,都有一个具体 Buffer 类型与之相对应,最常用的自然是 ByteBuffer 类(二进制数据),该类的主要方法如下所示 :

  • public abstract ByteBuffer put(byte[] b);:存储字节数据到缓冲区
  • public abstract byte[] get();:从缓冲区获得字节数据
  • public final byte[] array();:把缓冲区数据转换成字节数组
  • public static ByteBuffer allocate(int capacity);:设置缓冲区的初始容量
  • public static ByteBuffer wrap(byte[] array);:把一个现成的数组放到缓冲区中使用
  • public final Buffer flip();:翻转缓冲区,将缓冲区进行读写切换。
2、Channel(通道)

Java NIO 的通道类似流,但又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的(input 或 output)读写通常是单向的。通道可以非阻塞读取和写入通道,通道可以支持读取或写入缓冲区,也支持异步地读写。

Java IO 的各种流是阻塞的 IO 操作。这就意味着,当一个线程执行读或写 IO 操作时,该线程会被阻塞,直到有一些数据被读取,或者数据完全写入。

Java NIO 可以让我们非阻塞的使用 IO 操作,例如:

  • 当一个线程执行从 Channel 执行读取 IO 操作时,当此时有数据,则读取数据并返回;当此时无数据,则直接返回而不会阻塞当前线程。
  • 当一个线程执行向 Channel 执行写入 IO 操作时,不需要阻塞等待它完全写入,这个线程同时可以做别的事情。

也就是说,线程可以将非阻塞 IO 的空闲时间用于在其他 Channel 上执行 IO 操作。所以,一个单独的线程,可以管理多个 Channel 的读取和写入 IO 操作。

常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。

  • FileChannel 用于文件的数据读写
  • DatagramChannel 用于 UDP 的数据读写
  • ServerSocketChannel 和 SocketChannel 用于TCP的数据读写。

FileChannel类,该类主要用来对本地文件进行IO操作,主要方法如下所示 :

  • public int read(ByteBuffer dst):读取数据并放到缓冲区中
  • public int write(ByteBuffer src):把缓冲区的数据写到通道中
  • public long transferFrom(ReadableByteChannel src,long position,long count):从目标通道中复制数据
  • public long transferTo(long position,long count,WritableByteChannel target):把数据从当前通道复制给目标通道
3、案例:往本地文件中写数据
代码语言:javascript复制
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileWriteTest {

    public static void main(String[] args) throws IOException {
        String str = "HELLO,NIO,我是我";
        // 创建输出流
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/lizhengi/test/iodemo/demo.txt");
        // 从流中得到一个通道
        FileChannel fileChannel = fileOutputStream.getChannel();
        // 提供一个缓冲区
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        // 往缓冲区中存入数据
        allocate.put(str.getBytes());
        // 缓冲区进行读写切换。
        // 当数据写入到缓冲区中时,指针指向数据最后一行,那么缓冲区写入通道中输出时,是从最后一行数据开始写入,
        // 这样就会导致写入1024的剩余没有数据的空缓冲区。所以需要翻转缓冲区,重置位置到初始位置。
        allocate.flip();
        // 把缓冲区写到通道中,通道负责把数据写入到文件中
        fileChannel.write(allocate);
        // 关闭输出流,因为通道是输出流创建的,所以会一起关闭
        fileOutputStream.close();
    }

}
4、案例:往本地文件中读数据
代码语言:javascript复制
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileReadTest {

    public static void main(String[] args) throws IOException {
        File file = new File("/Users/lizhengi/test/iodemo/demo.txt");
        // 1. 创建输入流
        FileInputStream fis = new FileInputStream(file);
        // 2. 得到一个通道
        FileChannel fc = fis.getChannel();
        // 3. 准备一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
        // 4. 从通道里读取数据并存到缓冲区中
        fc.read(buffer);
        System.out.println(new String(buffer.array()));
        // 5.关闭
        fis.close();
    }

}
5、案例:文件拷贝
代码语言:javascript复制
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class FileCopyTest {

    public static void main(String[] args) throws IOException {
        //1. 创建两个流
        FileInputStream fis = new FileInputStream("/Users/lizhengi/test/iodemo/demo.txt");
        FileOutputStream fos = new FileOutputStream("/Users/lizhengi/test/iodemo/temp.txt");
        // 2. 得到两个通道
        FileChannel sourceFc = fis.getChannel();
        FileChannel destFc = fos.getChannel();
        //3. 复制
        destFc.transferFrom(sourceFc,0,sourceFc.size());
        //4.关闭
        fis.close();
        fos.close();
    }
}
6、Selector(选择器)

Selector 是一个 Java NIO 组件,能够检测多个注册的 NIO 通道上是否有事件发生,便获取事件然后针对每个事件进行相应的响应处理。这样就可以只用一个线程去管理多个通道,也就是管理多个连接。这样使得只用在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销。

选择器(Selector)是 NIO 能实现非阻塞的基础

程序切换到哪个 Channel 是由事件决定的,每个 Channel 都会对应一个 Buffer。

Selector 会根据不同的事件,在各个通道上切换,一个线程对应一个 Selector,一个 Selector 对应多个 Channel(连接)。

该类的常用方法如下所示 :

  • public static Selector open():得到一个选择器对象
  • public int select(long timeout):监控所有注册的 Channel,当其中有注册的 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集团中并返回,参数用来设置超时时间
  • public Set selectedKeys():从内部集合中得到所有的 SelectionKey
7、SelectionKey

SelectionKey,代表了 Selector 和 serverSocketChannel 的注册关系,一共四种 :

  • int OP_ACCEPT :有新的网络连接可以 accept,值为 16
  • int OP_CONNECT : 代表连接已经建立,值为 8
  • int OP_READ 和 int OP_WRITE : 代表了读、写操作,值为 1 和 4

该类的常用方法如下所示 :

  • public abstract Selector selector(),得到与之关联的 Selector 对象
  • public abstract SelectorChannel channel(),得到与之关联的通道
  • public final Object attachment(),得到与之关联的共享数据
  • public abstract SelectionKey interestOps(int ops),设置或改变监听事件
  • public final boolean isAcceptable(),是否可以 accept
  • public final boolean isReadable(),是否可以读
  • public final boolean isWritable(),是否可以写
8、ServerSocketChannel

ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示 :

  • public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
  • public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
  • public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
  • public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
  • public final SelectionKey register(Selector sel,int ops),注册一个选择器并设置监听事件
9、SocketChannel

SocketChannel,网络IO通道,具体负责进行读写操作。NIO总是把缓冲区的数据写入通道,或者把通道里的数据读出到缓冲区(buffer)。常用方法如下所示 :

  • public static SocketChannel open(),得到一个SocketChannel通道
  • public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
  • public boolean connect(SocketAddress remote),连接服务器
  • public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成连接操作
  • public int write(ByteBuffer src),往通道里写数据
  • public int read(ByteBuffer dst),从通道里读数据
  • public final SelectionKey register(Selector sel,int ops,Object att),注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
  • public final void close(),关闭通道

三、NIO 代码示例

服务端:

代码语言:javascript复制
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    /**
     * 选择器
     */
    private Selector selector;

    /**
     * 默认服务绑定端口
     */
    private static final int DEFAULT_BIND_PORT = 9000;

    public NIOServer(int port) {
        initServer(port);
    }

    private void initServer(int port) {
        try {
            // 开启一个服务通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 将通道绑定到指定端口
            serverChannel.bind((port < 1 || port > 65535) ?
                    new InetSocketAddress(DEFAULT_BIND_PORT) :
                    new InetSocketAddress(port));
            // 将通道设置为非阻塞模式
            serverChannel.configureBlocking(false);
            // 打开一个 IO 监视器:Selector
            this.selector = Selector.open();
            // 将服务通道注册到 Selector 上,并在服务端通道注册 OP_ACCEPT 事件
            serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        } catch (IOException ioException) {
            ioException.printStackTrace();
            System.out.println("init exception: "   ioException);
        }
    }

    public void startServer() throws InterruptedException {
        while (true) {
            System.out.println("Selector 巡查 IO 事件---------------开始");
            try {
                int ioEventCount = this.selector.select();  // 此处以收集到所有 IO 事件
                System.out.println("Selector 检测到:"   ioEventCount);
            } catch (IOException ioException) {
                ioException.printStackTrace();
                break;
            }
            // 对各个 IO 事件做出对应的响应
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();  //通过调用迭代器的 remove() 方法将这个键 key 从已选择键的集合中删除

                try {
                    // 可接收连接 能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道
                    if (key.isAcceptable()) {
                        System.out.println("监控到 OP_ACCEPT 连接事件");
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();

                        // 接受客户端连接
                        SocketChannel client = server.accept();
                        System.out.println("Accept connection from "   client);
                        client.configureBlocking(false); // 设置客户端通道非阻塞
                        // 为客户端通道注册 OP_WRITE 和 OP_READ 事件
                        SelectionKey clientKey = client.register(selector,
                                SelectionKey.OP_WRITE |
                                        SelectionKey.OP_READ);
                        // 为客户端通道添加一个数据缓存区
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        clientKey.attach(buffer);
                    }
                    // 可读数据
                    if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        int read = client.read(output);
                        System.out.println("Read data from client: "   client);
                        System.out.println("------------MSG : "   output.toString());
                        System.out.println(read);
                    }
                    // 可写数据
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        client.write(output);
                        output.compact();
                        System.out.println("Write data to "   client);
                    }
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }

            }

            Thread.sleep(2000);// 为了观察控制台打印数据
            System.out.println("Selector 巡查 IO 事件---------------完成");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NIOServer nioServer = new NIOServer(DEFAULT_BIND_PORT);
        nioServer.startServer();
    }
}

0 人点赞