学习netty之nio

2022-10-25 16:53:27 浏览数 (3)

文章目录
  • 几个概念
  • BIO(Block l0)与NIO(Non-Block I0)对比
    • 面向流与面向缓冲
    • 阻塞与非阻塞I0
    • 选择器(Selector)
    • NIO和I0如何影响应用程序的设计
      • API调用
      • 数据处理
      • 用来处理数据的线程数
  • Java NIO ( Non-Block I0 )
  • 缓冲区Buffer
    • Buffer操作基本API
    • 深入剖析Buffer
  • 缓中区的分配
    • 缓冲区分片
    • 只读缓冲区
    • 直接缓冲区
    • 内存映射文件l/O
  • 通道Channel
  • 反应堆Reactor
    • Java NIO原理及通信模型
  • 选择器Selector
  • Java AIO ( Asynchronous IO)
    • AIO原理
    • 代码示例

几个概念

  1. 阻塞(Block)和非阻塞(Non-Block):

阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候阻塞: 往往需要等待缓冲区中的数据准备好过后才处理其他的事情,否则一直等待在那里。

非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。

  1. 同步(Synchroni zat ion)和异步(Asynchronous)的方式:

同步和异步都是基于应用程序和操作系统处理I0事件所采用的方式。比如同步:是应用程序要直接参与I0读写的操作。异步:所有的I0读写交给操作系统去处理,应用程序只需要等待通知。

同步方式在处理I0事件的时候,必须阻塞在某个方法上面等待我们的I0事件完成(阻塞I0事件或者通过轮询I0事件的方式),对于异步来说,所有的I0读写都交给了操作系统。这个时候,我们可以去做其他的事情,并不需要去完成真正的I0操作,当操作完成I0后,会给我们的应用程序一个通知。

同步:1)阻塞到I0事件,阻塞到read或则write。这个时候我们就完全不能做自己的事情。让读写方法加入到线程里面,然后阻塞线程来实现,对线程的性能开销比较大。

BIO(Block l0)与NIO(Non-Block I0)对比

下表总结了Java NI0和I0之间的主要差别异。

I0模型

I0

NIO

方式

从硬盘到内存

从内存到硬盘

通信

面向流(乡村公路)

面向缓冲(高速公路,多路复用技术)

处理

阻塞I0(多线程)

非阻塞I0(反应堆Reactor)

触发

选择器(轮询机制)

面向流与面向缓冲

Java NI0和I0之间第一个最大的区别是,I0 是面向流的,NI0是面向缓冲区的。Java I0面向流意味着每次从流中读一个或多个字节,真至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。Java NI0的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

阻塞与非阻塞I0

Java I0的各种流是阻塞的。这意味着,当一个线程调用read()或write()时, 该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。JavaNI0的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写 也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。线程通常将非阻塞I0的空闲时间用于在其它通道,上执行I0操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel) 。

选择器(Selector)

Java NI0的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

NIO和I0如何影响应用程序的设计

无论您选择I0或NI0工具箱,可能会影响您应用程序设计的以下几个方面:

1.对NIO或I0类的API调用。

2.数据处理。

3.用来处理数据的线程数。

API调用

当然,使用NI0的API调用时看起来与使用I0时有所不同,但这并不意外,因为并不是仅从一个InputStream逐字节读取,而是数据必须先读入缓冲区再处理。

数据处理

使用纯粹的NI0设计相较I0设计,数据处理也受到影响。

在I0设计中,我们从InnutStroam成Reader诼字节读取数据。假设你正在处理一基于行的文本数据流,例如:

有如下一段文本:

代码语言:javascript复制
Name: Tom
Age:18
Email: tom@qq.com
Phone: 13888888888

该文本行的流可以这样处理:

