你对Java网络编程了解的如何?Java NIO 网络编程 | Netty前期知识(二)

2022-10-31 15:41:47 浏览数 (1)

本文主要讲解NIO的简介、NIO和传统阻塞I/O有什么区别、NIO模型和传统I/O模型之间的对比、以及围绕NIO的三大组件来讲解,理论代码相结合。 很喜欢一句话:"沉下去,再浮上来"。 我想我们会变的不一样。

一、Java NIO 简介

在 Java 1.4 中引入了 NIO 框架(java.nio 包),提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层的高性能数据操作方式

同步非阻塞

Java NIO 的非阻塞模式:

  • 非阻塞读:一个线程从某一个通道发送请求或者读取数据时,它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,但是并不会像原生IO一样,阻塞等待着,反而是直到数据变得可以读取之前,此线程可以去继续做其他事情。
  • 非阻塞写:和非阻塞读一样,一个线程发送请求写入一些数据到某通道,在等待它完成写入这段时间中,无需阻塞等待,这个线程可以同时去做其他的事情。
  • 总结起来就是NIO 是可以做到用一个线程来处理多个操作的。
  • 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。

Java NIO 由以下几个核心部分组成:

  • Channels (通道)
  • Buffers (缓冲区)
  • Selector (选择器)

虽然Java NIO除此之外,仍有很多类和组件,但是总的来说仍然是靠Channel,Buffer 和 Selector 构成了核心的API。其余更多的是为了能让这三个核心组件更方便的使用。之后的讲解也会偏向于此。三个核心部分模型图:

简单说明:

  1. 一个Thread(线程)对应一个Selector选择器,每个选择器对应着多个Channel通道,每个通道又都对应着一个缓冲区(Buffer)
  2. Selector 会根据不同的事件,在各个通道上切换,但是程序切换到哪个 Channel 是由事件(比如:连接请求、数据到达)决定的。
  3. 数据的读取写入是通过 Buffer,这个和 BIO是不同的,BIO 中要么是输入流,或者是输出流,不能双向,但是 NIOBuffer 是可以读也可以写,需要 flip 方法切换 Channel 是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系统通道就是双向的

二、传统阻塞BIO存在的问题

我们先来回忆一下传统阻塞IO的经典编程模型:

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    //1. 创建一个线程池
    ExecutorService newCachedThreadPool = new ThreadPoolExecutor(0,
                                                                 Integer.MAX_VALUE,
                                                                 60L,
                                                                 TimeUnit.SECONDS,
                                                                 new SynchronousQueue<>(),
                                                                 Executors.defaultThreadFactory(),
                                                                 new ThreadPoolExecutor.AbortPolicy());
    //2、创建ServerSocket
    ServerSocket serverSocket = new ServerSocket(8888);
    while (true) {
        
        //3.侦听要与此套接字建立的连接并接受它。 该方法阻塞,直到建立连接。
        final Socket socket = serverSocket.accept();

        //4、就创建一个线程,与之通讯(单独写一个方法)
        newCachedThreadPool.execute(() -> {
            //可以和客户端通讯
            handler(socket);
        });
    }
}

/**
     * 编写一个handler方法,和客户端通讯,读取客户端发过来的信息
     * @param socket
     */
