Netty[Netty从入门到精通]

2022-09-02 17:40:51 浏览数 (1)

网络编程基础

OSI七层模型

应用层:Http协议、电子文件传输、文件服务器等 表示层:解决我们不同系统之间语法的通讯 会话层:建立与应用程序之间的通讯 传输层:提供了端口号和接口协议TPC/UDP

网络层:为数据包选择路由 路由器、交换机 定义了ip地址,可以根据ip地址找到对应的服务器

数据链路层:传输有地址的帧以及错误检测功能 物理层:以二进制形式,在物理机器上实现传输 (光纤、各种物理介质传输)

一个域名底层是如何解析

单原理实现:浏览器访问域名,根据域名先从本地host文件 C:WindowsSystem32driversetchosts文件 查找匹配对应的ip与域名,如果本地 Host文件 没有的情况下,则联网去电信运营商查找。

Socket网络通讯技术

TCP与UDP协议

Socket

Socket(套接字)是两个程序之间通过双向信道进行数据交换的端,可以理解为接口。使用Socket编程也称为网络编程,Socket只是接口并不是网络通信协议。

TCP与UDP区别

TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议 TCP协议应用场景:HTTP、HTTPS、FTP协议 UDP是面向无连接通讯协议,udp通讯时不需要接受方确定,属于不可靠传输,可能会存在丢包的现象。 UDP协议应用场景:QQ语音、QQ视频

三次握手和四次挥手(分手)概念

首先我们要知道在tcp建立连接中,有一些名词表示: 比如:syn就是建立连接、ack就是确认标志、fin终止标志

第一次握手:客户端会向服务器端发送码为syn=1,随机产生一个seq_number=x的数据包到服务器端 (syn) 第二次握手:服务端接受到客户端请求之后,确认ack=x 1, 于是就向客户端发送syn(服务端独立生成 随机生成数字Y) ack 第三次握手:客户端接受syn(随机数Y) ack,向服务器端发送ack=y 1,此包发送完毕即可 建立tcp连接。

白话文翻译: 第一次握手:客户端向服务器端发送 问服务器你在不在? 第二次握手:服务器端回应客户端说:我在的。 第三次握手:客户端发送给服务器端:ok,那我开始建立连接的

关闭连接: 第一次挥手: 客户端向服务器端发送释放的报文,停止发送数据 fin=1、生成一个序列号seq=u; 第二次挥手: 服务器端接受到释放的报文后,发送ack=u 1;随机生成的seq=v给客户端;当前状态为关闭等待状态

客户端收到了服务器确认通知之后,此时客户端就会进入到终止状态,等待服务器端发送释放报文。 第三次挥手:服务器端最后数据发送完毕之后,就向客户端发送连接释放报文,FIN=1,ack=u 1 当前为半关闭状态,随机生成一个随机树w

第四次挥手,客户端必须发出确认,ACK=1,ack=w 1,而自己的序列号是seq=u 1,此时,客户端就进入了TIME-WAIT(时间等待)状态。注意此时TCP连接还没有释放,必须经过2∗∗MSL(最长报文段寿命)的时间后,当客户端撤销相应的TCB后,才进入CLOSED状态。

服务器只要收到了客户端发出的确认,立即进入CLOSED状态。同样,撤销TCB后,就结束了这次的TCP连接。可以看到,服务器结束TCP连接的时间要比客户端早一些。

白话文翻译四次挥手:

第一次挥手 客户端向服务端发送一个释放连接通知; 第二次挥手 服务端接受到释放通知之后,告诉给客户端说等待一下,因为可能存在有其他的数据没有发送完毕,等待数据全部传输完毕之后就开始 关闭连接; 第三次挥手 服务器端所有的数据发送完毕之后,就告诉客户端说现在可以释放连接了。 第四次挥手: 客户端确认是最终释放连接通知,ok 就开始 就向服务区端发送我们可以开始关闭连接啦;