代码语言:javascript复制
FileInputStream input = new FileInputStream("f://info.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String nameLine = reader.readLine();
String ageLine = reader.readLine();
String emailLine = reader.readLine();
String phoneLine = reader.readLine();
String lastLine = reader.readLine();

请注意处理状态由程序执行多久决定。换句话说,一旦reader. readLine()方法返回,你就知道肯定文本行就已读完,readline() 阻塞直到整行读完,这就是原因。你也知道此行包含名称;同样,第二个readline()调用返回的时候,你知道这行包含年龄等。正 如你可以看到,该处理程序仅在有新数据读入时运行,并知道每步的数据是什么。一旦正在运行的线程已处理过读入的某些数据,该线程不会再回退数据(大多如此)。下图也说明了这条原则:

(JavaIO:从一个阻塞的流中读数据)而一个NI0的实现会有所不同,下面是一个简单的例子:

代码语言:javascript复制
ByteBuffer buffer = ByteBuffer.allocate(48);

int bytesRead = inChannel. read(buffer);

注意第二行,从通道读取字节到ByteBuffer。当这个方法调用返回时,你不知道你所需的所有数据是否在缓冲区内。你所知道的是,该缓冲区包含一些字节,这使得处理有点困难。假设第一次read(buffer)调用后, 读入缓冲区的数据只有半行,例如,“Name:An”, 你能处理数据吗?显然不能,需要等待,直到整行数据读入缓存,在此之前,对数据的任何处理毫无意义。

所以,你怎么知道是否该缓冲区包含足够的数据可以处理呢?好了,你不知道。发现的方法只能查看缓冲区中的数据。其结果是,在你知道所有数据都在缓冲区里之前,你必须检查几次缓冲区的数据。这不仅效率低下,而且可以使程序设计方案杂乱不堪。例如:

代码语言:javascript复制
ByteBuffer buffer = ByteBuffer .allocate(48);
int bytesRead = inChannel. read(buffer) ;

while(! bufferFull(bytesRead) ) {
	bytesRead = inChannel. read(buffer);
}

bufferFull()方法必须跟踪有多少数据读入缓冲区,并返回真或假,这取决于缓冲区是否已满。换句话说,如果缓冲区准备好被处理,那么表示缓冲区满了。

bufferFull()方法扫描缓冲区,但必须保持在bufferFull ()方法被调用之前状态相同。,如果没有,下一个读入缓冲区的数据可能无法读到正确的位置。这是不可能的,但却是需要注意的又一问题。

如果缓冲区已满,它可以被处理。如果它不满,并且在你的实际案例中有意义,你或许能处理其中的部分数据。但是许多情况下并非如此。下图展示了“缓冲区数据循环就绪”:

用来处理数据的线程数

NI0可让您只使用一个(或几个)单线程管理多个通道(网络连接或文件),但付出的代价是解析数据可能会比从一个阻塞流中读取数据更复杂。

如果需要管理同时打开的成千,上万个连接,这些连接每次只是发送少量的数据,例如聊天服务器,实现NI0的服务器可能是一个优势。同样,如果你需要维持许多打开的连接到其他计算机上,如P2P网络中,使用一个单独的线程来管理你所有出站连接,可能是一个优势。一个线程多个连接的设计方案如

Java NIO:单线程管理多个连接

如果你有少量的连接使用非常高的带宽,一次发送大量的数据,也许典型的IO服务器实现可能非常契合。下图说明了一个典型的IO服务器设计:

Java I0:一个典型的I0服务器设计-一个连接通过一个线程处理.

Java NIO ( Non-Block I0 )

在Java1.4之前的I/0系统中,提供的都是面向流的I/0系统,系统一次一个字节地处理数据,一个输入流产生一个字节的数据,一个输出流消费一个字节的数据,面向流的I/0速度非常慢,而在Java 1.4中推出了NI0,这是一个面向块的I/0系统,系统以块的方式处理处理,每一个操作在一步中产生或者消费一个数据库,按块处理要比按字节处理数据快的多。在NI0中有几个核心对象需要掌握:缓冲区(Buffer)、通道(Channel)、选择器(Selector)。

缓冲区Buffer

Buffer操作基本API

缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组,在NI0库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时, 它也是写入到缓冲区中的;任何时候访问NIO中的数据,都是将它放到缓冲区中。而在面向流I/0系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。

在NIO中,所有的缓冲区类型都继承于抽象类Buffer,最常用的就是ByteBuffer,对于Java中的基本类型,基本都有一个具体Buffer类型与之相对应,它们之间的继承关系如下图所

下面是一个简单的使用IntBuffer的例子:

代码语言:javascript复制
package com.gupaoedu.nio.buffer;

import java.nio.IntBuffer;

public class TestIntBuffer {
   public static void main(String[] args) {  
        // 分配新的int缓冲区,参数为缓冲区容量  
        // 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。它将具有一个底层实现数组,其数组偏移量将为零。  
        
      //分配了8个长度的int数组
      IntBuffer buffer = IntBuffer.allocate(8);  

//    capacity //数组的长度,容量
      
        for (int i = 0; i < buffer.capacity();   i) {  
            int j = (i   1);  
            // 将给定整数写入此缓冲区的当前位置,当前位置递增  
            buffer.put(j);  
        }  
  
        // 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
       //固定缓冲区中的某些值,告诉缓冲区,
        //我要开始操作了,如果你再往缓冲区写数据的话
        //不要再覆盖我固定状态以前的数据了
        buffer.flip();
  
        // 查看在当前位置和限制位置之间是否有元素  
        while (buffer.hasRemaining()) {  
            // 读取此缓冲区当前位置的整数,然后当前位置递增  
            int j = buffer.get();  
            System.out.print(j   "  ");  
        }
   }  
}

深入剖析Buffer

在谈到缓冲区时,我们说缓冲区对象本质上是一个数组,但它其实是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果俄们使用get()方法从缓冲区获取数据或者使用put()方法把数据写入缓冲区,都会引起缓冲区状态的变化。

在缓冲区中,最重要的属性有下面三个,它们一起合作完成对缓冲区内部状态的变化跟踪:

position:指定了下一个将要被写入或者读取的元素索引,它的值由get()/put()方法自动更新,在新创建一个Buffer对象时,position 被初始化为0。

limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。.

capacity:指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量。

以上三个属性值之间有一些相对大小的关系: 0 <= position <= limit <= capacity。如果我们创建一个新的容量大小为l0的ByteBuffer对象,在初始化的时候,position设置为0,1imit和capacity被设置为10,在以后使用ByteBuffer对象过程中,capacity的值不会再发生变化,而其它两个个将会随着使用而变化。四个属性值分别如图所示:

现在我们可以从通道中读取一些数据到缓冲区中,注意从通道读取数据,相当于往缓冲区中写入数据。如果读取4个自己的数据,则此时position的值为4,即下一个将要被写入的字节索引为4,而limit仍然是10,如下图所示:

下一步把读取的数据写入到输出通道中,相当于从缓冲区中读取数据,在此之前,必须调用flip()方法,该方法将会完成两件事情:

1.把limit设置为当前的position值

2.把position设置为0

由于position被设置为0,所以可以保证在下一步输出时读取到的是缓冲区中的第一个字节,而limit被设置为当前的position,可以保证读取的数据正好是之前写入到缓冲区中的数据,如下图所示:

现在调用get()方法从缓冲区中读取数据写入到输出通道,这会导致position的增加而limit保持不变,但position不会超过limit的值,所以在读取我们之前写入到缓冲区中的4个自己之后,position和limit的值都为4,如下图所示:

在从缓冲区中读取数据完毕后,limit 的值仍然保持在我们调用flip()方法时的值,调用clear()方法能够把所有的状态变化设置为初始化时的值,如下图所示:

最后我们用一段代码来验证这个过程,如下所示:

代码语言:javascript复制
package com.gupaoedu.nio.buffer;

import java.io.FileInputStream;
import java.nio.*;  
import java.nio.channels.*;  
  
public class BufferProgram {  
    public static void main(String args[]) throws Exception {  
       
       //这用用的是文件IO处理
        FileInputStream fin = new FileInputStream("E:/GP_WORKSPACE/test.txt");
        //创建文件的操作管道
        FileChannel fc = fin.getChannel();
        
        //分配一个10个大小缓冲区,说白了就是分配一个10个大小的byte数组
        ByteBuffer buffer = ByteBuffer.allocate(10);  
        output("初始化", buffer);
        
        //先读一下
        fc.read(buffer);  
        output("调用read()", buffer);  
  
        //准备操作之前,先锁定操作范围
        buffer.flip();  
        output("调用flip()", buffer);  
  
        //判断有没有可读数据
        while (buffer.remaining() > 0) {  
            byte b = buffer.get();  
            // System.out.print(((char)b));  
        }  
        output("调用get()", buffer);  
  
        //可以理解为解锁
        buffer.clear();  
        output("调用clear()", buffer);  
  
        //最后把管道关闭
        fin.close();  
    }  
  
    
    //把这个缓冲里面实时状态给答应出来
    public static void output(String step, Buffer buffer) {  
        System.out.println(step   " : "); 
        //容量,数组大小
        System.out.print("capacity: "   buffer.capacity()   ", ");
        //当前操作数据所在的位置,也可以叫做游标
        System.out.print("position: "   buffer.position()   ", ");
        //锁定值,flip,数据操作范围索引只能在position - limit 之间
        System.out.println("limit: "   buffer.limit());  
        System.out.println();  
    }  
}  

缓中区的分配

在前面的几个例子中,我们已经看过了,在创建一个缓冲区对象时,会调用静态方法allocate()来指定缓冲区的容量,其实调用allocate()相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。或者我们也可以直接将一个现有的数组,包装为缓冲区对象,如下示例代码所示:

代码语言:javascript复制
import java.nio.ByteBuffer;

public class BufferWrap {  
    
    public void myMethod() {  
        // 分配指定大小的缓冲区  
        ByteBuffer buffer1 = ByteBuffer.allocate(10);  
          
        // 包装一个现有的数组  
        byte array[] = new byte[10];  
        ByteBuffer buffer2 = ByteBuffer.wrap( array );
    } 
}

缓冲区分片

在NIO中,除了可以分配或者包装一个缓冲区对象外,还可以根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切出一片来作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层.数组层面上是数据共享的,也就是说,子缓冲区相当于是现有缓冲区的一个视图窗口。调用slice()方法可以创建一个子缓冲区,让我们通过例子来看一下:

代码语言:javascript复制
package com.gupaoedu.nio.buffer;

import java.nio.ByteBuffer;

public class BufferSlice {  
    static public void main( String args[] ) throws Exception {  
        ByteBuffer buffer = ByteBuffer.allocate( 10 );  
          
        // 缓冲区中的数据0-9  
        for (int i=0; i<buffer.capacity();   i) {  
            buffer.put( (byte)i );  
        }  
          
        // 创建子缓冲区  
        buffer.position( 3 );  
        buffer.limit( 7 );  
        ByteBuffer slice = buffer.slice();  
          
        // 改变子缓冲区的内容  
        for (int i=0; i<slice.capacity();   i) {  
            byte b = slice.get( i );  
            b *= 10;  
            slice.put( i, b );  
        }  
          
        buffer.position( 0 );  
        buffer.limit( buffer.capacity() );  
          
        while (buffer.remaining()>0) {  
            System.out.println( buffer.get() );  
        }  
    }  
}  

在该示例中,分配了一个容量大小为10的缓冲区,并在其中放入了数据0-9,而在该缓冲区基础之上又创建了一个子缓冲区,并改变子缓冲区中的内容,从最后输出的结果来看,只有子缓冲区“可见的”那部分数据发生了变化,并且说明子缓冲区与原缓冲区是数据共享的,输出结果如下所示:

只读缓冲区

只读缓冲区非常简单,可以读取它们,但是不能向它们写入数据。可以通过调用缓冲区的asRead0nlyBuffer()方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化:

代码语言:javascript复制
import java.nio.*;  

public class ReadOnlyBuffer {  
   static public void main( String args[] ) throws Exception {  
      ByteBuffer buffer = ByteBuffer.allocate( 10 );  
       
      // 缓冲区中的数据0-9  
      for (int i=0; i<buffer.capacity();   i) {  
         buffer.put( (byte)i );  
      }  
   
      // 创建只读缓冲区  
      ByteBuffer readonly = buffer.asReadOnlyBuffer();  
      
      // 改变原缓冲区的内容  
      for (int i=0; i<buffer.capacity();   i) {  
         byte b = buffer.get( i );  
         b *= 10;  
         buffer.put( i, b );  
      }  
       
      readonly.position(0);  
      readonly.limit(buffer.capacity());  
       
      // 只读缓冲区的内容也随之改变  
      while (readonly.remaining()>0) {  
         System.out.println( readonly.get());  
      }
   }
}

直接缓冲区

直接缓冲区是为加快I/0速度,使用一种特殊方式为其分配内存的缓冲区,JDK文档中的描述为:给定一个直接字节缓冲区,Java 虚拟机将尽最大努力直接对它执行本机I/0操作。也就是说,它会在每一次调用底层操作系统的本机I/0操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。要分配直接缓冲区,需要调用allocateDirect()方法,而不是allocate()方法,使用方式与普通缓冲区并无区别,如下面的拷贝文件示例:

代码语言:javascript复制
import java.io.*;  
import java.nio.*;  
import java.nio.channels.*;  
  
public class DirectBuffer {  
    static public void main( String args[] ) throws Exception {  
       
       //首先我们从磁盘上读取刚才我们写出的文件内容
        String infile = "E:/GP_WORKSPACE/test.txt";  
        FileInputStream fin = new FileInputStream( infile );  
        FileChannel fcin = fin.getChannel();  
          
        //把刚刚读取的内容写入到一个新的文件中
        String outfile = String.format("E:/GP_WORKSPACE/testcopy.txt");  
        FileOutputStream fout = new FileOutputStream( outfile );      
        FileChannel fcout = fout.getChannel();  
          
        // 使用allocateDirect,而不是allocate
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);  
          
        while (true) {  
            buffer.clear();  
              
            int r = fcin.read(buffer);  
              
            if (r==-1) {  
                break;  
            }  
              
            buffer.flip();  
              
            fcout.write(buffer);  
        }
   }  
}  