public static void handler(Socket socket) {
    try {
        byte[] bytes = new byte[1024];
        //通过socket获取输入流
        InputStream inputStream = socket.getInputStream();
        //循环的读取客户端发送的数据
        while (true) {
            int read = inputStream.read(bytes);
            if (read != -1) {
                //输出客户端发送的数据
                System.out.println(new String(bytes, 0, read));
            } else {
                break;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        System.out.println("关闭和client的连接");
        try {
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

这是一个经典的一个连接一个线程的模型,使用多线程的主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,即没有成功连接或没有数据读、写时,系统都是阻塞,在这种情况下如果是单线程的话就会一直阻塞在哪里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。

不过,这个模型是存在问题的,线程池,虽然可以让线程的创建和回收成本变低。但是线程池本生是有局限的的,在并发数并不是特别高的时候(小于单机1000),使用线程池搭用这种模型还是可行的,并竟线程池是一个天然的漏斗,不用过多考虑负载、限流等问题,编程也简单。但是仍然是解决不了根本问题的,根本问题在于严重的依赖于线程。但是大家都知道哈,线程是个特别贵的资源。

为什么说线程贵呢?

主要表现在以下几个方面:

  1. 首先线程的创建和销毁的成本相当高,并不是我们简单的看到的new Thread().start()即可了,它之后是去调用本地函数private native void start0(),然后靠操作系统来分配资源创建线程,用完还要销毁。(要用到底层的操作系统的,就没有便宜的操作啦。)像在Liunx这样的操作系统中,一个个线程本质上就是一个个进程,创建、销毁都是属于重量级的函数,所以说线程的创建和销毁成本真的蛮高啦
  2. 其次线程本身占用较大内存的,在Java中,线程的线程栈所占用的内存在Java堆外,不受Java程序控制,只受系统资源限制(如若系统给资源不足,可能创建失败),默认一个线程的线程栈大小是1M。如果每个请求都新建线程,1024个线程就会占用1个G内存,那样恐怕整个JVM的内存都会被干掉一半,而且也容易会引起系统的崩溃。
  3. 另外线程的切换成本也是很高的。当一个cpu从一个线程切换到另一个线程时,cpu需要保存当前线程的本地数据,程序当前的指针等,然后加载下一个等待执行的线程的本地数据,程序指针等,然后执行系统调用。这种切换也被称之为上下文切换。但是如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,就会导致系统load偏高,CPU使用率偏高,易导致系统陷入崩溃状态。

所以说,使用BIO面临十万或百万请求时,是无能为力的。所以必然需要更加高效的I/O处理模型。

三、NIO和BIO的区别

  1. BIO 是阻塞的,NIO 则是非阻塞的。
  2. BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多。
  3. BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
  4. BIO适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,编程比较简单JDK1.4以前唯一的选择。 NIO适用于连接数目多且连接比较短的架构,比如聊天服务器,弹幕系统,服务器间通讯等,编程比较复杂,JDK1.4开始支持。

四、I/O模型之间的对比

学习I/O,I/O模型是必须知道的一点。I/O模型共有五种,分别是:

  1. 同步阻塞IO(Blocking IO):即传统的IO模型。
  2. 同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。注意这里所说的NIO并非Java的NIO(New IO)库
  3. 多路复用IO(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型
  4. 信号驱动IO:信号驱动式I/O是指进程预先告知内核,使得当某个描述符上发生某事时,内核使用信号通知相关进程。
  5. 异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。

不过在这只描述了前三种。

传统阻塞式I/O

模型图如下:Java中原生的IO便是这种。

特点

当用户线程发出IO请求之后,内核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用户线程才解除block状态。

非阻塞式I/O

模型图如下:

特点

当用户线程发起一个read操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个error时,它就知道数据还没有准备好,于是就返回到用户进程去执行其他任务,等过一段时间后在去查看数据是否准备好。一旦内核中的数据准备好了,并且又再次收到了用户线程的请求,那么它马上就将数据拷贝到了用户线程,然后返回。所以事实上,在非阻塞IO模型中,用户线程需要不断地询问内核数据是否就绪,也就说非阻塞IO不会交出CPU,而会一直占用CPU。

注意:对于非阻塞IO也有一个非常严重的问题,在while循环中需要不断地去询问内核数据是否就绪,这样会导致CPU占用率非常高,因此一般情况下很少使用while循环方式来读取数据。

多路复用I/O模型

Java NIO使用的即是这种模型。

多路复用IO主要用于处理多个IO连接时候的场景在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才会真正调用实际的IO读写操作。因为在多路复用IO模型中,只需使用一个线程就可以管理多个socket,系统无需重复建立新的线程和销毁线程,也不必维护等,另一方面同时也避免了多线程之间的上下文切换导致的开销。并且只有在真正有socket读写事件进行时,才会使用IO资源,大大减少了资源占用,极大的提高了效率。

I/O多路复用比传统I/O好在哪里

也许有小伙伴会说,我可以采用多线程 阻塞IO达到类似的效果,但是你需要考虑到多线程 阻塞IO,没有改变的是每个socket还是对应着一个线程,仍然无法解决根本问题,并且尤其是对于长连接来说,线程的资源一直不释放,如果后面陆续还有很多连接的话,就会造成系统的极大压力。

而使用多路复用IO模式,通过一个线程就可以去管理多个socket,并且只有当socket真正有读写事件发生才会占用资源来进行实际的读写操作。所以多路复用IO模式一方面减少了创建和销毁线程的操作,同时也避免了多线程之间切换的开销,并且提高了并发数。

I/O多路复用为何比非阻塞式I/O模型效率高

因为在非阻塞IO中,不断地询问socket状态是通过用户线程去进行的,但是在多路复用IO模型中,轮询每个socket状态是内核在进行的,这个效率要比用户线程要高的多。

注意事项

多路复用IO模型是通过轮询的方式来检测是否有事件到达,并且对到达的事件逐一进行响应。因此对于多路复用IO模型来说,如果一旦事件响应体很大,那么就有可能会导致后续的事件迟迟得不到处理,并且会影响到新的事件轮询。

在Java NIO中的理解是

Selector就是一个socket选择器,它不停地查看所有与他绑定的socket是否准备完成,哪一个io准备完成,它就会处理对应的channel。


但是终究选择什么样的IO模型,还是需要根据实际问题实际分析的。

五、三大核心组件 | Buffer

基本介绍

Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

Buffer结构

Buffer是一个顶级的抽象类,底下有很多的实现类,这里是小小的一个图哈。从这张图可用看出Java 中的基本数据类型(boolean 除外),都有一个Buffer对应,不过其中ByteBuffer使用的最多。

Buffer API:

代码语言:javascript复制
public abstract class Buffer {
    public final int capacity() ;//返回此缓冲区的容量。
    public final int position();//返回此缓冲区的位置。
    public Buffer position(int newPosition) ;//设置此缓冲区的位置。 如果标记已定义且大于新位置,则将其丢弃。
    public final int limit();//返回此缓冲区的限制。
    public Buffer limit(int newLimit) ;//设置此缓冲区的限制。 
    public Buffer mark() ;//在其位置设置此缓冲区的标记。
    public Buffer reset();//将此缓冲区的位置重置为先前标记的位置。
    public Buffer clear() ;//清除此缓冲区。 将位置设置为零,将限制设置为容量,并丢弃标记。
    public Buffer flip();//翻转这个缓冲区。 将限制设置为当前位置,然后将位置设置为零。 如果定义了标记,则将其丢弃。
    public Buffer rewind();//倒带此缓冲区。 位置设置为零,标记被丢弃。
    public final int remaining() ;//返回当前位置和限制之间的元素数。
    public final boolean hasRemaining();//告诉当前位置和限制之间是否有任何元素。
    public abstract boolean isReadOnly();//告诉这个缓冲区是否是只读的。
    // -----------jdk1.6引入----------
    public abstract boolean hasArray();//告诉此缓冲区是否由可访问数组支持。
    public abstract Object array();//返回支持此缓冲区的数组(可选操作) 。
    public abstract int arrayOffset();//返回缓冲区第一个元素在此缓冲区的后备数组中的偏移量
    public abstract boolean isDirect();//判断此缓冲区是否为direct 。
}

六、三大核心组件 | Channel

基本介绍

Java NIO的通道类似流,但又有些不同:

  • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
  • 通道可以异步地读写。
  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

常用Channel的实现类

以下是Java NIO中最常用Channel的通道的实现:

  • FileChannel 从文件中读写数据。
  • DatagramChannel 能通过UDP读写网络中的数据。
  • SocketChannel 能通过TCP读写网络中的数据。
  • ServerSocketChannel可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

结构图:

小小的应用案例:

场景:使用一个BufferFileChannel完成文件读取、写入。

my.txt—>you.txt

代码语言:javascript复制
public class NIOFileChannel {

    public static void main(String[] args) throws Exception {

        FileInputStream fileInputStream = new FileInputStream("my.txt");
        FileChannel fileChannel01 = fileInputStream.getChannel();
        
        FileOutputStream fileOutputStream = new FileOutputStream("you.txt");
        FileChannel fileChannel02 = fileOutputStream.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        //循环读取
        while (true) {
            //清空 buffer 缓冲区
            byteBuffer.clear(); 
            int read = fileChannel01.read(byteBuffer);
            System.out.println("read = "   read);
            //表示读完
            if (read == -1) { 
                break;
            }
            //将 buffer 中的数据写入到 fileChannel02--2.txt
            byteBuffer.flip();
            fileChannel02.write(byteBuffer);
        }
        //关闭相关的流
        fileInputStream.close();
        fileOutputStream.close();
    }
}

七、三大核心组件 | Selector

基本介绍

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,才使得一个单独的线程可以管理多个channel,从而管理多个网络连接。

在这里只要知道使用Selector能够处理多个通道就足够了,其他的暂时先不去瞄了哈。

模型图:

Selector | API:

代码语言:javascript复制
public abstract class Selector implements Closeable {
    public static Selector open() ;//打开一个选择器。得到一个选择器对象
    public abstract boolean isOpen();//告诉这个选择器是否打开
    public abstract SelectorProvider provider(); //返回创建此频道的提供者。
    public abstract Set<SelectionKey> keys(); //返回此选择器的键集
    public abstract Set<SelectionKey> selectedKeys();//返回此选择器的选定键集。
    public abstract int selectNow() throws IOExcep
        tion;//不阻塞,立马返还 ,其对应的通道已准备好进行 I/O 操作。
    public abstract int select(long timeout) throws IOException;//监控所有注册的通道,当其中有IO操作时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间。
    public abstract Selector wakeup();//使尚未返回的第一个选择操作立即返回。如果另一个线程当前在选择操作中被阻塞,那么该调用将立即返回。 
    public abstract void close() throws IOException;//关闭此选择器。
}

因为要结合其他组件才能体现出Selector,所以案例放在后文中了。


八、NIO 网络编程分析图

模型图如下:

流程分析:

服务器端启动,打开服务器的ServerSocketChannel,创建并且打开Selector,将ServerSocketChannel注册到selector上

代码语言:javascript复制
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

Selector 进行监听 select 方法, 返回有事件发生的通道的个数

代码语言:javascript复制
  if (selector.select(1000) == 0) //这里我写的是每秒刷新一次

客户端启动,打开客户端的SocketChannel,绑定服务端地址,向服务端发送连接请求

代码语言:javascript复制
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(inetSocketAddress)

服务端Selector监听到注册事件发生后,通过selector.selectedKeys()方法拿到SelectionKey,会和该 Selector 关联(集合)

代码语言:javascript复制
 //通过selector得到selectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();

再通过 java.nio.channels.SelectionKey 反向获取 SocketChannel , 方法 channel()

代码语言:javascript复制
SocketChannel channel = (SocketChannel) selectionKey.channel();

可以通过得到的channel , 完成业务处理

九、快速入门案例(一)

一直写理论,不写代码,看着发愁。所以一起来看看这个简单的入门案例吧。

场景NIO 入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)

代码语言:javascript复制
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

public class NioServerTest {
    public static void main(String[] args) throws Exception{
        //1、创建一个ServerSocketChannel对象,绑定端口并配置成非阻塞模式。
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8888), 1024);

        //2、下面这句必需要,否则ServerSocketChannel会使用阻塞的模式,那就不是NIO了
        serverSocketChannel.configureBlocking(false);

        //3、把ServerSocketChannel交给Selector监听
        Selector selector = Selector.open();

        //4、注册进Selector
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //5、循环,不断的从Selector中获取准备就绪的Channel,最开始的时候Selector只监听了一个ServerSocketChannel
        //但是后续有客户端连接时,会把客户端对应的Channel也交给Selector对象
        while (true) {
            
            //6、一直监听,每秒刷新一次,等待客户端的连接
            if (selector.select(1000) == 0){
                continue;
            }
            
            //获取所有的准备就绪的Channel,SelectionKey中包含中Channel信息
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            
            //遍历,每个Channel都可处理
            for (SelectionKey selectionKey : selectionKeySet) {
                
                //如果Channel已经无效了,则跳过(如Channel已经关闭了)
                if(!selectionKey.isValid()) {
                    continue;
                }
                
                //判断Channel具体的就绪事件,如果是有客户端连接,则建立连接
                if (selectionKey.isAcceptable()) {
                    System.out.println("连接成功!!!");
                    
                    //当确定有selectionKey时,说明必有socketChannel,Server接收后得到SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    //把客户端的Channel交给Selector监控,之后如果有数据可以读取时,会被select出来
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                
                //如果有客户端可以读取请求了,则读取请求然后返回数据
                if (selectionKey.isReadable()) {
                    System.out.println(readFromSelectionKey(selectionKey));
                }
            }
            //处理完成后把返回的Set清空,如果不清空下次还会再返回这些Key,导致重复处理
            selectionKeySet.clear();
        }
    }

    //从客户端读取数据的庐江
    private static String readFromSelectionKey(SelectionKey selectionKey) throws Exception{

        //从SelectionKey中包含选取出来的Channel的信息把Channel获取出来
        SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());
        //读取数据到ByteBuffer中
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int len = socketChannel.read(byteBuffer);


        //如果读到-1,说明数据已经传输完成了,可以并闭
        if (len < 0) {
            socketChannel.close();
            selectionKey.cancel();
            return "";
        } else if(len == 0) { //什么都没读到
            return "";
        }
        byteBuffer.flip();
        doWrite(selectionKey, "Hello Nio");
        return new String(byteBuffer.array(), 0, len);
    }

    private static void doWrite(SelectionKey selectionKey, String responseMessage) throws Exception{
        System.err.println("Output message...");
        SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());
        ByteBuffer byteBuffer = ByteBuffer.allocate(responseMessage.getBytes().length);
        byteBuffer.put(responseMessage.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    }
}

客户端:

代码语言:javascript复制
package com.crush.nio.nio2;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Set;/** * @Author: crush * @Date: 2021-08-24 16:01 * version 1.0 */public class NioClientTest {    public static void main(String[] args) throws Exception {        //创建一个SocketChannel对象,配置成非阻塞模式        SocketChannel socketChannel = SocketChannel.open();        socketChannel.configureBlocking(false);        //创建一个选择器,并把SocketChannel交给selector对象        Selector selector = Selector.open();        socketChannel.register(selector, SelectionKey.OP_CONNECT);        //发起建立连接的请求,这里会立即返回,当连接建立完成后,SocketChannel就会被选取出来        socketChannel.connect(new InetSocketAddress("localhost", 8888));        //遍历,不段的从Selector中选取出已经就绪的Channel,在这个例子中,Selector只监控了一个SocketChannel        while (true) {            selector.select(1000);            Set<SelectionKey> selectionKeySet = selector.selectedKeys();            for (SelectionKey selectionKey : selectionKeySet) {                if (!selectionKey.isValid()) {                    continue;                }                //连接建立完成后的操作:直接发送请求数据                if (selectionKey.isConnectable()) {                    if (socketChannel.finishConnect()) {                        socketChannel.register(selector, SelectionKey.OP_READ);                        doWriteRequest(((SocketChannel) selectionKey.channel()));                    }                }                //如果当前已经可以读数据了,说明服务端已经响应完了,读取数据                if (selectionKey.isReadable()) {                    doRead(selectionKey);                }            }            //最后同样要清除所有的Key            selectionKeySet.removeAll(selectionKeySet);        }    }    //发送请求    private static void doWriteRequest(SocketChannel socketChannel) throws Exception {        System.err.println("start connect...");        //创建ByteBuffer对象,会放入数据        ByteBuffer byteBuffer = ByteBuffer.allocate("Hello Server!".getBytes().length);        byteBuffer.put("Hello Server!".getBytes());        byteBuffer.flip();        //写数据        socketChannel.write(byteBuffer);        if (!byteBuffer.hasRemaining()) {            System.err.println("Send request success...");        }    }    //读取服务端的响应    private static void doRead(SelectionKey selectionKey) throws Exception {        SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);        int len = socketChannel.read(byteBuffer);        System.out.println("Recv:"   new String(byteBuffer.array(), 0, len));    }}

先启动服务器端,再启动客户端,直接可以在客户端看到输出消息。

十、快速入门案例(二)

代码语言:javascript复制
/**
 * @Author: crush
 * @Date: 2021-08-24 16:33
 * version 1.0
 */
public class GroupChatServer {

    //定义属性
    private Selector selector;
    private ServerSocketChannel listenChannel;

    private static final int PORT = 6667;

    //构造器
    //初始化工作
    public GroupChatServer() {
        try {
            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            listenChannel.configureBlocking(false);
            //将该 listenChannel 注册到 selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void listen() {
        try {
            //循环处理
            while (true) {
                int count = selector.select();
                if (count > 0) { //有事件处理
                    // 遍历得到 selectionKey 集合
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        //取出 selectionkey
                        SelectionKey key = iterator.next();
                        //监听到 accept
                        if (key.isAcceptable()) {
                            SocketChannel sc = listenChannel.accept();
                            sc.configureBlocking(false);
                            //将该 sc 注册到 seletor
                            sc.register(selector, SelectionKey.OP_READ);
                            //提示
                            System.out.println(sc.getRemoteAddress()   " 上线 ");
                        }
                        if (key.isReadable()) {//通道发送read事件,即通道是可读的状态
                            // 处理读(专门写方法..)
                            readData(key);
                        }
                        //当前的 key 删除,防止重复处理
                        iterator.remove();
                    }
                } else {
                    System.out.println("等待....");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //发生异常处理....
        }
    }

    //读取客户端消息
    public void readData(SelectionKey key) {
        SocketChannel channel = null;
        try {
            //得到 channel
            channel = (SocketChannel) key.channel();
            //创建 buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            //根据 count 的值做处理
            if (count > 0) {
                //把缓存区的数据转成字符串
                String msg = new String(buffer.array());
                //输出该消息
                System.out.println("form客户端:"   msg);
                //向其它的客户端转发消息(去掉自己),专门写一个方法来处理
                sendInfoToOtherClients(msg, channel);
            }
        } catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress()   "离线了..");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
        }
    }

    //转发消息给其它客户(通道)
    private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {

        System.out.println("服务器转发消息中...");
        //遍历所有注册到 selector 上的 SocketChannel,并排除 self
        for (SelectionKey key : selector.keys()) {
            //通过 key 取出对应的 SocketChannel
            Channel targetChannel = key.channel();
            //排除自己
            if (targetChannel instanceof SocketChannel && targetChannel != self) {
                //转型
                SocketChannel dest = (SocketChannel) targetChannel;
                //将 msg 存储到 buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                //将 buffer 的数据写入通道
                dest.write(buffer);
            }
        }
    }

    public static void main(String[] args) {
        //创建服务器对象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}

客户端:

代码语言:javascript复制
/**
 * @Author: crush
 * @Date: 2021-08-24 16:33
 * version 1.0
 */
public class GroupChatClient {

    //定义相关的属性
    private final String HOST = "127.0.0.1";//服务器的ip
    private final int PORT = 6667;//服务器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    //构造器,完成初始化工作
    public GroupChatClient() throws IOException {
        
        selector = Selector.open();
        //连接服务器
        socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //将 channel 注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到 username
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username   " is ok...");
    }

    //向服务器发送消息
    public void sendInfo(String info) {
        info = username   " 说:"   info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //读取从服务器端回复的消息
    public void readInfo() {
        try {
            int readChannels = selector.select();
            if (readChannels > 0) {//有可以用的通道
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        //得到相关的通道
                        SocketChannel sc = (SocketChannel) key.channel();
                        //得到一个 Buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        sc.read(buffer);
                        //把读到的缓冲区的数据转成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                }
                iterator.remove(); //删除当前的 selectionKey,防止重复操作
            } else {
                //System.out.println("没有可以用的通道...");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {

        //启动我们客户端
        GroupChatClient chatClient = new GroupChatClient();
        //启动一个线程,每个 3 秒,读取从服务器发送数据
        new Thread() {
            public void run() {
                while (true) {
                    chatClient.readInfo();
                    try {
                        Thread.currentThread().sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        //发送数据给服务器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }
}

测试:

十一、自言自语

个人感受:越学习到后面,就越感觉基础的重要性。很多技术都只会去用,很多时候甚至都不懂它的设计理念也不懂为什么要那样做。

最近在持续更新中,如果你觉得对你有所帮助,也感兴趣的话,关注我吧,让我们一起学习,一起讨论吧。

在学习路上充满好奇心,明白思考的重要性,是支持我一直学习下去的积极推动力吧。希望你也能喜欢上编程!

0 人点赞