02.Netty与NIO之前世今生

2022-05-10 21:05:05 浏览数 (1)

¶ JavaNIO三件套

在 NIO 中有几个核心对象需要掌握:缓冲区(Buffer)、选择器(Selector)、通道(Channel)

¶ 缓冲区Buffer

¶ Buffer操作基本API

缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组,在 NIO 库中,所有数据都是用缓冲区处理的。在读 取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据,都 是将它放到缓冲区中。而在面向流 I/O 系统中,所有数据都是直接写入或者直接将数据读取到 Stream 对象中。 在 NIO 中,所有的缓冲区类型都继承于抽象类 Buffer,最常用的就是 ByteBuffer,对于 Java 中的基本类型,基本都有 一个具体 Buffer 类型与之相对应,它们之间的继承关系如下图所示:

代码语言:javascript复制
public class IntBufferDemo {
    public static void main(String[] args) {  
        // 分配新的int缓冲区,参数为缓冲区容量
        // 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。它将具有一个底层实现数组,其数组偏移量将为零。  
        IntBuffer buffer = IntBuffer.allocate(8);
  
        for (int i = 0; i < buffer.capacity();   i) {  
            int j = 2 * (i   1);  
            // 将给定整数写入此缓冲区的当前位置,当前位置递增  
            buffer.put(j);  
        }
        // 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0  
        buffer.flip();
        // 查看在当前位置和限制位置之间是否有元素  
        while (buffer.hasRemaining()) {  
            // 读取此缓冲区当前位置的整数,然后当前位置递增  
            int j = buffer.get();  
            System.out.print(j   "  ");  
        }
    }  
}
¶buffer-的基本的原理

我们说缓冲区对象本质上是一个数组,但它其实是一个特殊的数组,缓冲区对象内置了一些机制, 能够跟踪和记录缓冲区的状态变化情况,如果我们使用 get()方法从缓冲区获取数据或者使用 put()方法把数据写入缓冲 区,都会引起缓冲区状态的变化。

在缓冲区中,最重要的属性有下面三个,它们一起合作完成对缓冲区内部状态的变化跟踪:

position:指定下一个将要被写入或者读取的元素索引,它的值由 get()/put()方法自动更新,在新创建一个 Buffer 对象 时,position 被初始化为 0。

limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。

capacity:指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我 们使用的底层数组的容量。

以上三个属性值之间有一些相对大小的关系:0 <= position <= limit <= capacity。如果我们创建一个新的容量大小为 10 的 ByteBuffer 对象,在初始化的时候,position 设置为 0,limit 和 capacity 被设置为 10,在以后使用 ByteBuffer 对象过程中,capacity 的值不会再发生变化,而其它两个个将会随着使用而变化。

代码语言:javascript复制
public class BufferDemo {
    //put/get

    public static void main(String args[]) throws Exception {  
        //这用用的是文件IO处理 test.txt 里面的内容为Tom.
        FileInputStream fin = new FileInputStream("D://test.txt");
        //创建文件的操作管道
        FileChannel fc = fin.getChannel();  
  
        //分配一个10个大小缓冲区,说白了就是分配一个10个大小的byte数组
        ByteBuffer buffer = ByteBuffer.allocate(10);  
        output("初始化", buffer);  
        
        //先读一下 读取了四个内容 position 移动到了4
        fc.read(buffer);  
        output("调用read()", buffer);  
  
        //准备操作之前,先锁定操作范围 切换读写模式。将 position 移动到0 切换成了读模式。所以position 要从0开始读
        buffer.flip();  
        output("调用flip()", buffer);  
  
        //判断有没有可读数据 使用读的模式读取内容
        while (buffer.remaining() > 0) {  
            // 每读取一次数据。 position 移动一位元素
            byte b = buffer.get();  
            // System.out.print(((char)b));  
        }  
        output("调用get()", buffer);  
  
        //可以理解为解锁 解锁之后恢复position位置
        buffer.clear();  
        output("调用clear()", buffer);  
  
        //最后把管道关闭
        fin.close();  
    }  

    //把这个缓冲里面实时状态给答应出来
    public static void output(String step, ByteBuffer buffer) {
        System.out.println(step   " : "); 
        //容量,数组大小
        System.out.print("capacity: "   buffer.capacity()   ", ");
        //当前操作数据所在的位置,也可以叫做游标
        System.out.print("position: "   buffer.position()   ", ");
        //锁定值,flip,数据操作范围索引只能在position - limit 之间
        System.out.println("limit: "   buffer.limit());
        System.out.println();
    }
}

运行结果如下:

运行结果我们已经看到,下面呢对以上结果进行图解,四个属性值分别如图所示:

我们可以从通道中读取一些数据到缓冲区中,注意从通道读取数据,相当于往缓冲区中写入数据。如果读取 4 个自己 的数据,则此时 position 的值为 4,即下一个将要被写入的字节索引为 4,而 limit 仍然是 10,如下图所示:

下一步把读取的数据写入到输出通道中,相当于从缓冲区中读取数据,在此之前,必须调用 flip()方法,该方法将会完 成两件事情:

  1. 把 limit 设置为当前的 position 值
  2. 把 position 设置为 0

由于 position 被设置为 0,所以可以保证在下一步输出时读取到的是缓冲区中的第一个字节,而 limit 被设置为当前的 position,可以保证读取的数据正好是之前写入到缓冲区中的数据,如下图所示:

现在调用 get()方法从缓冲区中读取数据写入到输出通道,这会导致 position 的增加而 limit 保持不变,但 position 不 会超过 limit 的值,所以在读取我们之前写入到缓冲区中的 4 个自己之后,position 和 limit 的值都为 4,如下图所示:

在从缓冲区中读取数据完毕后,limit 的值仍然保持在我们调用 flip()方法时的值,调用 clear()方法能够把所有的状态变 化设置为初始化时的值,如下图所示:

¶3缓冲区的分配

在创建一个缓冲区对象时,会调用静态方法 allocate()来指定缓冲区的容量,其实调用 allocate()相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。或者我们也可以直接将一个现有的数组,包装为缓冲区对 象,如下示例代码所示:

代码语言:javascript复制
public class BufferWrap {  
    
    public void myMethod() {  
        // 分配指定大小的缓冲区  
        ByteBuffer buffer1 = ByteBuffer.allocate(10);  
          
        // 包装一个现有的数组  
        byte array[] = new byte[10];  
        ByteBuffer buffer2 = ByteBuffer.wrap( array );
    } 
}
¶4缓冲区分片

在 NIO 中,除了可以分配或者包装一个缓冲区对象外,还可以根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切 出一片来作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的,也就是说,子缓冲区相当于是 现有缓冲区的一个视图窗口。调用 slice()方法可以创建一个子缓冲区,让我们通过例子来看一下:

代码语言:javascript复制
/**
 * 缓冲区分片
 */
public class BufferSlice {  
    static public void main( String args[] ) throws Exception {  
        ByteBuffer buffer = ByteBuffer.allocate( 10 );  
          
        // 缓冲区中的数据0-9  
        for (int i=0; i<buffer.capacity();   i) {  
            buffer.put( (byte)i );  
        }  
          
        // 创建子缓冲区  
        buffer.position( 3 );  
        buffer.limit( 7 );  
        // 这样子得到的数据位 3 - 7 的位置的数据新建一个缓冲区
        ByteBuffer slice = buffer.slice();  
          
        // 改变子缓冲区的内容  
        for (int i=0; i<slice.capacity();   i) {  
            byte b = slice.get( i );  
            b *= 10;  
            slice.put( i, b );  
        }  
          
        buffer.position( 0 );  
        buffer.limit( buffer.capacity() );  
          
        while (buffer.remaining()>0) {  
            System.out.println( buffer.get() );  
        }  
    }  
}

在该示例中,分配了一个容量大小为 10 的缓冲区,并在其中放入了数据 0-9,而在该缓冲区基础之上又创建了一个子缓冲区,并 改变子缓冲区中的内容,从最后输出的结果来看,只有子缓冲区“可见的”那部分数据发生了变化,并且说明子缓冲区与原缓冲区是 数据共享的,输出结果如下所示:

¶5只读缓冲区

只读缓冲区非常简单,可以读取它们,但是不能向它们写入数据。可以通过调用缓冲区的 asReadOnlyBuffer()方法,将任何常规缓 冲区转 换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原 缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化:

代码语言:javascript复制
public class ReadOnlyBuffer {  
    static public void main( String args[] ) throws Exception {  
        ByteBuffer buffer = ByteBuffer.allocate( 10 );  
        
        // 缓冲区中的数据0-9  
        for (int i=0; i<buffer.capacity();   i) {  
            buffer.put( (byte)i );  
        }  
    
        // 创建只读缓冲区  
        ByteBuffer readonly = buffer.asReadOnlyBuffer();  
        
        // 改变原缓冲区的内容  
        for (int i=0; i<buffer.capacity();   i) {  
            byte b = buffer.get( i );  
            b *= 10;  
            buffer.put( i, b );  
        }  
        
        readonly.position(0);  
        readonly.limit(buffer.capacity());  
        
        // 只读缓冲区的内容也随之改变  
        while (readonly.remaining()>0) {  
            System.out.println( readonly.get());  
        }
    }
}