内存映射文件l/O

内存映射文件I/0是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的I/0快的多。内存映射文件I/0是通过使文件中的数据出现为内存数组的内容来完成的,这其初听起来似乎不过就是将整个文件读到内存中,但是事实上并不是这样。一般来说,只有文件中实际读取或者写入的部*分才会映射到内存中。如下面的示例代码:

代码语言:javascript复制
import java.io.*;  
import java.nio.*;  
import java.nio.channels.*;  
  
public class MappedBuffer {  
    static private final int start = 0;
    static private final int size = 1024;  
      
    static public void main( String args[] ) throws Exception {  
        RandomAccessFile raf = new RandomAccessFile( "e:\test.txt", "rw" );  
        FileChannel fc = raf.getChannel();  
        
        //把缓冲区跟文件系统进行一个映射关联
        //只要操作缓冲区里面的内容,文件内容也会跟着改变
        MappedByteBuffer mbb = fc.map( FileChannel.MapMode.READ_WRITE,start, size );  
          
        mbb.put( 0, (byte)97 );  
        mbb.put( 1023, (byte)122 );  
        
        
        raf.close();  
    }  
}

通道Channel

通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过Buffer对象来处理。.我们永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。在NI0中,提供了多种通道对象,而所有的通道对象都实现了Channel 接口。它们之间的继承关系如下图所示:

