本文主要讲解NIO的简介、NIO和传统阻塞I/O有什么区别、NIO模型和传统I/O模型之间的对比、以及围绕NIO的三大组件来讲解,理论代码相结合。 很喜欢一句话:
"沉下去,再浮上来"
。 我想我们会变的不一样。
一、Java NIO 简介
在 Java 1.4 中引入了 NIO 框架(java.nio 包),提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层的高性能数据操作方式
同步非阻塞:
Java NIO
的非阻塞模式:
- 非阻塞读:一个线程从某一个通道发送请求或者读取数据时,它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,但是并不会像原生IO一样,阻塞等待着,反而是直到数据变得可以读取之前,此线程可以去继续做其他事情。
- 非阻塞写:和非阻塞读一样,一个线程发送请求写入一些数据到某通道,在等待它完成写入这段时间中,无需阻塞等待,这个线程可以同时去做其他的事情。
- 总结起来就是
NIO
是可以做到用一个线程来处理多个操作的。 - 通俗理解:
NIO
是可以做到用一个线程来处理多个操作的。假设有10000
个请求过来,根据实际情况,可以分配50
或者100
个线程来处理。不像之前的阻塞IO
那样,非得分配10000
个。
Java NIO 由以下几个核心部分组成:
- Channels (通道)
- Buffers (缓冲区)
- Selector (选择器)
虽然Java NIO除此之外,仍有很多类和组件,但是总的来说仍然是靠Channel,Buffer 和 Selector 构成了核心的API。其余更多的是为了能让这三个核心组件更方便的使用。之后的讲解也会偏向于此。三个核心部分模型图:
简单说明:
- 一个
Thread
(线程)对应一个Selector
选择器,每个选择器对应着多个Channel
通道,每个通道又都对应着一个缓冲区(Buffer) Selector
会根据不同的事件,在各个通道上切换,但是程序切换到哪个Channel
是由事件(比如:连接请求、数据到达)决定的。- 数据的读取写入是通过
Buffer
,这个和BIO
是不同的,BIO
中要么是输入流,或者是输出流,不能双向,但是NIO
的Buffer
是可以读也可以写,需要flip
方法切换Channel
是双向的,可以返回底层操作系统的情况,比如Linux
,底层的操作系统通道就是双向的
二、传统阻塞BIO存在的问题
我们先来回忆一下传统阻塞IO的经典编程模型:
代码语言:javascript复制public static void main(String[] args) throws Exception {
//1. 创建一个线程池
ExecutorService newCachedThreadPool = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
//2、创建ServerSocket
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
//3.侦听要与此套接字建立的连接并接受它。 该方法阻塞,直到建立连接。
final Socket socket = serverSocket.accept();
//4、就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(() -> {
//可以和客户端通讯
handler(socket);
});
}
}
/**
* 编写一个handler方法,和客户端通讯,读取客户端发过来的信息
* @param socket
*/
public static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true) {
int read = inputStream.read(bytes);
if (read != -1) {
//输出客户端发送的数据
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
这是一个经典的一个连接一个线程的模型,使用多线程的主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,即没有成功连接或没有数据读、写时,系统都是阻塞,在这种情况下如果是单线程的话就会一直阻塞在哪里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。
不过,这个模型是存在问题的,线程池,虽然可以让线程的创建和回收成本变低。但是线程池本生是有局限的的,在并发数并不是特别高的时候(小于单机1000),使用线程池搭用这种模型还是可行的,并竟线程池是一个天然的漏斗,不用过多考虑负载、限流等问题,编程也简单。但是仍然是解决不了根本问题的,根本问题在于严重的依赖于线程。但是大家都知道哈,线程是个特别贵的资源。
为什么说线程贵呢?
主要表现在以下几个方面:
- 首先线程的创建和销毁的成本相当高,并不是我们简单的看到的
new Thread().start()
即可了,它之后是去调用本地函数private native void start0()
,然后靠操作系统来分配资源创建线程,用完还要销毁。(要用到底层的操作系统的,就没有便宜的操作啦。)像在Liunx这样的操作系统中,一个个线程本质上就是一个个进程,创建、销毁都是属于重量级的函数,所以说线程的创建和销毁成本真的蛮高啦。 - 其次线程本身占用较大内存的,在Java中,线程的线程栈所占用的内存在Java堆外,不受Java程序控制,只受系统资源限制(如若系统给资源不足,可能创建失败),默认一个线程的线程栈大小是1M。如果每个请求都新建线程,1024个线程就会占用1个G内存,那样恐怕整个JVM的内存都会被干掉一半,而且也容易会引起系统的崩溃。
- 另外线程的切换成本也是很高的。当一个cpu从一个线程切换到另一个线程时,cpu需要保存当前线程的本地数据,程序当前的指针等,然后加载下一个等待执行的线程的本地数据,程序指针等,然后执行系统调用。这种切换也被称之为上下文切换。但是如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,就会导致系统load偏高,CPU使用率偏高,易导致系统陷入崩溃状态。
所以说,使用BIO面临十万或百万请求时,是无能为力的。所以必然需要更加高效的I/O处理模型。
三、NIO和BIO的区别
BIO
是阻塞的,NIO
则是非阻塞的。BIO
以流的方式处理数据,而NIO
以块的方式处理数据,块I/O
的效率比流I/O
高很多。BIO
基于字节流和字符流进行操作,而NIO
基于Channel
(通道)和Buffer
(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector
(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。- BIO适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,编程比较简单JDK1.4以前唯一的选择。 NIO适用于连接数目多且连接比较短的架构,比如聊天服务器,弹幕系统,服务器间通讯等,编程比较复杂,JDK1.4开始支持。
四、I/O模型之间的对比
学习I/O,I/O模型是必须知道的一点。I/O模型共有五种,分别是:
- 同步阻塞IO(Blocking IO):即传统的IO模型。
- 同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。
注意这里所说的NIO并非Java的NIO(New IO)库
。 - 多路复用IO(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。
- 信号驱动IO:信号驱动式I/O是指进程预先告知内核,使得当某个描述符上发生某事时,内核使用信号通知相关进程。
- 异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。
不过在这只描述了前三种。
传统阻塞式I/O
模型图如下:Java中原生的IO便是这种。
特点:
当用户线程发出IO请求之后,内核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用户线程才解除block状态。
非阻塞式I/O
模型图如下:
特点:
当用户线程发起一个read操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个error时,它就知道数据还没有准备好,于是就返回到用户进程去执行其他任务,等过一段时间后在去查看数据是否准备好。一旦内核中的数据准备好了,并且又再次收到了用户线程的请求,那么它马上就将数据拷贝到了用户线程,然后返回。所以事实上,在非阻塞IO模型中,用户线程需要不断地询问内核数据是否就绪,也就说非阻塞IO不会交出CPU,而会一直占用CPU。
注意
:对于非阻塞IO也有一个非常严重的问题,在while循环中需要不断地去询问内核数据是否就绪,这样会导致CPU占用率非常高,因此一般情况下很少使用while循环方式来读取数据。
多路复用I/O模型
Java NIO使用的即是这种模型。
多路复用IO主要用于处理多个IO连接时候的场景。在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才会真正调用实际的IO读写操作。因为在多路复用IO模型中,只需使用一个线程就可以管理多个socket,系统无需重复建立新的线程和销毁线程,也不必维护等,另一方面同时也避免了多线程之间的上下文切换导致的开销。并且只有在真正有socket读写事件进行时,才会使用IO资源,大大减少了资源占用,极大的提高了效率。
I/O多路复用比传统I/O好在哪里?
也许有小伙伴会说,我可以采用多线程 阻塞IO达到类似的效果,但是你需要考虑到多线程 阻塞IO,没有改变的是每个socket还是对应着一个线程,仍然无法解决根本问题,并且尤其是对于长连接来说,线程的资源一直不释放,如果后面陆续还有很多连接的话,就会造成系统的极大压力。
而使用多路复用IO模式,通过一个线程就可以去管理多个socket,并且只有当socket真正有读写事件发生才会占用资源来进行实际的读写操作。所以多路复用IO模式一方面减少了创建和销毁线程的操作,同时也避免了多线程之间切换的开销,并且提高了并发数。
I/O多路复用为何比非阻塞式I/O模型效率高?
因为在非阻塞IO中,不断地询问socket状态是通过用户线程去进行的,但是在多路复用IO模型中,轮询每个socket状态是内核在进行的,这个效率要比用户线程要高的多。
注意事项:
多路复用IO模型是通过轮询的方式来检测是否有事件到达,并且对到达的事件逐一进行响应。因此对于多路复用IO模型来说,如果一旦事件响应体很大,那么就有可能会导致后续的事件迟迟得不到处理,并且会影响到新的事件轮询。
在Java NIO中的理解是:
Selector就是一个socket选择器,它不停地查看所有与他绑定的socket是否准备完成,哪一个io准备完成,它就会处理对应的channel。
但是终究选择什么样的IO模型,还是需要根据实际问题实际分析的。
五、三大核心组件 | Buffer
基本介绍
Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
Buffer结构
Buffer是一个顶级的抽象类,底下有很多的实现类,这里是小小的一个图哈。从这张图可用看出Java
中的基本数据类型(boolean
除外),都有一个Buffer对应,不过其中ByteBuffer使用的最多。
Buffer API:
代码语言:javascript复制public abstract class Buffer {
public final int capacity() ;//返回此缓冲区的容量。
public final int position();//返回此缓冲区的位置。
public Buffer position(int newPosition) ;//设置此缓冲区的位置。 如果标记已定义且大于新位置,则将其丢弃。
public final int limit();//返回此缓冲区的限制。
public Buffer limit(int newLimit) ;//设置此缓冲区的限制。
public Buffer mark() ;//在其位置设置此缓冲区的标记。
public Buffer reset();//将此缓冲区的位置重置为先前标记的位置。
public Buffer clear() ;//清除此缓冲区。 将位置设置为零,将限制设置为容量,并丢弃标记。
public Buffer flip();//翻转这个缓冲区。 将限制设置为当前位置,然后将位置设置为零。 如果定义了标记,则将其丢弃。
public Buffer rewind();//倒带此缓冲区。 位置设置为零,标记被丢弃。
public final int remaining() ;//返回当前位置和限制之间的元素数。
public final boolean hasRemaining();//告诉当前位置和限制之间是否有任何元素。
public abstract boolean isReadOnly();//告诉这个缓冲区是否是只读的。
// -----------jdk1.6引入----------
public abstract boolean hasArray();//告诉此缓冲区是否由可访问数组支持。
public abstract Object array();//返回支持此缓冲区的数组(可选操作) 。
public abstract int arrayOffset();//返回缓冲区第一个元素在此缓冲区的后备数组中的偏移量
public abstract boolean isDirect();//判断此缓冲区是否为direct 。
}
六、三大核心组件 | Channel
基本介绍
Java NIO的通道类似流,但又有些不同:
- 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
- 通道可以异步地读写。
- 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
常用Channel的实现类
以下是Java NIO中最常用Channel的通道的实现:
FileChannel
从文件中读写数据。DatagramChannel
能通过UDP读写网络中的数据。SocketChannel
能通过TCP读写网络中的数据。ServerSocketChannel
可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
结构图:
小小的应用案例:
场景:使用一个Buffer
和FileChannel
完成文件读取、写入。
my.txt—>you.txt
代码语言:javascript复制public class NIOFileChannel {
public static void main(String[] args) throws Exception {
FileInputStream fileInputStream = new FileInputStream("my.txt");
FileChannel fileChannel01 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("you.txt");
FileChannel fileChannel02 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
//循环读取
while (true) {
//清空 buffer 缓冲区
byteBuffer.clear();
int read = fileChannel01.read(byteBuffer);
System.out.println("read = " read);
//表示读完
if (read == -1) {
break;
}
//将 buffer 中的数据写入到 fileChannel02--2.txt
byteBuffer.flip();
fileChannel02.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
}
}
七、三大核心组件 | Selector
基本介绍
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,才使得一个单独的线程可以管理多个channel,从而管理多个网络连接。
在这里只要知道使用Selector能够处理多个通道就足够了,其他的暂时先不去瞄了哈。
模型图:
Selector | API:
代码语言:javascript复制public abstract class Selector implements Closeable {
public static Selector open() ;//打开一个选择器。得到一个选择器对象
public abstract boolean isOpen();//告诉这个选择器是否打开
public abstract SelectorProvider provider(); //返回创建此频道的提供者。
public abstract Set<SelectionKey> keys(); //返回此选择器的键集
public abstract Set<SelectionKey> selectedKeys();//返回此选择器的选定键集。
public abstract int selectNow() throws IOExcep
tion;//不阻塞,立马返还 ,其对应的通道已准备好进行 I/O 操作。
public abstract int select(long timeout) throws IOException;//监控所有注册的通道,当其中有IO操作时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间。
public abstract Selector wakeup();//使尚未返回的第一个选择操作立即返回。如果另一个线程当前在选择操作中被阻塞,那么该调用将立即返回。
public abstract void close() throws IOException;//关闭此选择器。
}
因为要结合其他组件才能体现出Selector
,所以案例放在后文中了。
八、NIO 网络编程分析图
模型图如下:
流程分析:
服务器端启动,打开服务器的ServerSocketChannel,创建并且打开Selector,将ServerSocketChannel注册到selector上
代码语言:javascript复制ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
Selector 进行监听 select 方法, 返回有事件发生的通道的个数
代码语言:javascript复制 if (selector.select(1000) == 0) //这里我写的是每秒刷新一次
客户端启动,打开客户端的SocketChannel,绑定服务端地址,向服务端发送连接请求
代码语言:javascript复制SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(inetSocketAddress)
服务端Selector监听到注册事件发生后,通过selector.selectedKeys()
方法拿到SelectionKey
,会和该 Selector 关联(集合)
//通过selector得到selectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
再通过 java.nio.channels.SelectionKey 反向获取 SocketChannel , 方法 channel()
代码语言:javascript复制SocketChannel channel = (SocketChannel) selectionKey.channel();
可以通过得到的channel
, 完成业务处理
九、快速入门案例(一)
一直写理论,不写代码,看着发愁。所以一起来看看这个简单的入门案例吧。
场景:NIO
入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
public class NioServerTest {
public static void main(String[] args) throws Exception{
//1、创建一个ServerSocketChannel对象,绑定端口并配置成非阻塞模式。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888), 1024);
//2、下面这句必需要,否则ServerSocketChannel会使用阻塞的模式,那就不是NIO了
serverSocketChannel.configureBlocking(false);
//3、把ServerSocketChannel交给Selector监听
Selector selector = Selector.open();
//4、注册进Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//5、循环,不断的从Selector中获取准备就绪的Channel,最开始的时候Selector只监听了一个ServerSocketChannel
//但是后续有客户端连接时,会把客户端对应的Channel也交给Selector对象
while (true) {
//6、一直监听,每秒刷新一次,等待客户端的连接
if (selector.select(1000) == 0){
continue;
}
//获取所有的准备就绪的Channel,SelectionKey中包含中Channel信息
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
//遍历,每个Channel都可处理
for (SelectionKey selectionKey : selectionKeySet) {
//如果Channel已经无效了,则跳过(如Channel已经关闭了)
if(!selectionKey.isValid()) {
continue;
}
//判断Channel具体的就绪事件,如果是有客户端连接,则建立连接
if (selectionKey.isAcceptable()) {
System.out.println("连接成功!!!");
//当确定有selectionKey时,说明必有socketChannel,Server接收后得到SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//把客户端的Channel交给Selector监控,之后如果有数据可以读取时,会被select出来
socketChannel.register(selector, SelectionKey.OP_READ);
}
//如果有客户端可以读取请求了,则读取请求然后返回数据
if (selectionKey.isReadable()) {
System.out.println(readFromSelectionKey(selectionKey));
}
}
//处理完成后把返回的Set清空,如果不清空下次还会再返回这些Key,导致重复处理
selectionKeySet.clear();
}
}
//从客户端读取数据的庐江
private static String readFromSelectionKey(SelectionKey selectionKey) throws Exception{
//从SelectionKey中包含选取出来的Channel的信息把Channel获取出来
SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());
//读取数据到ByteBuffer中
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(byteBuffer);
//如果读到-1,说明数据已经传输完成了,可以并闭
if (len < 0) {
socketChannel.close();
selectionKey.cancel();
return "";
} else if(len == 0) { //什么都没读到
return "";
}
byteBuffer.flip();
doWrite(selectionKey, "Hello Nio");
return new String(byteBuffer.array(), 0, len);
}
private static void doWrite(SelectionKey selectionKey, String responseMessage) throws Exception{
System.err.println("Output message...");
SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());
ByteBuffer byteBuffer = ByteBuffer.allocate(responseMessage.getBytes().length);
byteBuffer.put(responseMessage.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
}
客户端:
代码语言:javascript复制package com.crush.nio.nio2;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Set;/** * @Author: crush * @Date: 2021-08-24 16:01 * version 1.0 */public class NioClientTest { public static void main(String[] args) throws Exception { //创建一个SocketChannel对象,配置成非阻塞模式 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); //创建一个选择器,并把SocketChannel交给selector对象 Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); //发起建立连接的请求,这里会立即返回,当连接建立完成后,SocketChannel就会被选取出来 socketChannel.connect(new InetSocketAddress("localhost", 8888)); //遍历,不段的从Selector中选取出已经就绪的Channel,在这个例子中,Selector只监控了一个SocketChannel while (true) { selector.select(1000); Set<SelectionKey> selectionKeySet = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeySet) { if (!selectionKey.isValid()) { continue; } //连接建立完成后的操作:直接发送请求数据 if (selectionKey.isConnectable()) { if (socketChannel.finishConnect()) { socketChannel.register(selector, SelectionKey.OP_READ); doWriteRequest(((SocketChannel) selectionKey.channel())); } } //如果当前已经可以读数据了,说明服务端已经响应完了,读取数据 if (selectionKey.isReadable()) { doRead(selectionKey); } } //最后同样要清除所有的Key selectionKeySet.removeAll(selectionKeySet); } } //发送请求 private static void doWriteRequest(SocketChannel socketChannel) throws Exception { System.err.println("start connect..."); //创建ByteBuffer对象,会放入数据 ByteBuffer byteBuffer = ByteBuffer.allocate("Hello Server!".getBytes().length); byteBuffer.put("Hello Server!".getBytes()); byteBuffer.flip(); //写数据 socketChannel.write(byteBuffer); if (!byteBuffer.hasRemaining()) { System.err.println("Send request success..."); } } //读取服务端的响应 private static void doRead(SelectionKey selectionKey) throws Exception { SocketChannel socketChannel = ((SocketChannel) selectionKey.channel()); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = socketChannel.read(byteBuffer); System.out.println("Recv:" new String(byteBuffer.array(), 0, len)); }}
先启动服务器端,再启动客户端,直接可以在客户端看到输出消息。
十、快速入门案例(二)
代码语言:javascript复制/**
* @Author: crush
* @Date: 2021-08-24 16:33
* version 1.0
*/
public class GroupChatServer {
//定义属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;
//构造器
//初始化工作
public GroupChatServer() {
try {
//得到选择器
selector = Selector.open();
//ServerSocketChannel
listenChannel = ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
//将该 listenChannel 注册到 selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() {
try {
//循环处理
while (true) {
int count = selector.select();
if (count > 0) { //有事件处理
// 遍历得到 selectionKey 集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
//取出 selectionkey
SelectionKey key = iterator.next();
//监听到 accept
if (key.isAcceptable()) {
SocketChannel sc = listenChannel.accept();
sc.configureBlocking(false);
//将该 sc 注册到 seletor
sc.register(selector, SelectionKey.OP_READ);
//提示
System.out.println(sc.getRemoteAddress() " 上线 ");
}
if (key.isReadable()) {//通道发送read事件,即通道是可读的状态
// 处理读(专门写方法..)
readData(key);
}
//当前的 key 删除,防止重复处理
iterator.remove();
}
} else {
System.out.println("等待....");
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//发生异常处理....
}
}
//读取客户端消息
public void readData(SelectionKey key) {
SocketChannel channel = null;
try {
//得到 channel
channel = (SocketChannel) key.channel();
//创建 buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
//根据 count 的值做处理
if (count > 0) {
//把缓存区的数据转成字符串
String msg = new String(buffer.array());
//输出该消息
System.out.println("form客户端:" msg);
//向其它的客户端转发消息(去掉自己),专门写一个方法来处理
sendInfoToOtherClients(msg, channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() "离线了..");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
}
}
//转发消息给其它客户(通道)
private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
System.out.println("服务器转发消息中...");
//遍历所有注册到 selector 上的 SocketChannel,并排除 self
for (SelectionKey key : selector.keys()) {
//通过 key 取出对应的 SocketChannel
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel && targetChannel != self) {
//转型
SocketChannel dest = (SocketChannel) targetChannel;
//将 msg 存储到 buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//将 buffer 的数据写入通道
dest.write(buffer);
}
}
}
public static void main(String[] args) {
//创建服务器对象
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}
客户端:
代码语言:javascript复制/**
* @Author: crush
* @Date: 2021-08-24 16:33
* version 1.0
*/
public class GroupChatClient {
//定义相关的属性
private final String HOST = "127.0.0.1";//服务器的ip
private final int PORT = 6667;//服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String username;
//构造器,完成初始化工作
public GroupChatClient() throws IOException {
selector = Selector.open();
//连接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将 channel 注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到 username
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username " is ok...");
}
//向服务器发送消息
public void sendInfo(String info) {
info = username " 说:" info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
//读取从服务器端回复的消息
public void readInfo() {
try {
int readChannels = selector.select();
if (readChannels > 0) {//有可以用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
//得到相关的通道
SocketChannel sc = (SocketChannel) key.channel();
//得到一个 Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
sc.read(buffer);
//把读到的缓冲区的数据转成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
}
iterator.remove(); //删除当前的 selectionKey,防止重复操作
} else {
//System.out.println("没有可以用的通道...");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
//启动我们客户端
GroupChatClient chatClient = new GroupChatClient();
//启动一个线程,每个 3 秒,读取从服务器发送数据
new Thread() {
public void run() {
while (true) {
chatClient.readInfo();
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
//发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
测试:
十一、自言自语
个人感受:越学习到后面,就越感觉基础的重要性。很多技术都只会去用,很多时候甚至都不懂它的设计理念也不懂为什么要那样做。
最近在持续更新中,如果你觉得对你有所帮助,也感兴趣的话,关注我吧,让我们一起学习,一起讨论吧。
在学习路上充满好奇心,明白思考的重要性,是支持我一直学习下去的积极推动力吧。希望你也能喜欢上编程!