NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel
1、SelectableChannel
SelectableChannel是一个抽象类,它实现了Channel接口,这个类比较特殊。
SelectableChannel可以被Selector用来多路复用,不过首先需要调用selectableChannel.configureBlocking(false)调整为非阻塞模式。
可选通道的使用大致过程如下:
- 1.新建通道。
- 2.将通道的事件注册到选择器Selector上。
- 3.通过SelectKey获得需要处理的通道,然后对通道进行处理。
SelectableChannel的实现类,可以看出主要分为几大块:
- 有关UDP协议的:DatagramChannel。
- 有关SCTP协议的:SctpChannel、SctpMultiChannel、SctpServerChannel。
- 有关TCP协议的:ServerSocketChannel、SocketChannel。
- 有关管道的:SinkChannel、SourceChannel这两个抽象类定义在java.nio.channels.Pipe类中。
1.1重要方法
1.1.1register()
注册通道到选择器。在ServerSocketChannle和SocketChannel上提供了register方法来实现注册,通过SelectionKey来实现选择。
此方法有两个重载:
- SelectionKey register(Selector sel, int ops)
- SelectionKey register(Selector sel, int ops, Object att)
参数释义:
- 第一个参数代表要注册的Selector实例。
- 第二个参数代表本通道感兴趣的操作,这些都定义在SelectionKey类中。
- 第三个参数Object att是注册时的附件,也就是可以在注册的时候带点什么东西过去。
1.1.2isRegistered()
获取通道是否注册到选择器,新创建的通道没有注册,返回true表示已经注册,false表示没有注册。
1.1.3configureBlocking(boolean)
这个是设置channel的阻塞模式的,true代表block;false为non-block。一般用non-block配合Selector多路复用
1.1.4isBlocking()
检测当前通道的阻塞状态。默认直接返回Channel.blocking属性的值。
2、SocketCannel
用于Socket的TCP连接的数据读写,既可以从Channel读数据,也可以向Channle中写入数据。
2.1重要方法
2.1.1open()
获取SocketCannel通道。
有两个重载:
- open()
- open(SocketAddress)
2.1.2validOps()
返回一个本通道支持的操作方式,支持三种,读、写连接。
2.1.3bind(SocketAddress)
绑定一个本地的套接字地址。
2.1.4setOption(SocketOption<T>, T)
设置套接字的操作方式。
2.1.5shutdownInput()
停止当前连接的读操作,并且关闭通道。
2.1.6shutdownOutput()
停止当前连接的写操作,并且关闭通道。
2.1.7socket()
获取一个和当前通道有关的套接字。
2.1.8isConnected()
判断是否建立了连接。
2.1.9isConnectionPending()
当前通道上是否存在正在连接的操作。
2.1.10connect(SocketAddress)
建立连接。
2.1.11finishConnect()
完成连接。
2.1.12getRemoteAddress()
返回一个此通道的套接字已经连接的远程地址。
2.1.13read()
读数据。
此方法有三个重载。
- read(ByteBuffer)
- read(ByteBuffer[], int, int)
- read(ByteBuffer[])
2.1.14write()
写数据。
此方法有三个重载。
- write(ByteBuffer)
- write(ByteBuffer[], int, int)
- write(ByteBuffer[])
2.1.15getLocalAddress()
获取此通道的套接字绑定的地址。
2.2案例
Socket客户端代码:
代码语言:javascript复制package xyz.xujd.testcase.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class SocketChannelTest {
public static void main(String[] args) {
try {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(true);
System.out.println(sc.connect(new InetSocketAddress("localhost", 8090)));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true) {
int count = 0;
while ((count = sc.read(byteBuffer)) > 0) {
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.println((char) byteBuffer.get());
}
byteBuffer.clear();
}
sc.write(byteBuffer.put("return".getBytes()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、ServerSocketChannel
针对面向流的侦听套接字的可选择通道。多个并发线程可安全地使用服务器套接字通道。
通过ServerSocketChannel可以监听TCP连接,服务端监听到连接之后,会为每个请求创建一个SocketChannel。
3.1重要方法
3.1.1accept()
接受连接。
3.1.2bind()
将通道的套接字与本地地址绑定,并且配置套接字监听连接。
此方法有两个重载。
- bind(SocketAddress)
- bind(SocketAddress, int)
3.1.3open()
获取ServerSocketChannel通道。
3.1.4socket()
获取服务器关联的套接字。
3.1.5validOps()
返回一个操作集,表示次通道支持的操作。
3.1.6setOption(SocketOption<T>, T)
设置服务器操作集。
3.1.7getLocalAddress()
获取服务器套接字关联的地址。
3.2案例
Socket服务端代码:
代码语言:javascript复制package xyz.xujd.testcase.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class ServerSocketChannelTest {
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
public static void main(String[] args) {
ServerSocketChannelTest tssc = new ServerSocketChannelTest();
tssc.run();
}
public void run() {
try {
// 创建一个选择器。
Selector selector = Selector.open();
// 打开一个通道。
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置为非阻塞模式
ssc.configureBlocking(false);
// 在通道上绑定一个地址
ssc.socket().bind(new InetSocketAddress(8090));
// 将通道注册到选择器上,并设置选择器的时间是OP_ACCET
// 一个通道可以注册到多个选择器上,但是一个通道在一个选择器上只能注册一次。
// 不同的通道在选择器中注册的事件不一样,第二个参数是有限制的,由channel类型决定。
ssc.register(selector, SelectionKey.OP_ACCEPT);
// 服务器端准备就绪。
System.out.println("------------------服务器开始等待客户端连接----------------------------------------");
while (true) {
// 选择器阻塞,等待事件到来,事件不来就阻塞,事件到来返回时间到来的通道个数。
int i = selector.select();
if (i == 0) {
continue;
}
// 遍历事件。
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
// 如果是OP_ACCEPT,表示这个事件是ServerSocketChannel的事件,因为SocketChannel没有这个事件。
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
// 建立连接
SocketChannel channel = server.accept();
// 将建立的链接设置为准备读,并注册到选择器上。
registerChannel(selector, channel, SelectionKey.OP_READ);
// 向建立的通道写入内容。
hello(channel);
}
// 如果可读时间,没有在ServerScoketChannel上注册可读,值在SokectChannel上注册了可读。
if (selectionKey.isReadable()) {
// 从通道中读取数据,只有通道中有数据,才会进入这个条件中。
readFromSocket(selectionKey);
}
// 此时只是将当前出发的事件从list中移除,在下一次选择时,还会查看改通道是否有操作事件发生。
it.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 读取数据
*
* @param selectionKey
* @throws IOException
*/
private void readFromSocket(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
buffer.clear();
int count = 0;
// 当有数据可读时,第一次读到的数据不应该是0,它会读取到0或-1
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip();
// 将数据回写到通道中
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
buffer.clear();
}
if (count < 0) {
socketChannel.close();
}
}
/**
* 向通道中写入数据。
*
* @param channel
* @throws IOException
*/
private void hello(SocketChannel channel) throws IOException {
buffer.clear();
buffer.put("Hello".getBytes());
buffer.flip();
channel.write(buffer);
}
/**
* 将channel注册到选择器上。
*
* @param selector 选择器
* @param channel 通道
* @param opRead 操作
* @throws IOException
*/
private void registerChannel(Selector selector, SocketChannel channel, int opRead) throws IOException {
if (channel == null) {
return;
}
channel.configureBlocking(false);
channel.register(selector, opRead);
}
}
4、测试阻塞
利用ServerSocketChannel、SocketChannel实现NIO方式先的TCP通信的类,在非阻塞模式下ACCEPT CONNECT READ WRITE方法都不产生阻塞。
4.1ACCEPT的非阻塞验证
代码语言:javascript复制 public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(44444));
ssc.accept();
}
执行程序,执行过程一闪而过,没有出现阻塞。
4.2CONNECT的非阻塞验证
代码语言:javascript复制 public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("127.0.0.1", 44444));
}
执行程序,执行过程一闪而过,没有出现阻塞。
4.3READ的非阻塞验证
服务端:
代码语言:javascript复制 public static void main(String[] args) throws IOException {
// 创建通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 开启非阻塞模式
ssc.configureBlocking(false);
// 绑定监听端口
ssc.bind(new InetSocketAddress(44444));
SocketChannel sc = null;
// 确保已经建立连接
while (sc == null) {
sc = ssc.accept();
}
// 开启非阻塞模式
sc.configureBlocking(false);
// 创建缓存区
ByteBuffer buf = ByteBuffer.allocate(5);
// 读取数据
sc.read(buf);
}
客户端:
代码语言:javascript复制 public static void main(String[] args) throws IOException {
// 创建通道
SocketChannel sc = SocketChannel.open();
// 开启非阻塞模式
sc.configureBlocking(false);
// 建立连接,确定连接目标,返回是否连接成功
boolean isConn = sc.connect(new InetSocketAddress("127.0.0.1", 44444));
// 如果没有连接成功,重复连接,直到连接建立
while (!isConn) {
isConn = sc.finishConnect();
}
// 模拟写阻塞
while (true) {
}
}
首先执行服务端代码,然后执行客户端代码,当客户端代码执行过后,服务端代码就结束执行了,说明服务端代码读操作不阻塞。
4.4WRITE的非阻塞验证
客户端:
代码语言:javascript复制 public static void main(String[] args) throws IOException {
// 创建通道
SocketChannel sc = SocketChannel.open();
// 开启非阻塞模式
sc.configureBlocking(false);
// 建立连接,确定连接目标,返回是否连接成功
boolean isConn = sc.connect(new InetSocketAddress("127.0.0.1", 44444));
// 如果没有连接成功,重复连接,直到连接建立
while (!isConn) {
isConn = sc.finishConnect();
}
// 写操作计数
int conut=0;
while (true) {
//模拟写操作
int i=sc.write(ByteBuffer.wrap("a".getBytes()));
conut =i;
System.out.println(conut);
}
}
服务器端:
代码语言:javascript复制 public static void main(String[] args) throws IOException {
// 创建通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 开启非阻塞模式
ssc.configureBlocking(false);
// 绑定监听端口
ssc.bind(new InetSocketAddress(44444));
SocketChannel sc = null;
// 确保已经建立连接
while (sc == null) {
sc = ssc.accept();
}
// 开启非阻塞模式
sc.configureBlocking(false);
//模拟读操作阻塞
while(true){}
}
执行服务器端代码,执行客户端代码,可以看到客户端一直在输出数字,大概到两万三千多的时候一直不停的打印出这个数字,而服务器什么都没有打印,说明没有接收,但是客户端仍然可以一直写,证明写操作也没有阻塞。