使用NI0读取数据

在前面我们说过,任何时候读取数据,都不是直接从通道读取,而是从通道读取到缓冲区。所以使用NI0读取数据可以分为下面三个步骤:

1.从FileInputStream获取Channel

2.创建Buffer

3.将数据从Channel读取到Buffer中

下面是一个简单的使用NI0从文件中读取数据的例子:

代码语言:javascript复制
import java.io.*;  
import java.nio.*;  
import java.nio.channels.*;  
  
public class FileInputProgram {  
    static public void main( String args[] ) throws Exception {  
        FileInputStream fin = new FileInputStream("c:\test.txt");  
        
        // 获取通道  
        FileChannel fc = fin.getChannel();  
          
        // 创建缓冲区  
        ByteBuffer buffer = ByteBuffer.allocate(1024);  
          
        // 读取数据到缓冲区  
        fc.read(buffer);  
        
        buffer.flip();  
          
        while (buffer.remaining() > 0) {  
            byte b = buffer.get();  
            System.out.print(((char)b));  
        }  
          
        fin.close();
    }  
}

使用NI0写入数据

使用NI0写入数据与读取数据的过程类似,同样数据不是直接写入通道,而是写入缓冲区,可以分为下面三个步骤:

1.从FileInputStream获取Channel

2.创建Buffer

