Netty Review - 从BIO到NIO的进化推演

2023-11-16 09:22:41 浏览数 (2)


BIO

要讲明白BIO和NIO,首先我们应该自己实现一个简易的服务器,单线程即可。

DEMO 1

代码语言:javascript复制
package com.artisan.bio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class Server {
    public static void main(String[] args) {

        // BIO 面向字节
        byte[] bytes = new byte[1024];

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(1234);
            System.out.println("服务端已开启端口");

            while (true) {
                System.out.println();

                System.out.println("服务端等待连接......");

                Socket socket = serverSocket.accept();
                System.out.println("服务端已经收到连接请求");

                System.out.println("服务端等待数据.....");

                socket.getInputStream().read(bytes);
                System.out.println("服务端等已收到数据.....");

                String msg = new String(bytes);

                System.out.println("接收到了数据: "   msg);
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

创建了一个服务端类,在类中实现实例化了一个SocketServer并绑定了1234端口。之后调用accept方法来接收连接请求,并且调用read方法来接收客户端发送的数据。最后将接收到的数据打印。

代码语言:javascript复制
package com.artisan.bio;

import java.io.IOException;
import java.net.Socket; 

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class Client {

    public static void main(String[] args) throws IOException {

        Socket socket = new Socket("127.0.0.1",1234);

        socket.getOutputStream().write("数据数据数据".getBytes());

        socket.close();

    }
}

客户端,首先实例化Socket对象,并且绑定ip为127.0.0.1(本机),端口号为1234,调用write方法向服务器发送数据

运行测试会发现

在服务器启动后,客户端还没有连接服务器时,服务器由于调用了accept方法,将一直阻塞,直到有客户端请求连接服务器


DEMO 2

客户端的逻辑主要是:建立Socket –> 连接服务器 –> 发送数据,我们的数据是在连接服务器之后就立即发送的,现在我们来对客户端进行一次扩展,当我们连接服务器后,不立即发送数据,而是等待控制台手动输入数据后,再发送给服务端

客户端代码如下

代码语言:javascript复制
 try {
         Socket socket = new Socket("127.0.0.1",1234);
           String message = null;
           Scanner sc = new Scanner(System.in);
           message = sc.next();
           socket.getOutputStream().write(message.getBytes());
           socket.close();
           sc.close();
   } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
   }

小结论

从上面的运行结果中我们可以看到,服务器端在启动后:

1)首先需要等待客户端的连接请求(第一次阻塞)

2)如果没有客户端连接,服务端将一直阻塞等待;

3)然后当客户端连接后,服务器会等待客户端发送数据(第二次阻塞)

4)如果客户端没有发送数据,那么服务端将会一直阻塞等待客户端发送数据。


服务端从启动到收到客户端数据的这个过程,将会有两次阻塞的过程:

  • 1)第一次在等待连接时阻塞;
  • 2)第二次在等待数据时阻塞。

BIO会产生两次阻塞,这就是BIO的非常重要的一个特点


单线程BIO的缺陷

当我们的服务器接收到一个连接后,并且没有接收到客户端发送的数据时,是会阻塞在read()方法中的,那么此时如果再来一个客户端的请求,服务端是无法进行响应的。换言之:在不考虑多线程的情况下,BIO是无法处理多个客户端请求的


BIO如何处理并发

单线程版的BIO并不能处理多个客户端的请求,那么如何能使BIO处理多个客户端请求呢?

我们只需要在每一个连接请求到来时,创建一个线程去执行这个连接请求,就可以在BIO中处理多个客户端请求了,这也就是为什么BIO的其中一条概念是服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理。

【多线程BIO版本简易实现】