Socket Tcp通讯代码
服务端
代码语言:javascript复制
    public static void main(String[] args) throws IOException {
        // 创建Server Socket
        ServerSocket serverSocket = new ServerSocket();
        // 创建我们的 Socket 监听连接地址和端口号
        SocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
        // 绑定我们的监听地址
        serverSocket.bind(address);
        // 等待接受请求
        System.out.println("等待客户端发送消息..");
        Socket accept = serverSocket.accept();
        // 获取OutputStream流
        PrintWriter socketOut = new PrintWriter(accept.getOutputStream());
        byte buf[] = new byte[1024];
        if (accept.getInputStream().read(buf) > 0) {
            System.out.println("服务器端接受到客户端消息:"   new String(buf));
        }
        // 服务器端响应消息
        String sendStr = "我是高腾飞";
        socketOut.write(sendStr);
        socketOut.flush();

        // 关闭所有连接
        socketOut.close();
        accept.close();
        serverSocket.close();

    }
客户端
代码语言:javascript复制
    public static void main(String[] args) throws IOException {
        final Socket socket = new Socket();
        // 创建socket地址
        SocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
        socket.connect(address);
        // 创建PrintWriter
        PrintWriter socketOut = new PrintWriter(socket.getOutputStream());
        BufferedReader socketIn = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));

        // 向服务器发送的内容
        String sendStr = "客户端问服务器端: 你是高腾飞么?";
        socketOut.write(sendStr);
        socketOut.flush();
        String receiveStr = socketIn.readLine();
        System.out.println("服务器端回复:: "   receiveStr);

        // 关闭连接
        socketOut.close();
        socketIn.close();
        socket.close();

    }
Socket Udp通讯代码
服务端
代码语言:javascript复制
    public static void main(String[] args) throws IOException {
        DatagramSocket socket = new DatagramSocket(8800);
        //2.创建数据报,用于接收客户端发送的数据
        byte[] data = new byte[1024];
        //创建字节数组,指定接收的数据包的大小
        DatagramPacket packet = new DatagramPacket(data, data.length);
//3.接收客户端发送的数据
        System.out.println("****服务器端已经启动,等待客户端发送数据");
//此方法在接收到数据报之前会一直阻塞
        socket.receive(packet);
        //4.读取数据
        String info = new String(data, 0, packet.getLength());
        System.out.println("我是服务器,客户端说:"   info);

        /*
         * 向客户端响应数据
         */
//1.定义客户端的地址、端口号、数据
        InetAddress address = packet.getAddress();
        int port = packet.getPort();
        byte[] data2 = "我是高腾飞~~".getBytes();
        //2.创建数据报,包含响应的数据信息
        DatagramPacket packet2 = new DatagramPacket(data2, data2.length, address, port);
//3.响应客户端
        socket.send(packet2);
//4.关闭资源
        socket.close();
    }
客户端
代码语言:javascript复制
    public static void main(String[] args) throws IOException {
        InetAddress address = InetAddress.getByName("localhost");
        int port = 8800;
        byte[] data = "你是高腾飞?".getBytes();
//2.创建数据报,包含发送的数据信息
        DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
//3.创建DatagramSocket对象
        DatagramSocket socket = new DatagramSocket();
//4.向服务器端发送数据报
        socket.send(packet);

        /*
         * 接收服务器端响应的数据
         */
//1.创建数据报,用于接收服务器端响应的数据
        byte[] data2 = new byte[1024];
        DatagramPacket packet2 = new DatagramPacket(data2, data2.length);
//2.接收服务器响应的数据
        socket.receive(packet2);
//3.读取数据
        String reply = new String(data2, 0, packet2.getLength());
        System.out.println("我是客户端,服务器说:"   reply);
//4.关闭资源
        socket.close();

    }
Http协议7个请求过程

Http协议一种超文本传输的协议,基于TCP/IP协议的包装,包含:img、css、js、html等。

Http协议的特征:

  1. 无状态
  2. 请求与响应模型
  3. 简单快速
  4. 灵活可以传输任何类型

Http分为 请求与响应 请求: 请求行 请求头 请求方法 Get/Post 响应: 响应行 响应头 响应体

BIO,NIO,AIO 模型

输入IO与输出IO原理

内核态: CPU可以访问内存所有数据, 包括外围设备, 例如硬盘, 网卡;

用户态: (独立创建应用程序) 只能受限的访问内存, 且不允许访问外围设备. 占用CPU的能力被剥夺, CPU资源可以被其他程序获

