Java NIO是一个用来替代标准Java IO API的新型数据传递方式,像现在分布式架构中会经常存在他的身影。其比传统的IO更加高效,非阻塞,异步,双向
NIO主体结构
Java NIO的主要构成核心就是Buffer、Channel和Selector这三个
对于Channel我想要提醒的是,Channel中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入
使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件
Channel
所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流
Channel的实现
- FileChannel:从文件中读写数据
- DatagramChannel:通过UDP读写网络中的数据
- SocketChannel:通过TCP读写网络中的数据
- ServerSocketChannel:监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel
Scatter/Gather
- 分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中
- 聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channel,因此,Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel
通过这样的方式可以方便数据的读取,当你想要获取整个数据的一部分的时候,通过这种方式可以很快的获取数据
代码语言:javascript复制ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,channel紧接着向另一个buffer中写
transferFrom、transferTo
实现两个Channel之间相互连接,数据传递
代码语言:javascript复制public static void trainforNio() {
RandomAccessFile fromFile=null;
RandomAccessFile toFile=null;
try {
fromFile = new RandomAccessFile("src/nio.txt", "rw");
// channel获取数据
FileChannel fromChannel = fromFile.getChannel();
toFile = new RandomAccessFile("src/toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();
System.out.println(toChannel.size());
//position处开始向目标文件写入数据,这里是toChannel
long position = toChannel.size();
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
System.out.println(toChannel.size());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (fromFile != null) {
fromFile.close();
}
if (toFile != null) {
toFile.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
transferFrom、transferTo作用是一样的,只是一个是tochannal调用,一个是fromchannnal调用
在实际的运用中可能存在源通道的剩余空间小于 count 个字节,则所传输的字节数要小于请求的字节数
在SoketChannel的实现中,SocketChannel只会传输此刻准备好的数据(可能不足count字节)。因此,SocketChannel可能不会将请求的所有数据(count个字节)全部传输到FileChannel中
看官一定要仔细看我栗子中的注释
Buffer
Buffer是一个缓存区,其会将Channel中的数据存储起来
Buffer的实现
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- MappedByteBuffer
capacity,position,limit
在讲解该主题之前,首先要明白读模式和写模式,无论是Channel还是Buffer都存在这两种模式,要理解这两种模式,第一步要明确主题是哪一个,是Channel还是Buffer。举个栗子,主角是Channel,读模式的含义就是从Buffer中获取数据,写模式就是将数据写入Buffer,对于Buffer则是相反。搞清楚这一点,理解下面的就要相对清楚一点
- capacity:作为一个内存块,其就代表了当前Buffer能最多暂存多少数据量,存储的数据类型则是根据上面的Buffer对象类型,一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据
- position:代表当前数据读或写处于那个位置。读模式:被重置从0开始,最大值可能为capacity-1或者limit-1,写模式:被重置从0开始,最大值为limit-1
- limit:最多能往Buffer里写多少数据,limit大小跟数据量大小和capacity有关,读模式:数据量>capacity时,limit=capacity,数据量=capacity时,limit=capacity,数据量<capacity时,limit<capacity,写模式:limit<=capacity
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class Method {
public static void nio() {
RandomAccessFile aFile = null;
try {
aFile = new RandomAccessFile("src/nio.txt", "rw");
// channel获取数据
FileChannel fileChannel = aFile.getChannel();
// 初始化Buffer,设定Buffer每次可以存储数据量
// 创建的Buffer是1024byte的,如果实际数据本身就小于1024,那么limit就是实际数据大小
ByteBuffer buf = ByteBuffer.allocate(1024);
// channel中的数据写入Buffer
int bytesRead = fileChannel.read(buf);
System.out.println(bytesRead);
while (bytesRead != -1) {
// Buffer切换为读取模式
buf.flip();
// 读取数据
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
// 清空Buffer区
buf.compact();
// 继续将数据写入缓存区
bytesRead = fileChannel.read(buf);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (aFile != null) {
aFile.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Method.nio();
Buffer读写数据步骤
- 写入数据到Buffer(fileChannel.read(buf))
- 调用flip()方法(buf.flip())
- 从Buffer中读取数据(buf.get())
- 调用clear()方法或者compact()方法(buf.compact())
Buffer方法
flip():将Buffer读模式切换到写模式,并且将position制为0
clear():清空整个缓冲区
compact():只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面
allocate(1024):初始化Buffer,设定的值就决定capacity值的大小
rewind():将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)
mark()与reset():通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position
equals():当满足下面三个条件时,两个Buffer才是相等
- 有相同的类型(byte、char、int等)
- Buffer中剩余的byte、char等的个数相等
- Buffer中所有剩余的byte、char等都相同
只比较的是剩余的数据
compareTo():满足下列条件,则认为一个Buffer“小于”另一个Buffer
- 第一个不相等的元素小于另一个Buffer中对应的元素
- 所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)
Selector
Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便
大致流程
当您调用一个选择器对象的 select( )方法时,相关的键会被更新,用来检查所有被注册到该选择器的通道。您可以获取一个键的集合,从而找到当时已经就绪的通道。通过遍历这些键,您可以选择出每个从上次您调用 select( )开始直到现在,已经就绪的通道
选择器(Selector)的特点
代码语言:javascript复制public abstract class Selector
{
// This is a partial API listing
public static Selector open( ) throws IOException
public abstract boolean isOpen( );//判断是open
public abstract void close( ) throws IOException;//选择键设置无效
public abstract SelectionProvider provider( );
}
- 选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册 的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直 到有就绪的的通道
- 不能注册已经关闭的selectableChannel
- 通过调用一个自定义的 SelectorProvider对象的 openSelector( )方法来创建一个 Selector 实例也是可行的。您可以通过调用 provider( )方法来决定由哪个 SelectorProvider 对象来创建给定的 Selector 实例
通道(Channel)的特点
代码语言:javascript复制public abstract class SelectableChannel
extends AbstractChannel
implements Channel
{
// This is a partial API listing
public abstract SelectionKey register (Selector sel, int ops)
throws ClosedChannelException;
public abstract SelectionKey register (Selector sel, int ops,
Object att)
throws ClosedChannelException;
public abstract boolean isRegistered( );
public abstract SelectionKey keyFor (Selector sel);
public abstract int validOps( );
}
- 继承SelectableChannel
- 一个channel可以注册到多个selector中
- 一个selector中同一个channel只能有一个
- 通道被注册前,要非阻塞模式
- 支持Connect、Accept、Read、Write四种可选择操作事件,但并不是所有的SelectableChannel都存在以上四类,可以通过validOps()获取可以使用的操作事件集合
- 如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来
- 任何一个通道和选择器的注册关系都被封装在一个 SelectionKey 对象中。 keyFor( )方法将 返回与该通道和指定的选择器相关的键。如果通道被注册到指定的选择器上,那么相关的键将被返 回。如果它们之间没有注册关系,那么将返回 null
选择键(SelectionKey)的特点
代码语言:javascript复制package java.nio.channels;
public abstract class SelectionKey
{
public static final int OP_READ
public static final int OP_WRITE
public static final int OP_CONNECT
public static final int OP_ACCEPT
public abstract SelectableChannel channel( );
public abstract Selector selector( );
public abstract void cancel( );
public abstract boolean isValid( );
public abstract int interestOps( );
public abstract void interestOps (int ops);
public abstract int readyOps( );
public final boolean isReadable( )
public final boolean isWritable( )
public final boolean isConnectable( )
public final boolean isAcceptable( )
public final Object attach (Object ob)
public final Object attachment( )
}
- 封装了特定的通道与特定的选择器的注册关系
- 一个 SelectionKey 对象包含两个以整数形式进行编码的byte掩码:一个用于指示那些通道/ 选择器组合体所关心的操作(instrest 集合),另一个表示通道准备好要执行的操作( ready 集合)
当终结注册关系时
当应该终结这种关系的时候,可以调用 SelectionKey对象的 cancel( )方法。可以通过调用 isValid( )方法来检查它是否仍然表示一种有效的关系。当键被取消时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用 select( )方法时(或者一个正在进行的 select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey 将被返回
当通道关闭时
当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。当 选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取 消)。一旦键被无效化,调用它的与选择相关的方法就将抛出 CancelledKeyException
interest 集合
当前的 interest 集合可以通过调用键对象的 interestOps( )方法来获取
最初,这应该是通道被注册时传进来的值。这个 interset 集合永远不会被选择器改变,但您可以通过调用 interestOps( )方法并传入一个新的byte掩码参数来改变它。 interest 集合也可以通过将通道注册到选择器上来改变(实际上使用一种迂回的方式调用 interestOps( )),就像 4.1.2 小节中描的那样。当相关的 Selector 上的 select( )操作正在进行时改变键的 interest 集合,不会影响那个正在进行的选择操作。所有更改将会在 select( )的下一个调用中体现出来
ready集合
可以通过调用键的 readyOps( )方法来获取相关的通道的已经就绪的操作。 ready 集合是 interest 集合的子集,并且表示了 interest 集合中从上次调用 select( )以来已经就绪的那些操作
SelectionKey 类定义了四个便于使用的布尔方法来为您测试这些byte值: isReadable( ), isWritable( ), isConnectable( ), 和 isAcceptable( )
SelectionKey 对象包含的 ready 集合与最近一次选择器对所注册的通道所作的检查相同。而每个单独的通道的就绪状态会同时改变
附加的对象
可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:
代码语言:javascript复制selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
还可以在用register()方法向Selector注册Channel的时候附加对象。如:
代码语言:javascript复制SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
如果选择键的存续时间很长,但您附加的对象不应该存在那么长时间,请记得在完成后清理附件。否则,您附加的对象将不能被垃圾回收,您将会面临内存泄漏问题
总体上说, SelectionKey 对象是线程安全的,但知道修改 interest 集合的操作是通过 Selector 对象进行同步的是很重要的。这可能会导致 interestOps( )方法的调用会阻塞不确定长的一段时间。选择器所使用的锁策略(例如是否在整个选择过程中保持这些锁)是依赖于具体实现的。幸好,这种多元处理能力被特别地设计为可以使用单线程来管理多个通道。被多个线程使用的选择器也只会在系统特别复杂时产生问题。
选择过程
代码语言:javascript复制public abstract class Selector
{
public abstract Set keys( );
public abstract Set selectedKeys( );
public abstract int select( ) throws IOException;
public abstract int select (long timeout) throws IOException;
public abstract int selectNow( ) throws IOException;
public abstract void wakeup( );
}
已注册的键的集合
与选择器关联的已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过 keys( )方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话 将引 java.lang.UnsupportedOperationException。
已选择的键的集合
已注册的键的集合的子集。这个集合的每个成员都是相关的通道被选择器(在前一个选择操作 中)判断为已经准备好的,并且包含于键的 interest 集合中的操作。这个集合通过 selectedKeys( )方 法返回(并有可能是空的)
不要将已选择的键的集合与 ready 集合弄混了。这是一个键的集合,每个键都关联一个已经准 备好至少一种操作的通道。每个键都有一个内嵌的 ready 集合,指示了所关联的通道已经准备好的 操作
键可以直接从这个集合中移除,但不能添加
已取消的键的集合
已注册的键的集合的子集,这个集合包含了 cancel( )方法被调用过的键(这个键已经被无效 化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问
在一个刚初始化的 Selector 对象中,这三个集合都是空的。
执行步骤
- 已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两 个集合中移除,并且相关的通道将被注销。这个步骤结束后,已取消的键的集合将是空的。
- 已注册的键的集合中的键的 interest 集合将被检查。在这个步骤中的检查执行过后,对 interest 集合的改动不会影响剩余的检查过程。 a.如果通道的键还没有处于已选择的键的集合中,那么键的 ready 集合将被清空,然后表示操 作系统发现的当前通道已经准备好的操作的比特掩码将被设置。 b.否则,也就是键在已选择的键的集合中。键的 ready 集合将被表示操作系统发现的当前已经 准备好的操作的比特掩码更新。所有之前的已经不再是就绪状态的操作不会被清除。事实上,所有的比特位都不会被清理。由操作系统决定的 ready 集合是与之前的 ready 集合按位分离的,一旦键被放置于选择器的已选择的键的集合中,它的 ready 集合将是累积的。比特位只会被设置,不会被清理。
- 步骤 2 可能会花费很长时间,特别是所激发的线程处于休眠状态时。与该选择器相关的键可 能会同时被取消。当步骤 2 结束时,步骤 1 将重新执行,以完成任意一个在选择进行的过程中,键 已经被取消的通道的注销。
- select 操作返回的值是 ready 集合在步骤 2 中被修改的键的数量,而不是已选择的键的集合中 的通道的总数。返回值不是已准备好的通道的总数,而是从上一个 select( )调用之后进入就绪状态 的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入,而那些在前 一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入。这些通道可能仍然在已选择的 键的集合中,但不会被计入返回值中。返回值可能是 0。
为什么延迟注销
使用内部的已取消的键的集合来延迟注销,是一种防止线程在取消键时阻塞,并防止与正在进 行的选择操作冲突的优化。注销通道是一个潜在的代价很高的操作,这可能需要重新分配资源(请 记住,键是与通道相关的,并且可能与它们相关的通道对象之间有复杂的交互)。
三种select()方法
仅仅在它们在所注册的通道当前都没有就绪时,是否阻塞的方面有所不同。
- select():在没有通道就绪时将无限阻塞。一旦至少有一个已注册的通道就绪,选择器的选择键 就会被更新,并且每个就绪的通道的 ready 集合也将被更新。返回值将会是已经确定就绪的通道的 数目。正常情况下, 这些方法将返回一个零的值,因为直到一个通道就绪前它都会阻塞。
- select(long timeout):如果在您提供的超时时间(以毫秒计算)内没有通道就绪时,它将返回 0。如果一个或者多个通道在时间限制终止前就绪,键的状态将会被更新,并且方法会在那时立即返回。将超时参数指定为 0 表示将无限期等待,那么它就在各个方面都等同于使用select()
- selectNow():执行就绪检查过程,但不阻塞。如果当前没有通道就绪,它将立即返回 0
停止选择过程
wakeUp()
某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。
close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
interrupt()
如果睡眠中的线程的 interrupt( )方法被调用,它的返回状态将被设置。如果被唤醒的线程之后 将试图在通道上执行 I/O 操作,通道将立即关闭,然后线程将捕捉到一个异常。
例子
服务端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NIOServer {
// 通道管理器
private Selector selector;
public void initServer(int port) throws Exception {
// 获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为 非阻塞
serverChannel.configureBlocking(false);
// 将该通道对于的serverSocket绑定到port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 获得一耳光通道管理器
this.selector = Selector.open();
// 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
// 注册该事件后,当事件到达的时候,selector.select()会返回,
// 如果事件没有到达selector.select()会一直阻塞
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
// 采用轮训的方式监听selector上是否有需要处理的事件,如果有,进行处理
public void listen() throws Exception {
System.out.println("start server");
// 轮询访问selector
while (true) {
// 当注册事件到达时,方法返回,否则该方法会一直阻塞
selector.select();
// 获得selector中选中的相的迭代器,选中的相为注册的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key 以防重负处理
ite.remove();
// 客户端请求连接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 获得和客户端连接的通道
SocketChannel channel = server.accept();
// 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以发送消息给客户端
channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
// 在客户端 连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限
channel.register(this.selector, SelectionKey.OP_READ);
// 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
// 处理 读取客户端发来的信息事件
private void read(SelectionKey key) throws Exception {
// 服务器可读消息,得到事件发生的socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 穿件读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("server receive from client: " msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
public static void main(String[] args) throws Throwable {
NIOServer server = new NIOServer();
server.initServer(8989);
server.listen();
}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NIOClient {
// 通道管理器
private Selector selector;
/**
* * // 获得一个Socket通道,并对该通道做一些初始化的工作 * @param ip 连接的服务器的ip // * @param port
* 连接的服务器的端口号 * @throws IOException
*/
public void initClient(String ip, int port) throws IOException { // 获得一个Socket通道
SocketChannel channel = SocketChannel.open(); // 设置通道为非阻塞
channel.configureBlocking(false); // 获得一个通道管理器
this.selector = Selector.open(); // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
// 用channel.finishConnect();才能完成连接
channel.connect(new InetSocketAddress(ip, port));
// 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
}
/**
* * // 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 * @throws // IOException
* @throws Exception
*/
@SuppressWarnings("unchecked")
public void listen() throws Exception { // 轮询访问selector
while (true) {
// 选择一组可以进行I/O操作的事件,放在selector中,客户端的该方法不会阻塞,
// 这里和服务端的方法不一样,查看api注释可以知道,当至少一个通道被选中时,
// selector的wakeup方法被调用,方法返回,而对于客户端来说,通道一直是被选中的
selector.select(); // 获得selector中选中的项的迭代器
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next(); // 删除已选的key,以防重复处理
ite.remove(); // 连接事件发生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel(); // 如果正在连接,则完成连接
if (channel.isConnectionPending()) {
channel.finishConnect();
} // 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以给服务端发送信息哦
channel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
// 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
private void read(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
// 穿件读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("client receive msg from server:" msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
/**
* * // 启动客户端测试 * @throws IOException
* @throws Exception
*/
public static void main(String[] args) throws Exception {
NIOClient client = new NIOClient();
client.initClient("localhost", 8989);
client.listen();
}
}