3.将数据从Channel写入到Buffer中

下面是一个简单的使用NI0向文件中写入数据的例子:

代码语言:javascript复制
import java.io.*;  
import java.nio.*;  
import java.nio.channels.*;  
  
public class FileOutputProgram {  
    static private final byte message[] = { 83, 111, 109, 101, 32,  
        98, 121, 116, 101, 115, 46 };  
  
    static public void main( String args[] ) throws Exception {  
        FileOutputStream fout = new FileOutputStream( "e:\test.txt" );  
          
        FileChannel fc = fout.getChannel();  
          
        ByteBuffer buffer = ByteBuffer.allocate( 1024 );  
          
        for (int i=0; i<message.length;   i) {  
            buffer.put( message[i] );  
        }  
          
        buffer.flip();   
          
        fc.write( buffer );  
          
        fout.close();  
    }  
}

反应堆Reactor

假如现在你对阻塞I/0已有了一定了解,我们知道阻塞I/0在调用InputStream. read ()方法时是阻塞的,它会一直等到数据到来时(或超时)才会返回;同样,在调用ServerSocket. accept()方法时,也会一直阻塞到有客户端连接才会返回,每个客户端连接过来后,服务端都会启动一个线程去处理该客Q户端的请求。阻塞I/0的通信模型示意图如下:

如果你细细分析,一定会发现阻塞I/0存在一些缺点。根据阻塞I/0通信模型,我总结了它的两点缺点:

  1. 当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
  2. 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。在这种情况下非阻塞式I/0就有了它的应用前景。

Java NIO原理及通信模型

Java NI0是在jdk1.4开始使用的,它既可以说成“新I/0”,也可以说成非阻塞式I/0。下面是Java NIO的工作原理:

  1. 由一个专门的线程来处理所有的I0事件,并负责分发。
  2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
  3. 线程通讯:线程之间通过wait, notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。

下面贴出我理解的Java NI0的工作原理图:

每个线程的处理流程大概都是读取数据,解码,计算处理,编码,发送响应。

选择器Selector

传统的Server/Client模式会基于TPR (Thread per Request) , 服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求。这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销。大多数的实现为了避免这个问题,都采用了线程池模型,并设置线程池线程的最大数量,这又带来了新的问题,如果线程池中有200个线程,而有200个用户都在进行大文件下载,会导致第201个用户的请求无法及时处理,即便第201个用户只想请求一个几KB大小的页面。传统的Server/Client模式如下图所示:

NI0中非阻塞I/0采用了基于Reactor模式的工作方式,I/0 调用不会被阻塞,相反是注册感兴趣的特定I/0事件,如可读数据到达,新的套接字连接等等,在发生特定事件时,系统再通知我们。NI0中实现非阻塞I/0的核心对象就是Selector, Selector 就是注册各种I/0事件地方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:

从图中可以看出,当有读或写等任何注册的事件发生时,可以从Selector中获得相应的SelectionKey,同时从SelectionKey 中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。

使用NI0中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:

  1. 向Selector对象注册感兴趣的事件
  2. 从Selector中获取感兴趣的事件
  3. 根据不同的事件进行相应的处理

Java AIO ( Asynchronous IO)

jdk1.7才是实现真正的异步aio 把io读写操作完全交给操作系统 学习linux epoll模式

AIO原理

服务端:AsynchronousServerSocketChannel客服端:AsynchronousSocketChannel

用户处理器:CompletionHandler接口,这个接口实现应用程序向操作系统发起I0请求,当完成后处理具体逻辑,否则做自己该做的事情,

“真正”的异步I0需要操作系统更强的支持。在I0多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。而在异步I0模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在I0完成后通知用户线程直接使用即可。异步I0模型使用了Proactor设计模式实现了这一机制,如下图所示:

代码示例

AIOServer

代码语言:javascript复制
package com.gupaoedu.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
  

/**
 * AIO的服务端
 */
public class AIOServer {  
  
   
    private final int port;  
  
    public static void main(String args[]) {  
        int port = 8000;  
        new AIOServer(port);  
    }  
  
    //注册一个端口,用来给客户端连接
    public AIOServer(int port) {  
        this.port = port;  
        listen();  
    }  
  
    //侦听方法
    private void listen() {  
        try { 
           
           //线程缓冲池,为了体现异步
            ExecutorService executorService = Executors.newCachedThreadPool(); 
            //给线程池初始化一个线程
            AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);  
            
            //Asynchronous异步  
            //NIO   ServerSocketChannel  
            //BIO   ServerSocket   有那么一点点像
            
            //同样的,也是先把高速公路修通
            final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup); 
            