1、BIO(Blocking I O) 同步阻塞模型,一个线程对应一个客户端连接。 应用场景: BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高, 但程序简单易理解。 2、NIO(Non Blockin g IO) 同步非阻塞, 服务器实现模式为一个线程可以处理多个请求(连接),客户端发送的连接请求都会注册到 多路复用器selector上,多路复用器轮询到连接有IO请求就进行处理。 应用场景: NIO方式适用于连接数目多且连接比较短(轻操作) 的架构, 比如聊天服务器, 弹幕系统, 服务器间通讯,编程比较复杂, JDK1.4 开始支持

3、AIO(NIO 2.0) 异步非阻塞, 由操作系统完成后回调通知服务端程序启动线程去处理, 一般适用于连接数较多且连接时间较长的应用。是在NIO的基础上进一步封装的。 应用场景: AIO方式适用于连接数目多且连接比较长(重操作) 的架构,JDK7 开始支持

同步和异步的区别

同步也就是程序从上往下实现执行; 异步从新开启一个新分支,相互不会影响;

站在Http协议上分析同步与异步区别:

我们的Http协议请求默认情况下同步形式调用,如果调用过程非常耗时的情况下 客户端等待时间就非常长, 这种形式我们可以理解阻塞式;

解决办法:耗时的代码我们可以使用多线程或者MQ实现处理,但是不能立马获取结果; 客户端可以主动查询

阻塞与非阻塞的区别

阻塞:如果我没有获取到结果的情况下,当前线程从运行状态切换为阻塞状态 内核角度分析:用户空间切换到内核空间 非阻塞:如果我没有获取到结果的情况下,当前的线程不会阻塞。

BIO(Blocking IO) 同步阻塞模型

一个线程处理一个客户端请求; 缺点: 1、 IO代码里read操作是阻塞操作,如果获取不到数据的情况下,则会阻塞; 如果线程使用过多的情况下,非常消耗服务器端cpu的资源; 应用场景: BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高

NIO(Non Blocking IO) 同步非阻塞

NIO同步非阻塞的原理:多个客户端发送连接请求注册到(多路复用器)selector中, 多路复用器使用轮训机制实现检测每个io请求有数据就进行处理。 底层实现原理: I/O多路复用底层一般用的Linux API(select,poll,epoll)来实现

NIO 有三大核心组件: Channel(通道), Buffer(缓冲区),Selector(选择器)

1.Channel(通道) :称之为通道,和IO相连,通信双方进行数据交流的通道,需要和buffer结合使用。 2.Buffer(缓冲区) :对数据的读取/写入需要使用buffer,buffer本质就是一个数组。 3.Selector(选择器): IO多路复用 一个线程Thread使用选择器Selector通过轮询的方式去监听多个通道Channel上的事件,从而让一个线程可以处理多个事件。

Netty实战

初始Netty

什么是Netty

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。 异步非阻塞IO BIO 同步阻塞IO NIO 同步非阻塞IO linux操作系统内核 AIO异步非阻塞IO linux服务器内核支持不是很完善

为什么需要使用Netty

1.传统的NIO 的类库和 API 繁杂, 使用麻烦: 需要熟练掌握Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer等。 2.开发工作量和难度都非常大: 例如客户端面临断连重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等。 3.Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。 线程模型 注意:大多数企业都在使用Netty4,Netty5已经被废弃

为什么Netty使用NIO而不是AIO

原因:在Linux系统上,AIO的底层实现仍使用EPOLL,与NIO相同,因此在性能上没有明显的优势;Windows的AIO底层实现良好,但是Netty开发人员并没有把Windows作为主要使用平台考虑。

为什么要使用netty

  1. 异步非阻塞通讯 aio
  2. 高效的线程模型
  3. 无锁化的设计
  4. 高性能序列化框架
  5. 零拷贝、内存池
  6. 灵活的TCP协议参数设置

Netty使用场景

  1. RPC 框架Dubbo 动态代理设计模式
  2. Rocketmq netty 数据结构模式
  3. 聊天室
  4. 游戏
  5. Xxl-job分布式任务调度平台

TCP协议粘包与拆包

产生的背景

1.要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包; 2.接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包; 3.要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包; 4.待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。即TCP报文长度-TCP头部长度>MSS。

MSS (最大报文段长度): 最大报文段长度(MSS)是TCP协议的一个选项,用于在TCP连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度(不包括文段头)。

基于Netty手写mq组件

NettyMQServer

