Java IO,NIO以及Netty网络编程

2021-12-11 17:05:10 浏览数 (2)

1. IO,NIO和Netty简介

1.1 阻塞 IO(Blocking I/O)

同步阻塞I/O模式:当一条线程执行 read() 或者 write() 方法时,这条线程会一直阻塞直到读取一些数据或者写出去的数据已经全部写出,在这期间这条线程不能做任何其他的事情。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。

但是,当面对十万甚至百万级连接的时候,传统的 BIO模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

1.2 非阻塞 NIO(New I/O)

NIO是一种同步非阻塞的 I/O模型,NIO 与原有的 IO 有同样的作用和目的,但是使用的方式完全不同,NIO 支持面向缓冲区、基于通道的操作。NIO 将以更加高效的方式进行文件读写操作。 JAVA NIO的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO系统,需要获取用于连接 IO设备的通道以及用于容纳数据的缓冲区数据进行处理。阻塞IO 会一直等待,所以非阻塞IO 是用来解决 IO线程与 Socket 之间的解耦问题,通过引入机制如果 Socket 发送缓冲区可写的话会通知 IO线程进行 write,如果 Socket 的接收缓冲区可读的话会通知 IO线程进行 read。NIO 提供了与传统 BIO 模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel。

对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

1.3 Netty

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和精简了 TCP 和 UDP 套接字服务器等网络编程。

“快速而简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从许多协议(如 FTP、SMTP、HTTP 以及各种基于二进制和文本的遗留协议)的实现中获得的经验而精心设计的。结果,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。

2. 编码

2.1 IO服务端
代码语言:javascript复制
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
​
public class OioServer {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
​
    ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    //创建socket服务,监听10101端口
    ServerSocket server=new ServerSocket(10101);
    System.out.println("服务器启动!");
    while(true){
        //获取一个套接字(阻塞)
        final Socket socket = server.accept();
        System.out.println("来个一个新客户端!");
        newCachedThreadPool.execute(new Runnable() {
​
            @Override
            public void run() {
                //业务处理
                handler(socket);
            }
        });
​
    }
}
​
/**
 * 读取数据
 * @param socket
 * @throws Exception
 */
public static void handler(Socket socket){
    try {
        byte[] bytes = new byte[1024];
        InputStream inputStream = socket.getInputStream();
​
        while(true){
            //读取数据(阻塞)
            int read = inputStream.read(bytes);
            if(read != -1){
                System.out.println(new String(bytes, 0, read));
            }else{
                break;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }finally{
        try {
            System.out.println("socket关闭");
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}}
2.2 NIO服务端
代码语言:javascript复制
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
​
public class NIOServer {
// 通道管理器
private Selector selector;
/**
 * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
 *
 * @param port
 *            绑定的端口号
 * @throws IOException
 */
public void initServer(int port) throws IOException {
    // 获得一个ServerSocket通道
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    // 设置通道为非阻塞
    serverChannel.configureBlocking(false);
    // 将该通道对应的ServerSocket绑定到port端口
    serverChannel.socket().bind(new InetSocketAddress(port));
    // 获得一个通道管理器
    this.selector = Selector.open();
    // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
    // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
​
/**
 * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
 *
 * @throws IOException
 */
public void listen() throws IOException {
    System.out.println("服务端启动成功!");
    // 轮询访问selector
    while (true) {
        // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
        selector.select();
        // 获得selector中选中的项的迭代器,选中的项为注册的事件
        Iterator<?> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = (SelectionKey) ite.next();
            // 删除已选的key,以防重复处理
            ite.remove();
​
            handler(key);
        }
    }
}
​
/**
 * 处理请求
 *
 * @param key
 * @throws IOException
 */
public void handler(SelectionKey key) throws IOException {
​
    // 客户端请求连接事件
    if (key.isAcceptable()) {
        handlerAccept(key);
        // 获得了可读的事件
    } else if (key.isReadable()) {
        handelerRead(key);
    }
}
​
/**
 * 处理连接请求
 *
 * @param key
 * @throws IOException
 */
public void handlerAccept(SelectionKey key) throws IOException {
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    // 获得和客户端连接的通道
    SocketChannel channel = server.accept();
    // 设置成非阻塞
    channel.configureBlocking(false);
​
    // 在这里可以给客户端发送信息哦
    System.out.println("新的客户端连接");
    // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
    channel.register(this.selector, SelectionKey.OP_READ);
}
​
/**
 * 处理读的事件
 *
 * @param key
 * @throws IOException
 */
public void handelerRead(SelectionKey key) throws IOException {
    // 服务器可读取消息:得到事件发生的Socket通道
    SocketChannel channel = (SocketChannel) key.channel();
    // 创建读取的缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = channel.read(buffer);
    if(read > 0){
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("服务端收到信息:"   msg);
​
        //回写数据
        ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
        channel.write(outBuffer);// 将消息回送给客户端
    }else{
        System.out.println("客户端关闭");
        key.cancel();
    }
}
​
/**
 * 启动服务端测试
 *
 * @throws IOException
 */
public static void main(String[] args) throws IOException {
    NIOServer server = new NIOServer();
    server.initServer(8000);
    server.listen();
}
}
2.3 Netty回声服务器
代码语言:javascript复制
package Netty;
​
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
​
​
public final class EchoServer {
​
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
​
public static void main(String[] args) throws Exception {
  // Configure SSL.
  final SslContext sslCtx;
  if (SSL) {
          SelfSignedCertificate ssc = new SelfSignedCertificate();
          sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
      } else {
          sslCtx = null;
      }
​
  // 配置服务器
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  final EchoServerHandler serverHandler = new EchoServerHandler();
  try {
          ServerBootstrap b = new ServerBootstrap();
          b.group(bossGroup, workerGroup)
           .channel(NioServerSocketChannel.class)
           .option(ChannelOption.SO_BACKLOG, 100)
           .handler(new LoggingHandler(LogLevel.INFO))
           .childHandler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                    p.addLast(sslCtx.newHandler(ch.alloc()));
                                }
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(serverHandler);
                        }
       });
​
          // 启动服务器
          ChannelFuture f = b.bind(PORT).sync();
​
          // 阻塞直到断开连接
          f.channel().closeFuture().sync();
      } finally {
          // 关闭所有事件循环并关闭线程
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }
}
}
​