            //打开高速公路的关卡
            server.bind(new InetSocketAddress(port));  
            System.out.println("服务已启动,监听端口"   port);
            
            
            final Map<String,Integer> count = new ConcurrentHashMap<String, Integer>();
            count.put("count", 0);
            //开始等待客户端连接
            //实现一个CompletionHandler 的接口,匿名的实现类
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
               
               final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
               
               //实现IO操作完成的方法
               public void completed(AsynchronousSocketChannel result, Object attachment) {  
                  count.put("count", count.get("count")   1);
                  
                  System.out.println(count.get("count"));
                  //只要拿数据,捡现成的,我们都是懒人,IO操作都不用关心了
                   //System.out.println("IO操作成功,开始获取数据");  
                     try {  
                       buffer.clear();  
                        result.read(buffer).get();
                        buffer.flip();  
                        result.write(buffer);  
                        buffer.flip();  
                    } catch (Exception e) {  
                        System.out.println(e.toString());  
                    } finally {  
                        try {  
                            result.close();  
                            server.accept(null, this);  
                        } catch (Exception e) {  
                            System.out.println(e.toString());  
                        }  
                    }  
  
                    //System.out.println("操作完成");  
                }  
     
               //实现IO操作失败的方法
                @Override  
                public void failed(Throwable exc, Object attachment) {  
                    System.out.println("IO操作是失败: "   exc);  
                }  
            });
     
            
           try {  
               Thread.sleep(Integer.MAX_VALUE);  
           } catch (InterruptedException ex) {  
               System.out.println(ex);  
           }  
        } catch (IOException e) {  
            System.out.println(e);  
        }  
    }  
}  

AIOClient

代码语言:javascript复制
package com.gupaoedu.aio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
  
public class AIOClient {  
    private final AsynchronousSocketChannel client ;  
      
    public AIOClient() throws Exception{  
       
       //Asynchronous
       //BIO   Socket
       //NIO   SocketChannel
       //AIO   AsynchronousSocketChannel
       //先把高速公路修起来
       client = AsynchronousSocketChannel.open();  
    }  
      
    public void connect(String host,int port)throws Exception{  
       
       //开始发车,连上高速公路
       //Viod什么都不是
       //也是实现一个匿名的接口
       //这里只做写操作
        client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>() {  
           
           //实现IO操作完成的方法
            @Override  
            public void completed(Void result, Void attachment) {  
                try {  
                    client.write(ByteBuffer.wrap(("这是一条测试数据"   System.currentTimeMillis()).getBytes())).get();  
                   // System.out.println("已发送至服务器");  
                } catch (Exception ex) {  
                    ex.printStackTrace();  
                }  
            }  
  
            
            //实现IO操作失败的方法
            @Override  
            public void failed(Throwable exc, Void attachment) {  
                exc.printStackTrace();  
            }  
        }); 
        
        //下面这一段代码是只读数据
        final ByteBuffer bb = ByteBuffer.allocate(1024);  
        client.read(bb, null, new CompletionHandler<Integer,Object>(){  
  
           //实现IO操作完成的方法
            @Override  
            public void completed(Integer result, Object attachment) {  
                 //System.out.println("IO操作完成"   result);  
                 System.out.println("获取反馈结果"   new String(bb.array()));  
            }  
  
            
           //实现IO操作失败的方法
            @Override  
            public void failed(Throwable exc, Object attachment) {  
                    exc.printStackTrace();  
                }  
            }  
        );  

          
        try {  
            Thread.sleep(Integer.MAX_VALUE);  
        } catch (InterruptedException ex) {  
            System.out.println(ex);  
        }  
          
    }  
      
    public static void main(String args[])throws Exception{
       int count = 10;
       final CountDownLatch latch = new CountDownLatch(count);
       
       for (int i = 0; i < count; i   ) {
          latch.countDown();
          new Thread(){
             public void run(){
                try{
                   latch.await();
                   new AIOClient().connect("localhost",8000);
                }catch(Exception e){
                   
                }
             }
          }.start();
      }
       
       Thread.sleep(1000 * 60 * 10);
    }  
}  

0 人点赞