代码语言:javascript复制
 package com.artisan.bio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ServerMultThread {

    public static void main(String[] args) {
        byte[] buffer = new byte[1024];
        try {
            ServerSocket serverSocket = new ServerSocket(1234);
            System.out.println("服务器已启动并监听8080端口");
            while (true) {
                System.out.println();
                System.out.println("服务器正在等待连接...");
                Socket socket = serverSocket.accept();
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName()   " 服务器已接收到连接请求...");
                    System.out.println();
                    System.out.println(Thread.currentThread().getName()   "服务器正在等待数据...");
                    try {
                        socket.getInputStream().read(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()   "服务器已经接收到数据");
                    System.out.println();
                    String content = new String(buffer);
                    System.out.println(Thread.currentThread().getName()   "接收到的数据:"   content);
                }).start();

            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

多启动几个客户端,测试结果如下


多线程BIO服务器的弊端

多线程BIO服务器虽然解决了单线程BIO无法处理并发的弱点,但是也带来一个问题:如果有大量的请求连接到我们的服务器上,但是却不发送消息,那么我们的服务器也会为这些不发送消息的请求创建一个单独的线程,那么如果连接数少还好,连接数一多就会对服务端造成极大的压力

所以:如果这种不活跃的线程比较多,我们应该采取单线程的一个解决方案,但是单线程又无法处理并发,这就陷入了一种很矛盾的状态,于是就有了NIO


NIO

NIO要解决的问题

我们先来看看单线程模式下BIO服务器的代码,其实NIO需要解决的最根本的问题就是存在于BIO中的两个阻塞,分别是等待连接时的阻塞和等待数据时的阻塞

代码语言:javascript复制
package com.artisan.bio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class Server {
    public static void main(String[] args) {

        // BIO 面向字节
        byte[] bytes = new byte[1024];

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(1234);
            System.out.println("服务端已开启端口");

            while (true) {
                System.out.println();

                System.out.println("服务端等待连接......");

                Socket socket = serverSocket.accept();
                System.out.println("服务端已经收到连接请求");

                System.out.println("服务端等待数据.....");

                socket.getInputStream().read(bytes);
                System.out.println("服务端等已收到数据.....");

                String msg = new String(bytes);

                System.out.println("接收到了数据: "   msg);
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

如果单线程服务器在等待数据时阻塞,那么第二个连接请求到来时,服务器是无法响应的。如果是多线程服务器,那么又会有为大量空闲请求产生新线程从而造成线程占用系统资源,线程浪费的情况。

那么我们的问题就转移到,如何让单线程服务器在等待客户端数据到来时,依旧可以接收新的客户端连接请求


模拟NIO

如果要解决上文中提到的单线程服务器接收数据时阻塞,而无法接收新请求的问题,那么其实可以让服务器在等待数据时不进入阻塞状态,问题不就迎刃而解了吗?

方案一: (等待连接时和等待数据时不阻塞)
代码语言:javascript复制
package com.artisan.bio;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class MyServer {

    public static void main(String[] args) throws Exception {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            //Java为非阻塞设置的类
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(1234));

            //设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    //表示没人连接
                    System.out.println(Thread.currentThread().getName()   " 正在等待客户端请求连接...");
                    Thread.sleep(5000);
                } else {
                    System.out.println(Thread.currentThread().getName()   "当前接收到客户端请求连接...");
                }
                if (socketChannel != null) {
                    //设置为非阻塞
                    socketChannel.configureBlocking(false);
                    byteBuffer.flip();//切换模式  写-->读
                    int effective = socketChannel.read(byteBuffer);
                    if (effective != 0) {
                        String content = Charset.forName("utf-8").decode(byteBuffer).toString();
                        System.out.println(content);
                    } else {
                        System.out.println(Thread.currentThread().getName()   "当前未收到客户端消息");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端测试代码

代码语言:javascript复制
package com.artisan.bio;

import java.io.IOException;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ClientWithInput {
    private static Socket socket;
    private static Scanner sc;

    public static void main(String[] args) throws IOException {

        try {
            while (true) {
                socket = new Socket("127.0.0.1", 1234);
                String message = null;
                sc = new Scanner(System.in);
                message = sc.next();
                socket.getOutputStream().write(message.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            socket.close();
            sc.close();
        }
    }
}

在这种解决方案下,虽然在接收客户端消息时不会阻塞,但是又开始重新接收服务器请求,用户根本来不及输入消息,服务器就转向接收别的客户端请求了,换言之,服务器弄丢了当前客户端的请求


方案二(缓存Socket,轮询数据是否准备好)
代码语言:javascript复制
package com.artisan.bio;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class MyServer {

    public static void main(String[] args) throws Exception {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        List<SocketChannel> socketList = new ArrayList<SocketChannel>();
        try {
            //Java为非阻塞设置的类
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(1234));
            //设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    //表示没人连接
                    System.out.println(Thread.currentThread().getName()   "正在等待客户端请求连接...");
                    Thread.sleep(5000);
                } else {
                    System.out.println(socketChannel.getRemoteAddress()   "当前接收到客户端请求连接...");
                    socketList.add(socketChannel);
                }
                for (SocketChannel socket : socketList) {
                    socket.configureBlocking(false);
                    int effective = socket.read(byteBuffer);
                    if (effective != 0) {
                        byteBuffer.flip();//切换模式  写-->读
                        String content = Charset.forName("UTF-8").decode(byteBuffer).toString();
                        System.out.println(socket.getRemoteAddress()   "接收到消息:"   content);
                        System.out.println();
                        byteBuffer.clear();
                    } else {
                        // System.out.println(socket.getRemoteAddress()   "当前未收到客户端消息");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端我们使用如下代码去模拟

代码语言:javascript复制
package com.artisan.bio;

import java.io.IOException;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ClientWithInput {
    private static Socket socket;
    private static Scanner sc;

    public static void main(String[] args) throws IOException {

        try {
            while (true) {
                socket = new Socket("127.0.0.1", 1234);
                String message = null;
                sc = new Scanner(System.in);
                message = sc.next();
                socket.getOutputStream().write(message.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            socket.close();
            sc.close();
        }
    }
}

我们可以发现

  1. 消息并没有丢失
  2. server端并没有开启多线程来处理消息,均是在main线程

在解决方案一中,我们采用了非阻塞方式,但是发现一旦非阻塞,等待客户端发送消息时就不会再阻塞了,而是直接重新去获取新客户端的连接请求,这就会造成客户端连接丢失。

而在解决方案二中,我们将连接存储在一个list集合中,每次等待客户端消息时都去轮询,看看消息是否准备好,如果准备好则直接打印消息。

可以看到,从头到尾我们一直没有开启第二个线程,而是一直采用单线程来处理多个客户端的连接,这样的一个模式可以很完美地解决BIO在单线程模式下无法处理多客户端请求的问题,并且解决了非阻塞状态下连接丢失的问题。


方案二存在的问题

从刚才的运行结果中其实可以看出,消息没有丢失,程序也没有阻塞。

但是,在接收消息的方式上可能有些许不妥,我们采用了一个轮询的方式来接收消息,每次都轮询所有的连接,看消息是否准备好,测试用例中只是三个连接,所以看不出什么问题来,但是我们假设有1000万连接,甚至更多,采用这种轮询的方式效率是极低的

另外,1000万连接中,我们可能只会有100万会有消息,剩下的900万并不会发送任何消息,那么这些连接程序依旧要每次都去轮询,这显然是不合适的


NIO是如何解决这些问题的

在真实NIO中,并不会在Java层上来进行一个轮询,而是将轮询的这个步骤交给我们的操作系统来进行,他将轮询的那部分代码改为操作系统级别的系统调用(select函数,在linux环境中为epoll),在操作系统级别上调用select函数,主动地去感知有数据的socket


使用select/poll/epoll和直接在应用层做轮询的区别

NIO使用了操作系统底层的轮询系统调用 select/epoll(windows:select,linux:epoll),那么为什么不直接实现而要去调用系统来做轮询呢?


select底层逻辑

假设有A、B、C、D、E五个连接同时连接服务器,那么根据我们上文中的设计,程序将会遍历这五个连接,轮询每个连接,获取各自数据准备情况,那么和我们自己写的程序有什么区别呢?

首先:我们写的Java程序其本质在轮询每个Socket的时候也需要去调用系统函数,那么轮询一次调用一次,会造成不必要的上下文切换开销

而:Select会将五个请求从用户态空间全量复制一份到内核态空间,在内核态空间来判断每个请求是否准备好数据,完全避免频繁的上下文切换。所以效率是比我们直接在应用层写轮询要高的。

如果:select没有查询到到有数据的请求,那么将会一直阻塞(是的,select是一个阻塞函数)。如果有一个或者多个请求已经准备好数据了,那么select将会先将有数据的文件描述符置位,然后select返回。返回后通过遍历查看哪个请求有数据。


select的缺点

  • 1)底层存储依赖bitmap,处理的请求是有上限的,为1024;
  • 2)文件描述符是会置位的,所以如果当被置位的文件描述符需要重新使用时,是需要重新赋空值的;
  • 3)fd(文件描述符)从用户态拷贝到内核态仍然有一笔开销;
  • 4)select返回后还要再次遍历,来获知是哪一个请求有数据。

poll的底层逻辑

poll的工作原理和select很像,先来看一段poll内部使用的一个结构体

代码语言:javascript复制
struct pollfd{
    int fd;
    short events;
    short revents;
}
  • poll同样会将所有的请求拷贝到内核态,和select一样,poll同样是一个阻塞函数,当一个或多个请求有数据的时候,也同样会进行置位,但是它置位的是结构体pollfd中的events或者revents置位,而不是对fd本身进行置位,所以在下一次使用的时候不需要再进行重新赋空值的操作。
  • poll内部存储不依赖bitmap,而是使用pollfd数组的这样一个数据结构,数组的大小肯定是大于1024的。

解决了select 1、2两点的缺点。


epoll的底层逻辑

epoll是最新的一种多路IO复用的函数。这里只说说它的特点。

  • epoll和上述两个函数最大的不同是,它的fd是共享在用户态和内核态之间的,所以可以不必进行从用户态到内核态的一个拷贝,这样可以节约系统资源。
  • 另外,在select和poll中,如果某个请求的数据已经准备好,它们会将所有的请求都返回,供程序去遍历查看哪个请求存在数据,但是epoll只会返回存在数据的请求,这是因为epoll在发现某个请求存在数据时,首先会进行一个重排操作,将所有有数据的fd放到最前面的位置,然后返回(返回值为存在数据请求的个数N),那么我们的上层程序就可以不必将所有请求都轮询,而是直接遍历epoll返回的前N个请求,这些请求都是有数据的请求。

高性能网络编程 - select、 poll 、epoll 、libevent


BIO VS NIO

Stream vs. Buffer

Java NIO和IO之间的第一个重要区别是IO是面向流的,NIO是面向缓冲区的。那么,这意味着什么?

  • 面向流的Java IO意味着你可以从流中一次读取一个或多个字节。你对读取的字节做什么取决于你。它们不会缓存在任何地方。此外,你无法在流中的数据中前后移动。如果需要在从流中读取的数据中前后移动,则需要先将其缓存在缓冲区中。
  • Java NIO的面向缓冲区的方法略有不同。数据被读入缓冲区,稍后处理该缓冲区。你可以根据需要在缓冲区中前后移动。这使你在处理过程中具有更大的灵活性。但是,你还需要检查缓冲区是否包含完整处理所需的所有数据。并且,你需要确保在将更多数据读入缓冲区时,不要覆盖尚未处理的缓冲区中的数据。

Blocking vs. Non-blocking

  • Java IO的各种流都是blocking的。这意味着,当线程调用read()或write()时,该线程将被阻塞,直到有一些数据要读取,或者数据被完全写入,在此期间,该线程无法执行任何其他操作。
  • Java NIO非阻塞模式允许线程请求从通道读取数据,并且只获取当前可用的内容,或者根本没有数据,如果当前没有数据可用。线程可以继续使用其他内容,而不是在数据可供读取之前保持阻塞状态。
  • 非阻塞写入也是如此,线程可以请求将某些数据写入通道,但不要等待它完全写入。然后线程可以继续并在同一时间做其他事情
  • 线程在IO调用中没有阻塞时花费空闲时间,通常在此期间在其他通道上执行IO。也就是说,单个线程现在可以管理多个输入和输出通道

Selectors

Java NIO的选择器允许单个线程监视多个输入通道。你可以使用选择器注册多个通道,然后使用单个线程“选择”具有可用于处理的输入的通道,或者选择准备写入的通道。这种选择器机制使单个线程可以轻松管理多个通道。


NIO和经典IO如何影响应用程序的设计

选择NIO或IO作为IO工具包可能会影响应用程序设计的以下方面:

1)API调用NIO或IO类;

2)处理数据;

3)用于处理数据的线程数。


API调用NIO或IO类

使用NIO时的API调用看起来与使用IO时不同。而不是仅仅从例如InputStream读取字节的数据字节,必须首先将数据读入缓冲区,然后从那里进行处理


数据处理
Java IO:从阻塞流中读取数据

在IO设计中,从InputStream或Reader中读取字节的数据字节

想象一下,正在处理基于行的文本数据流。

例如:

代码语言:javascript复制
Name: Anna
Age: 25
Email: [url=mailto:anna@mailserver.com]anna@mailserver.com[/url]
Phone: 1234567890

这个文本行流可以像这样处理:

代码语言:javascript复制
InputStream input = ... ; // get the InputStream from the client socket
 
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
 
String nameLine   = reader.readLine();
String ageLine    = reader.readLine();
String emailLine  = reader.readLine();
String phoneLine  = reader.readLine();

注意处理状态是如何,由程序执行的程度决定的。换句话说,一旦第一个reader.readLine()方法返回,我们就确定已经读取了整行文本。readLine()会阻塞直到读取整行,这就是原因。我们还知道此行包含名称。同样,当第二个readLine()调用返回时,我们知道此行包含年龄等。

正如我们所看到的,只有当有新数据要读取时,程序才会进行,并且对于每个步骤,我们都知道该数据是什么。一旦执行的线程已经超过读取代码中的某个数据片段,该线程就不会在数据中向后移动(通常不会)


Java NIO:从通道读取数据,直到所有需要的数据都在缓冲区中

NIO的实现看起来会有所不同,这是一个简化的例子:

代码语言:javascript复制
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);

注意第二行从通道读取字节到ByteBuffer。当该方法调用返回时,您不知道所需的所有数据是否都在缓冲区内。你只知道缓冲区包含一些字节,这使得处理更加困难。

想象一下,在第一次读取(缓冲)调用之后,是否所有读入缓冲区的内容都是半行。例如,“姓名:An”。你能处理这些数据吗?并不是的。在完成任何数据的处理之前,您需要等待至少一整行数据进入缓冲区。

那么你怎么知道缓冲区是否包含足够的数据来处理它?好吧,你没有。找出的唯一方法是查看缓冲区中的数据。结果是,在您知道所有数据是否存在之前,您可能需要多次检查缓冲区中的数据。这既低效又可能在程序设计方面变得混乱。

比如

代码语言:javascript复制
ByteBuffer buffer = ByteBuffer.allocate(48);
 
int bytesRead = inChannel.read(buffer);
 
while(! bufferFull(bytesRead) ) {
    bytesRead = inChannel.read(buffer);
}

bufferFull()方法必须跟踪读入缓冲区的数据量,并返回true或false,具体取决于缓冲区是否已满。换句话说,如果缓冲区已准备好进行处理,则认为它已满。

bufferFull()方法扫描缓冲区,但必须使缓冲区保持与调用bufferFull()方法之前相同的状态。如果不是,则可能无法在正确的位置读入读入缓冲区的下一个数据。这不是不可能的,但这是另一个需要注意的问题。

如果缓冲区已满,则可以对其进行处理。如果它不满,您可能能够部分处理那里的任何数据,如果这在您的特定情况下是有意义的。在许多情况下,它没有。

这个图中说明了is-data-in-buffer-ready循环:


适用场景

NIO允许您仅使用一个(或几个)线程来管理多个通道(网络连接或文件),但成本是解析数据可能比从阻塞流中读取数据时更复杂。

如果您需要同时管理数千个打开的连接,每个只发送一些数据,例如聊天服务器,在NIO中实现服务器可能是一个优势。同样,如果您需要与其他计算机保持大量开放连接,例如在P2P网络中,使用单个线程来管理所有出站连接可能是一个优势。


如果您拥有较少带宽的连接,一次发送大量数据,那么可能最经典的IO服务器实现可能是最合适的。


通俗解释

以众所周之的数据读取过程为例,我们来一个更简化的理解。

对于数据读取,就读取速度来说:CPU > 内存 > 硬盘。

  • I 就是从硬盘到内存
  • O- 就是从内存到硬盘

第一种方式:从硬盘读取数据,然后程序一直等,数据读完后,继续你的操作。这种方式是最简单的,叫阻塞IO(也就是经典IO)。

第二种方式:从硬盘读取数据,然后程序继续向下执行,等数据读取完后,通知当前程序读取完成(对硬件来说叫中断,对程序来说叫回调),然后此程序可以立即处理读取的数据,也可以执行完当前操作后再对读取完的数据进行操作。

0 人点赞