NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel

2022-04-25 14:20:43 浏览数 (1)

NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel

1、SelectableChannel

SelectableChannel是一个抽象类,它实现了Channel接口,这个类比较特殊。

SelectableChannel可以被Selector用来多路复用,不过首先需要调用selectableChannel.configureBlocking(false)调整为非阻塞模式。

可选通道的使用大致过程如下:

  • 1.新建通道。
  • 2.将通道的事件注册到选择器Selector上。
  • 3.通过SelectKey获得需要处理的通道,然后对通道进行处理。

SelectableChannel的实现类,可以看出主要分为几大块:

  • 有关UDP协议的:DatagramChannel。
  • 有关SCTP协议的:SctpChannel、SctpMultiChannel、SctpServerChannel。
  • 有关TCP协议的:ServerSocketChannel、SocketChannel。
  • 有关管道的:SinkChannel、SourceChannel这两个抽象类定义在java.nio.channels.Pipe类中。

1.1重要方法

1.1.1register()

注册通道到选择器。在ServerSocketChannle和SocketChannel上提供了register方法来实现注册,通过SelectionKey来实现选择。

此方法有两个重载:

  • SelectionKey register(Selector sel, int ops)
  • SelectionKey register(Selector sel, int ops, Object att)

参数释义:

  • 第一个参数代表要注册的Selector实例。
  • 第二个参数代表本通道感兴趣的操作,这些都定义在SelectionKey类中。
  • 第三个参数Object att是注册时的附件,也就是可以在注册的时候带点什么东西过去。
1.1.2isRegistered()

获取通道是否注册到选择器,新创建的通道没有注册,返回true表示已经注册,false表示没有注册。

1.1.3configureBlocking(boolean)

这个是设置channel的阻塞模式的,true代表block;false为non-block。一般用non-block配合Selector多路复用

1.1.4isBlocking()

检测当前通道的阻塞状态。默认直接返回Channel.blocking属性的值。

2、SocketCannel

用于Socket的TCP连接的数据读写,既可以从Channel读数据,也可以向Channle中写入数据。

2.1重要方法

2.1.1open()

获取SocketCannel通道。

有两个重载:

  • open()
  • open(SocketAddress)
2.1.2validOps()

返回一个本通道支持的操作方式,支持三种,读、写连接。

2.1.3bind(SocketAddress)

绑定一个本地的套接字地址。

2.1.4setOption(SocketOption<T>, T)

设置套接字的操作方式。

2.1.5shutdownInput()

停止当前连接的读操作,并且关闭通道。

2.1.6shutdownOutput()

停止当前连接的写操作,并且关闭通道。

2.1.7socket()

获取一个和当前通道有关的套接字。

2.1.8isConnected()

判断是否建立了连接。

2.1.9isConnectionPending()

当前通道上是否存在正在连接的操作。

2.1.10connect(SocketAddress)

建立连接。

2.1.11finishConnect()

完成连接。

2.1.12getRemoteAddress()

返回一个此通道的套接字已经连接的远程地址。

2.1.13read()

读数据。

此方法有三个重载。

  • read(ByteBuffer)
  • read(ByteBuffer[], int, int)
  • read(ByteBuffer[])
2.1.14write()

写数据。

此方法有三个重载。

  • write(ByteBuffer)
  • write(ByteBuffer[], int, int)
  • write(ByteBuffer[])
2.1.15getLocalAddress()

获取此通道的套接字绑定的地址。

2.2案例

Socket客户端代码:

代码语言:javascript复制
package xyz.xujd.testcase.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class SocketChannelTest {

	public static void main(String[] args) {
		try {
			SocketChannel sc = SocketChannel.open();
			sc.configureBlocking(true);
			System.out.println(sc.connect(new InetSocketAddress("localhost", 8090)));
			ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
			while (true) {
				int count = 0;
				while ((count = sc.read(byteBuffer)) > 0) {
					byteBuffer.flip();
					while (byteBuffer.hasRemaining()) {
						System.out.println((char) byteBuffer.get());
					}
					byteBuffer.clear();
				}
				sc.write(byteBuffer.put("return".getBytes()));
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

3、ServerSocketChannel

针对面向流的侦听套接字的可选择通道。多个并发线程可安全地使用服务器套接字通道。

通过ServerSocketChannel可以监听TCP连接,服务端监听到连接之后,会为每个请求创建一个SocketChannel。

3.1重要方法

3.1.1accept()

接受连接。

3.1.2bind()

将通道的套接字与本地地址绑定,并且配置套接字监听连接。

此方法有两个重载。

  • bind(SocketAddress)
  • bind(SocketAddress, int)
3.1.3open()

获取ServerSocketChannel通道。

3.1.4socket()

获取服务器关联的套接字。

3.1.5validOps()

返回一个操作集,表示次通道支持的操作。

3.1.6setOption(SocketOption<T>, T)

设置服务器操作集。

3.1.7getLocalAddress()

获取服务器套接字关联的地址。

3.2案例

Socket服务端代码:

代码语言:javascript复制
package xyz.xujd.testcase.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerSocketChannelTest {

	private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

	public static void main(String[] args) {
		ServerSocketChannelTest tssc = new ServerSocketChannelTest();
		tssc.run();
	}

	public void run() {
		try {
			// 创建一个选择器。
			Selector selector = Selector.open();
			// 打开一个通道。
			ServerSocketChannel ssc = ServerSocketChannel.open();
			// 设置为非阻塞模式
			ssc.configureBlocking(false);
			// 在通道上绑定一个地址
			ssc.socket().bind(new InetSocketAddress(8090));
			// 将通道注册到选择器上,并设置选择器的时间是OP_ACCET
			// 一个通道可以注册到多个选择器上,但是一个通道在一个选择器上只能注册一次。
			// 不同的通道在选择器中注册的事件不一样,第二个参数是有限制的,由channel类型决定。
			ssc.register(selector, SelectionKey.OP_ACCEPT);
			// 服务器端准备就绪。
			System.out.println("------------------服务器开始等待客户端连接----------------------------------------");
			while (true) {
				// 选择器阻塞,等待事件到来,事件不来就阻塞,事件到来返回时间到来的通道个数。
				int i = selector.select();
				if (i == 0) {
					continue;
				}
				// 遍历事件。
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey selectionKey = it.next();
					// 如果是OP_ACCEPT,表示这个事件是ServerSocketChannel的事件,因为SocketChannel没有这个事件。
					if (selectionKey.isAcceptable()) {
						ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
						// 建立连接
						SocketChannel channel = server.accept();
						// 将建立的链接设置为准备读,并注册到选择器上。
						registerChannel(selector, channel, SelectionKey.OP_READ);
						// 向建立的通道写入内容。
						hello(channel);
					}
					// 如果可读时间,没有在ServerScoketChannel上注册可读,值在SokectChannel上注册了可读。
					if (selectionKey.isReadable()) {
						// 从通道中读取数据,只有通道中有数据,才会进入这个条件中。
						readFromSocket(selectionKey);
					}
					// 此时只是将当前出发的事件从list中移除,在下一次选择时,还会查看改通道是否有操作事件发生。
					it.remove();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 读取数据
	 * 
	 * @param selectionKey
	 * @throws IOException
	 */
	private void readFromSocket(SelectionKey selectionKey) throws IOException {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		buffer.clear();
		int count = 0;
		// 当有数据可读时,第一次读到的数据不应该是0,它会读取到0或-1
		while ((count = socketChannel.read(buffer)) > 0) {
			buffer.flip();
			// 将数据回写到通道中
			while (buffer.hasRemaining()) {
				socketChannel.write(buffer);
			}
			buffer.clear();
		}
		if (count < 0) {
			socketChannel.close();
		}
	}

	/**
	 * 向通道中写入数据。
	 *
	 * @param channel
	 * @throws IOException
	 */
	private void hello(SocketChannel channel) throws IOException {
		buffer.clear();
		buffer.put("Hello".getBytes());
		buffer.flip();
		channel.write(buffer);
	}

	/**
	 * 将channel注册到选择器上。
	 * 
	 * @param selector 选择器
	 * @param channel  通道
	 * @param opRead   操作
	 * @throws IOException
	 */
	private void registerChannel(Selector selector, SocketChannel channel, int opRead) throws IOException {
		if (channel == null) {
			return;
		}
		channel.configureBlocking(false);
		channel.register(selector, opRead);
	}

}

4、测试阻塞

利用ServerSocketChannel、SocketChannel实现NIO方式先的TCP通信的类,在非阻塞模式下ACCEPT CONNECT READ WRITE方法都不产生阻塞。

4.1ACCEPT的非阻塞验证

代码语言:javascript复制
	public static void main(String[] args) throws IOException {
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.bind(new InetSocketAddress(44444));
		ssc.accept();
	}

执行程序,执行过程一闪而过,没有出现阻塞。

4.2CONNECT的非阻塞验证

代码语言:javascript复制
	public static void main(String[] args) throws IOException {
		SocketChannel sc = SocketChannel.open();
		sc.configureBlocking(false);
		sc.connect(new InetSocketAddress("127.0.0.1", 44444));
	}

执行程序,执行过程一闪而过,没有出现阻塞。

4.3READ的非阻塞验证

服务端:

代码语言:javascript复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 开启非阻塞模式
		ssc.configureBlocking(false);
		// 绑定监听端口
		ssc.bind(new InetSocketAddress(44444));
		SocketChannel sc = null;
		// 确保已经建立连接
		while (sc == null) {
			sc = ssc.accept();
		}
		// 开启非阻塞模式
		sc.configureBlocking(false);
		// 创建缓存区
		ByteBuffer buf = ByteBuffer.allocate(5);
		// 读取数据
		sc.read(buf);
	}

客户端:

代码语言:javascript复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		SocketChannel sc = SocketChannel.open();
		// 开启非阻塞模式
		sc.configureBlocking(false);
		// 建立连接,确定连接目标,返回是否连接成功
		boolean isConn = sc.connect(new InetSocketAddress("127.0.0.1", 44444));
		// 如果没有连接成功,重复连接,直到连接建立
		while (!isConn) {
			isConn = sc.finishConnect();
		}
		// 模拟写阻塞
		while (true) {
		}
	}

首先执行服务端代码,然后执行客户端代码,当客户端代码执行过后,服务端代码就结束执行了,说明服务端代码读操作不阻塞。

4.4WRITE的非阻塞验证

客户端:

代码语言:javascript复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		SocketChannel sc = SocketChannel.open();
		// 开启非阻塞模式
		sc.configureBlocking(false);
		// 建立连接,确定连接目标,返回是否连接成功
		boolean isConn = sc.connect(new InetSocketAddress("127.0.0.1", 44444));
		// 如果没有连接成功,重复连接,直到连接建立
		while (!isConn) {
			isConn = sc.finishConnect();
		}
		// 写操作计数
		int conut=0;
		while (true) {
			//模拟写操作
			int i=sc.write(ByteBuffer.wrap("a".getBytes()));
			conut =i;
			System.out.println(conut);
		}
	}

服务器端:

代码语言:javascript复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 开启非阻塞模式
		ssc.configureBlocking(false);
		// 绑定监听端口
		ssc.bind(new InetSocketAddress(44444));
		SocketChannel sc = null;
		// 确保已经建立连接
		while (sc == null) {
			sc = ssc.accept();
		}
		// 开启非阻塞模式
		sc.configureBlocking(false);
		//模拟读操作阻塞
		while(true){}
	}

执行服务器端代码,执行客户端代码,可以看到客户端一直在输出数字,大概到两万三千多的时候一直不停的打印出这个数字,而服务器什么都没有打印,说明没有接收,但是客户端仍然可以一直写,证明写操作也没有阻塞。

0 人点赞