服务端处理事件:

代码语言:javascript复制
package Netty;
​
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
​
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
​
​
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Scanner s = new Scanner(System.in);
        while(true) {
            String words = s.nextLine();
            final ByteBuf wordbuf = Unpooled.wrappedBuffer(words.getBytes(StandardCharsets.UTF_8));
           
            final ChannelFuture f = ctx.writeAndFlush(wordbuf); // (3)
​
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    assert f == future;
                    if(words.equals("exit")){
                        ctx.close();
                        return;
                    }
​
                }
            }); // (4)
        }
    }
​
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

服务端:

代码语言:javascript复制
package Netty;
​
   /*
   * Copyright 2012 The Netty Project
   *
   * The Netty Project licenses this file to you under the Apache License,
   * version 2.0 (the "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at:
   *
   *   https://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   * License for the specific language governing permissions and limitations
   * under the License.
   */
​
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
​
​
public final class EchoClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
​
    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
​
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
               @Override
        public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                            }
                        //p.addLast(new LoggingHandler(LogLevel.INFO));
                      p.addLast(new EchoClientHandler());
                  }
  });
​
                 // Start the client.
                 ChannelFuture f = b.connect(HOST, PORT).sync();
​
                 // 等待直到服务端关闭连接
                 f.channel().closeFuture().sync();
             } finally {
                 // Shut down the event loop to terminate all threads.
                 group.shutdownGracefully();
             }
     }
}

客户端处理事件:

代码语言:javascript复制
package Netty;
​
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
​
import java.util.Date;
​
/**
* Handler implementation for the echo client.  It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
      public class EchoClientHandler extends ChannelInboundHandlerAdapter {
​
          private final ByteBuf firstMessage;
​
          /**
    * Creates a client-side handler.
    */
          public EchoClientHandler() {
              firstMessage = Unpooled.buffer(EchoClient.SIZE);
              for (int i = 0; i < firstMessage.capacity(); i   ) {
                firstMessage.writeByte((byte) i);
            }
    }
​
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        String word=m.toString(CharsetUtil.UTF_8);
        try {
            System.out.println(word);
            if(word.equals("exit")){
                ctx.close();
            }
​
        } finally {
            m.release();
        }
    }
​
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

该程序允许从服务端向客户端发送消息:

发送方:

接收方:

3. 参考

[1] Netty3学习笔记(一) --- 传统IO与NIO比较

[2] Netty3学习笔记(二) --- Netty Helloworld入门

[3] Netty回声服务器

[4] Java面试——Netty

0 人点赞