代码语言:javascript复制
public class NettyMQServer {
    public void bind(int port) throws Exception {
        /**
         * Netty 抽象出两组线程池BossGroup和WorkerGroup
         * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workerGroup)
                    // 设定NioServerSocketChannel 为服务器端
                    .channel(NioServerSocketChannel.class)
                    //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
                    //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                    .option(ChannelOption.SO_BACKLOG, 100)
                    // 服务器端监听数据回调Handler
                    .childHandler(new ChildChannelHandler());
            //绑定端口, 同步等待成功;
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("当前服务器端启动成功...");
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //优雅关闭 线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 设置异步回调监听
            ch.pipeline().addLast(new MayiktServerHandler());
//////            1. 演示LineBasedFrameDecoder编码器
//            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//            ch.pipeline().addLast(new StringDecoder());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 9008;
        new NettyMQServer().bind(port);
    }


    private static final String type_consumer = "consumer";

    private static final String type_producer = "producer";
    /**
     * mq缓存消息
     */
    private static LinkedBlockingQueue<String> msgs = new LinkedBlockingQueue<String>();
    private static ArrayList<ChannelHandlerContext> consumerChannels = new ArrayList();

    // 生产者投递消息的:topicName
    public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {

        /**
         * 服务器接收客户端请求
         *
         * @param ctx
         * @param data
         * @throws Exception
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object data)
                throws Exception {
            String body = byteBufToString(data);
            JSONObject jsonObject = JSONObject.parseObject(body);
            String type = jsonObject.getString("type");
            switch (type) {
                case type_consumer:
                    consumer(ctx);
                    return;
                case type_producer:
                    String msg = jsonObject.getString("msg");
                    producer(msg);
                    return;
            }

        }


        private void consumer(ChannelHandlerContext ctx) {
            // 保存连接
            consumerChannels.add(ctx);
            String poll = msgs.poll();
            if (!StringUtils.isEmpty(poll)) {
                // 从MQ服务器端获取消息给消费者
                ByteBuf resp = Unpooled.copiedBuffer(poll.getBytes());
                ctx.writeAndFlush(resp);
            }
            // 思路

        }

        /**
         * 缓存生产者投递消息
         *
         * @param msg
         */
        private void producer(String msg) {
            // 缓存消息
            msgs.offer(msg);
            consumerChannels.forEach((ctx) -> {
                String poll = msgs.poll();
                if (StringUtils.isEmpty(poll)) {
                    return;
                }
                // 发送数据给消费者
                ByteBuf resp = Unpooled.copiedBuffer(poll.getBytes());
                ctx.writeAndFlush(resp);
            });
            // 分组消息

        }

        private String byteBufToString(Object msg) throws UnsupportedEncodingException {
            if (msg == null) {
                return null;
            }
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            return body;
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            consumerChannels.remove(ctx);
            ctx.close();
        }
    }
}

NettyMQProducer

代码语言:javascript复制
public class NettyMQProducer {
    public void connect(int port, String host) throws Exception {
        //配置客户端NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(group)
                    // 设置为Netty客户端
                    .channel(NioSocketChannel.class)
                    /**
                     * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                     * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                     */
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
////                            1. 演示LineBasedFrameDecoder编码器
//                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//                            ch.pipeline().addLast(new StringDecoder());
                        }
                    });

            //绑定端口, 异步连接操作
            ChannelFuture future = client.connect(host, port).sync();
            //等待客户端连接端口关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭 线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 9008;
        NettyMQProducer client = new NettyMQProducer();
        try {
            client.connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {


        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            JSONObject data = new JSONObject();
            data.put("type", "producer");
            JSONObject msg = new JSONObject();
            msg.put("userId", "123456");
            msg.put("age", "23");
            data.put("msg", msg);
            // 生产发送数据
            byte[] req = data.toJSONString().getBytes();
            ByteBuf firstMSG = Unpooled.buffer(req.length);
            firstMSG.writeBytes(req);
            ctx.writeAndFlush(firstMSG);
        }

        /**
         * 客户端读取到服务器端数据
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("客户端接收到服务器端请求:"   body);
        }

        // tcp属于双向传输

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}

NettyMQConsumer

代码语言:javascript复制
public class NettyMQConsumer {
    public void connect(int port, String host) throws Exception {
        //配置客户端NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(group)
                    // 设置为Netty客户端
                    .channel(NioSocketChannel.class)
                    /**
                     * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                     * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                     */
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyMQConsumerHandler());
                        }
                    });

            //绑定端口, 异步连接操作
            ChannelFuture future = client.connect(host, port).sync();
            //等待客户端连接端口关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭 线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 9008;
        NettyMQConsumer client = new NettyMQConsumer();
        try {
            client.connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public class NettyMQConsumerHandler extends ChannelInboundHandlerAdapter {


        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            JSONObject data = new JSONObject();
            data.put("type", "consumer");
            // 消费者发送数据
            byte[] req = data.toJSONString().getBytes();
            ByteBuf firstMSG = Unpooled.buffer(req.length);
            firstMSG.writeBytes(req);
            ctx.writeAndFlush(firstMSG);
        }

        /**
         * 客户端读取到服务器端数据
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("消费者读取数据:"   body);
        }

        // tcp属于双向传输

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}

基于netty手写服务注册

依赖文件

代码语言:javascript复制
      <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.23.Final</version>
        </dependency>

        <!--jboss-marshalling-serial -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.4.11.Final</version>
        </dependency>

封装实体类

代码语言:javascript复制
package com.gtf.NETTYserver.entity;

import java.io.Serializable;


public class AddresDto implements Serializable {
    private String addres;

    public AddresDto(String addres) {
        this.addres = addres;
    }

    public AddresDto() {

    }

    public String getAddres() {
        return addres;
    }

    public void setAddres(String addres) {
        this.addres = addres;
    }

    @Override
    public String toString() {
        return "AddresDto{"  
                "addres='"   addres   '''  
                '}';
    }
}
代码语言:javascript复制
package com.gtf.NETTYserver.entity;

import io.netty.channel.ChannelHandlerContext;

import java.io.Serializable;


public class AddresEntity implements Serializable {
    private String addres;
    // 生产的连接
    private ChannelHandlerContext ctx;

    public AddresEntity() {
    }

    public AddresEntity(String addres, ChannelHandlerContext ctx) {
        this.addres = addres;
        this.ctx = ctx;
    }

    public String getAddres() {
        return addres;
    }

    public void setAddres(String addres) {
        this.addres = addres;
    }

    public ChannelHandlerContext getCtx() {
        return ctx;
    }

    public void setCtx(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public String toString() {
        return "AddresEntity{"  
                "addres='"   addres   '''  
                '}';
    }
}
代码语言:javascript复制
package com.gtf.NETTYserver.entity;

import java.io.Serializable;


public class AgreementEntity implements Serializable {
    // 0为生产者  1为消费者
    private Integer type;


    // 服务id
    private String serviceId;

    // 服务地址
    private String addres;



    public AgreementEntity(Integer type, String serviceId, String addres) {
        this.type = type;
        this.serviceId = serviceId;
        this.addres = addres;
    }

    public Integer getType() {
        return type;
    }

    public String getServiceId() {
        return serviceId;
    }

    public String getAddres() {
        return addres;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    public void setServiceId(String serviceId) {
        this.serviceId = serviceId;
    }

    public void setAddres(String addres) {
        this.addres = addres;
    }
}
代码语言:javascript复制
package com.gtf.NETTYserver.entity;

import java.io.Serializable;
import java.util.List;

public class RespEntity implements Serializable  {

    private Integer code;
    private String msg;
    private Object data;

    public RespEntity(Integer code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    public RespEntity(Integer code, String msg, Object data) {
        this.code = code;
        this.msg = msg;
        this.data = data;
    }

    @Override
    public String toString() {
        return "RespEntity{"  
                "code="   code  
                ", msg='"   msg   '''  
                ", data="   data  
                '}';
    }
}

服务端

代码语言:javascript复制
package com.gtf.NETTYserver.server;

import com.gtf.NETTYserver.MarshallingCodeCFactory;
import com.gtf.NETTYserver.entity.AddresDto;
import com.gtf.NETTYserver.entity.AddresEntity;
import com.gtf.NETTYserver.entity.AgreementEntity;
import com.gtf.NETTYserver.entity.RespEntity;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.http.ResponseEntity;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 注册中心
 */
public class RegistryServer {
    /**
     * 生产者
     */
    public static final int type_producer = 0;
    /**
     * 消费者
     */
    public static final int type_consumer = 1;

    private static ConcurrentHashMap<String, List<AddresEntity>> keyaddresses = new ConcurrentHashMap<String, List<AddresEntity>>();
    //key 为连接 value 服务名称
    private static ConcurrentHashMap<ChannelHandlerContext, String> ctxs = new ConcurrentHashMap<>();
    public void bind(int port) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new  ServerBootstrap();
            b.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1280)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 新增Marshaling编码器
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingDecoder()
                            );
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingEncoder()
                            );
                            ch.pipeline().addLast(
                                    new ServerHandler()
                            );
                        }
                    });

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8006;
        new RegistryServer().bind(port);
    }


    @ChannelHandler.Sharable
    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            AgreementEntity agreementEntity= (AgreementEntity) msg;
           switch (agreementEntity.getType()){
               case type_producer:
                   //生产者
                   producer(agreementEntity,ctx);
                   break;
               case type_consumer:
                   consumer(agreementEntity,ctx);
                   break;
           }
            System.out.println("服务器端接收到响应内容"   msg);
        }

        /**
         * 生产者
         * @param agreementEntity
         * @param ctx
         */
        private void producer(AgreementEntity agreementEntity,ChannelHandlerContext ctx){
            String serviceId = agreementEntity.getServiceId();
            //根据服务名称查找之前是否有缓存地址
            List<AddresEntity> listAddress = keyaddresses.get(serviceId);
            if (listAddress == null) {
                listAddress=new ArrayList<>();
                keyaddresses.put(serviceId, listAddress);
                ctxs.put(ctx, agreementEntity.getServiceId());
            }
            String addres = agreementEntity.getAddres();
            AddresEntity addresEntity = new AddresEntity(addres, ctx);
            listAddress.add(addresEntity);
        }
        private void consumer(AgreementEntity agreementEntity,ChannelHandlerContext ctx){
            String serviceId = agreementEntity.getServiceId();
            List<AddresEntity> list = keyaddresses.get(serviceId);
            if (list == null || list.size() ==0 ) {
                ctx.writeAndFlush(new RespEntity(500, "没有接口列表"));
                return ;
            }
            ArrayList<AddresDto> objects = new ArrayList<AddresDto>();
            list.forEach((t)->{
                objects.add(new AddresDto(t.getAddres()));
            });
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            //如果客户端主动断开链接。回调这个方法
            String serviceId = ctxs.get(ctx);
            // 根据服务名称查找缓存地址
            List<AddresEntity> addresEntities = keyaddresses.get(serviceId);
            // 遍历缓存地址剔除
            addresEntities.forEach((t) -> {
                if (t.getCtx() == ctx) {
                    //剔除
                    addresEntities.remove(t);
                }
            });

            ctx.close();
        }
    }

}