如果尝试修改只读缓冲区的内容,则会报 ReadOnlyBufferException 异常。只读缓冲区对于保护数据很有用。在将缓冲区传递给某 个 对象的方法时,无法知道这个方法是否会修改缓冲区中的数据。创建一个只读的缓冲区可以保证该缓冲区不会被修改。只可以 把常规缓冲区转换为只读缓冲区,而不能将只读的缓冲区转换为可写的缓冲区。

¶6.直接缓冲区

直接缓冲区是为加快 I/O 速度,使用一种特殊方式为其分配内存的缓冲区,JDK 文档中的描述为:给定一个直接字节缓冲区,Java 虚拟机将尽最大努力直接对它执行本机 I/O 操作。也就是说,它会在每一次调用底层操作系统的本机 I/O 操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。要分配直接缓冲区,需要调用 allocateDirect() 方法,而不是 allocate()方法,使用方式与普通缓冲区并无区别,如下面的拷贝文件示例:

代码语言:javascript复制
/**
 * 直接缓冲区
 * Zero Copy 减少了一个拷贝的过程
  */
public class DirectBuffer {  
    static public void main( String args[] ) throws Exception {  

        //在Java里面存的只是缓冲区的引用地址
        //管理效率

        //首先我们从磁盘上读取刚才我们写出的文件内容
        String infile = "D://test.txt";
        FileInputStream fin = new FileInputStream( infile );  
        FileChannel fcin = fin.getChannel();

        //把刚刚读取的内容写入到一个新的文件中
        String outfile = String.format("D://testcopy.txt");
        FileOutputStream fout = new FileOutputStream(outfile);
        FileChannel fcout = fout.getChannel();  
          
        // 使用allocateDirect,而不是allocate
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);  
          
        while (true) {  
            buffer.clear();  
              
            int r = fcin.read(buffer);  
              
            if (r==-1) {  
                break;  
            }  
              
            buffer.flip();  
              
            fcout.write(buffer);  
        }
    }  
}
¶7.内存映射

内存映射是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的 I/O 快的多。内存映射文件 I/O 是通过使文件中的 数据出现为 内存数组的内容来完成的,这其初听起来似乎不过就是将整个文件读到内存中,但是事实上并不是这样。一般来说, 只有文件中实际读取或者写入的部分才会映射到内存中。如下面的示例代码:

代码语言:javascript复制
public class MappedBuffer {  
    static private final int start = 0;
    static private final int size = 26;
      
    static public void main( String args[] ) throws Exception {  
        RandomAccessFile raf = new RandomAccessFile( "D://test.txt", "rw" );
        FileChannel fc = raf.getChannel();
        
        //把缓冲区跟文件系统进行一个映射关联
        //只要操作缓冲区里面的内容,文件内容也会跟着改变
        MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE,start, size );
          
        mbb.put( 0, (byte)97 );  //a
        mbb.put( 25, (byte)122 );   //z

        raf.close();  
    }  
}

¶212-选择器-selector

传统的 Server/Client 模式会基于 TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理 一个客户请求。这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销。大多数的实现为了避免这个问题, 都采用了线程池模型,并设置线程池线程的最大数量,这又带来了新的问题,如果线程池中有 200 个线程,而有 200 个用户都在 进行大文件下载,会导致第201个用户的请求无法及时处理,即便第201个用户只想请求一个几KB大小的页面。传统的 Server/Client 模式如下图所示:

NIO 中非阻塞 I/O 采用了基于 Reactor 模式的工作方式,I/O 调用不会被阻塞,相反是注册感兴趣的特定 I/O 事件,如可读数据到 达,新的套接字连接等等,在发生特定事件时,系统再通知我们。NIO 中实现非阻塞 I/O 的核心对象就是 Selector,Selector 就是 注册各种 I/O 事件地方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:

从图中可以看出,当有读或写等任何注册的事件发生时,可以从 Selector 中获得相应的 SelectionKey,同时从 SelectionKey 中可 以找到发生的事件和该事件所发生的具体的 SelectableChannel,以获得客户端发送过来的数据。

使用 NIO 中非阻塞 I/O 编写服务器处理程序,大体上可以分为下面三个步骤:

1.向 Selector 对象注册感兴趣的事件。

2.从 Selector 中获取感兴趣的事件。

3.根据不同的事件进行相应的处理。