生产者

代码语言:javascript复制
package com.gtf.NETTYserver.producer;

import com.gtf.NETTYserver.MarshallingCodeCFactory;
import com.gtf.NETTYserver.entity.AgreementEntity;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


public class ProducerClient {
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 设置 Marshalling 编码
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingDecoder()
                            );
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingEncoder()
                            );
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8006;
        String host = "127.0.0.1";
        new ProducerClient().connect(port, host);
    }
    // 客户端Handler

    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //发送数据给服务端
            ctx.writeAndFlush(new AgreementEntity(0,"gtf02-producer","127.0.0.1:8080"));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

消费者

代码语言:javascript复制
package com.gtf.NETTYserver.sumer;


import com.gtf.NETTYserver.MarshallingCodeCFactory;
import com.gtf.NETTYserver.entity.AgreementEntity;
import com.gtf.NETTYserver.entity.RespEntity;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.List;


public class ConsumerClient {
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 设置 Marshalling 编码
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingDecoder()
                            );
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingEncoder()
                            );
                            ch.pipeline().addLast(new ConsumerClientHandler());
                        }
                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8006;
        String host = "127.0.0.1";
        new ConsumerClient().connect(port, host);
    }

    // 客户端Handler
    public class ConsumerClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            AgreementEntity agreementEntity = new AgreementEntity(1,
                    "gtf01", null);
            ctx.writeAndFlush(agreementEntity);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * 客户端读取到服务器端数据
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            RespEntity respEntity = (RespEntity) msg;
            System.out.println("消费者读取数据:"   respEntity.toString());
        }
    }
}

基于netty手写rpc框架

rpc框架例如dubbo,实现服务的发现注册以及调用。 忽略

0 人点赞