代码语言:javascript复制
/*
 * 注册事件
 */
private Selector getSelector() throws IOException {
    // 创建 Selector 对象
    Selector sel = Selector.open();
    // 创建可选择通道,并配置为非阻塞模式
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    // 绑定通道到指定端口
    ServerSocket socket = server.socket();
    InetSocketAddress address = new InetSocketAddress(port);
    socket.bind(address);
    // 向 Selector 中注册感兴趣的事件
    server.register(sel, SelectionKey.OP_ACCEPT);
    return sel;
}

创建了 ServerSocketChannel 对象,并调用 configureBlocking()方法,配置为非阻塞模式,接下来的三行代码把该通道绑定到指定 端口,最后向 Selector 中注册事件,此处指定的是参数是 OP_ACCEPT,即指定我们想要监听 accept 事件,也就是新的连接发 生 时所产生的事件,对于 ServerSocketChannel 通道来说,我们唯一可以指定的参数就是 OP_ACCEPT。

从 Selector 中获取感兴趣的事件,即开始监听,进入内部循环:

代码语言:javascript复制
public void listen(){
    System.out.println("listen on "   this.port   ".");
    try {
        //轮询主线程

        while (true){
            //大堂经理再叫号
            selector.select();
            //每次都拿到所有的号子
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            //不断地迭代,就叫轮询
            //同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                //每一个key代表一种状态
                //没一个号对应一个业务
                //数据就绪、数据可读、数据可写 等等等等
                process(key);
            }
            
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

在非阻塞 I/O 中,内部循环模式基本都是遵循这种方式。首先调用 select()方法,该方法会阻塞,直到至少有一个事件发生,然后 再使用 selectedKeys()方法获取发生事件的 SelectionKey,再使用迭代器进行循环。

最后一步就是根据不同的事件,编写相应的处理代码:

代码语言:javascript复制
private void process(SelectionKey key) throws IOException {
    //针对于每一种状态给一个反应
    if(key.isAcceptable()){
        ServerSocketChannel server = (ServerSocketChannel)key.channel();
        //这个方法体现非阻塞,不管你数据有没有准备好
        //你给我一个状态和反馈
        SocketChannel channel = server.accept();
        //一定一定要记得设置为非阻塞
        channel.configureBlocking(false);
        //当数据准备就绪的时候,将状态改为可读
        key = channel.register(selector,SelectionKey.OP_READ);
    }
    else if(key.isReadable()){
        //key.channel 从多路复用器中拿到客户端的引用
        SocketChannel channel = (SocketChannel)key.channel();
        int len = channel.read(buffer);
        if(len > 0){
            buffer.flip();
            String content = new String(buffer.array(),0,len);
            key = channel.register(selector,SelectionKey.OP_WRITE);
            //在key上携带一个附件,一会再写出去
            key.attach(content);
            System.out.println("读取内容:"   content);
        }
    }
    else if(key.isWritable()){
        SocketChannel channel = (SocketChannel)key.channel();

        String content = (String)key.attachment();
        channel.write(ByteBuffer.wrap(("输出:"   content).getBytes()));

        channel.close();
    }
}

此处分别判断是接受请求、读数据还是写事件,分别作不同的处理。在 Java1.4 之前的 I/O 系统中,提供的都是面向流的 I/O 系统,系统一次一个字节地处理数据,一个输入流产生一个字节的数据,一个输出流消费一个字节的数据,面向流的 I/O 速度非常慢,而在 Java 1.4 中推出了 NIO,这是一个面向块的 I/O 系统,系统以块的方式处理处理,每一个操作在 一步中产生或者消费一个数据库,按块处理要比按字节处理数据快的多。

¶213-通道-channel

通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过 Buffer 对象来处理。我们永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通 道读入缓冲区,再从缓冲区获取这个字节。

在 NIO 中,提供了多种通道对象,而所有的通道对象都实现了 Channel 接口。它们之间的继承关系如下图所示:

¶1.使用-nio-读取数

任何时候读取数据,都不是直接从通道读取,而是从通道读取到缓冲区。所以使用 NIO 读取数据可 以分为下面三个步骤:

1.从 FileInputStream 获取 Channel

2.创建 Buffer

3.将数据从 Channel 读取到 Buffer

¶2.使用-nio-写入数据

使用 NIO 写入数据与读取数据的过程类似,同样数据不是直接写入通道,而是写入缓冲区,可以分为下面三个步骤:

1.从 FileInputStream 获取 Channel。

2.创建 Buffer。

3.将数据从 Channel 写入到 Buffer 中。

代码语言:javascript复制
public class FileInputDemo {
    static public void main( String args[] ) throws Exception {  
        FileInputStream fin = new FileInputStream("E://test.txt");
        
        // 获取通道  
        FileChannel fc = fin.getChannel();  
          
        // 创建缓冲区  
        ByteBuffer buffer = ByteBuffer.allocate(1024);  
          
        // 读取数据到缓冲区  
        fc.read(buffer);  
        
        buffer.flip();  
          
        while (buffer.remaining() > 0) {  
            byte b = buffer.get();  
            System.out.print(((char)b));  
        }  
          
        fin.close();
    }  
}
public class FileOutputDemo {
    static private final byte message[] = { 83, 111, 109, 101, 32, 98, 121, 116, 101, 115, 46 };
  
    static public void main( String args[] ) throws Exception {  
        FileOutputStream fout = new FileOutputStream( "E://test.txt" );
          
        FileChannel fc = fout.getChannel();  
          
        ByteBuffer buffer = ByteBuffer.allocate( 1024 );  
          
        for (int i=0; i<message.length;   i) {  
            buffer.put( message[i] );  
        }  
          
        buffer.flip();   
          
        fc.write( buffer );  
          
        fout.close();  
    }  
}
¶3.io-多路复用

目前流行的多路复用 IO 实现主要包括四种:select、poll、epoll、kqueue。

多路复用 IO 技术最适用的是“高并发”场景,所谓高并发是指 1 毫秒内至少同时有上千个连接请求准备好。其他情 况下多路复用 IO 技术发挥不出来它的优势。另一方面,使用 JAVA NIO 进行功能实现,相对于传统的 Socket 套接字 实现要复杂一些,所以实际应用中,需要根据自己的业务需求进行技术选择。

¶nio-源码初探

¶反应堆-reactor

阻塞 I/O 在调用 InputStream.read()方法时是阻塞的,它会一直等到数据到来时(或超时)才会返回;同样,在调用 ServerSocket.accept()方法时,也会一直阻塞到有客户端连接才会 返回,每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞 I/O 的通信模型示意图如下:

如果你细细分析,一定会发现阻塞 I/O 存在一些缺点。根据阻塞 I/O 通信模型,我总结了它的两点缺点:

1.当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些 CPU 时间

  1. 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。在这种情况下非阻塞式 I/O 就有了它的应 用前景。

Java NIO 是在 jdk1.4 开始使用的,它既可以说成“新 I/O”,也可以说成非阻塞式 I/O。下面是 Java NIO 的工作原理:

1.由一个专门的线程来处理所有的 IO 事件,并负责分发。

2.事件驱动机制:事件到的时候触发,而不是同步的去监视事件。

3.线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。

¶netty-与-nio

¶netty-支持的功能与特性

1.框架设计优雅,底层模型随意切换适应不同的网络协议要求。

2.提供很多标准的协议、安全、编码解码的支持。

3.解决了很多 NIO 不易用的问题。

4.社区更为活跃,在很多开源框架中使用,如 Dubbo、RocketMQ、Spark 等

1.底层核心有:Zero-Copy-Capable Buffer,非常易用的灵拷贝 Buffer(这个内容很有意思,稍后专门来说);统一的 API;标准可扩展的时间模型

2.传输方面的支持有:管道通信(具体不知道干啥的,还请老司机指教);Http 隧道;TCP 与 UDP

3.协议方面的支持有:基于原始文本和二进制的协议;解压缩;大文件传输;流媒体传输;protobuf 编解码;安全认 证;http 和 websocket

¶netty-采用-nio-而非-aio-的理由

1.Netty 不看重 Windows 上的使用,在 Linux 系统上,AIO 的底层实现仍使用 EPOLL,没有很好实现 AIO,因此在性 能上没有明显的优势,而且被 JDK 封装了一层不容易深度优化

2.Netty 整体架构是 reactor 模型, 而 AIO 是 proactor 模型, 混合在一起会非常混乱,把 AIO 也改造成 reactor 模型看起 来是把 epoll 绕个弯又绕回来

3.AIO还有个缺点是接收数据需要预先分配缓存, 而不是NIO那种需要接收时才需要分配缓存, 所以对连接数量非常大 但流量小的情况, 内存浪费很多

4.Linux 上 AIO 不够成熟,处理回调结果速度跟不到处理需求,比如外卖员太少,顾客太多,供不应求,造成处理速度 有瓶颈(待验证)

基于BIO实现tomcat 基于Netty实现tomcat https://github.com/moremind/simple-tomcat

0 人点赞