Netty Review - 探索Pipeline的Inbound和Outbound

2023-12-01 10:01:12 浏览数 (3)

概念

我们知道boss线程监控到绑定端口上有accept事件,此时会为该socket连接实例化Pipeline,并将InboundHandlerOutboundHandler按序加载到Pipeline中,然后将该socket连接(也就是Channel对象)挂载到selector

一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有readwrite事件,然后通过线程池去执行Pipeline的业务流

selector如何查询哪些socket连接有readwrite事件,主要取决于调用操作系统的哪种IO多路复用内核

  • 如果是select注意,此处的select是指操作系统内核的select IO多路复用,不是nettyseletor对象),那么将会遍历所有socket连接,依次询问是否有readwrite事件,最终操作系统内核将所有IO事件的socket连接返回给netty进程,当有很多socket连接时,这种方式将会大大降低性能,因为存在大量socket连接的遍历和内核内存的拷贝
  • 如果是epoll,性能将会大幅提升,因为它基于完成端口事件,已经维护好有IO事件的socket连接列表,selector直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗

Netty中,InboundOutbound是两个重要的概念,用于描述数据在ChannelPipeline中的流动方向。

Inbound(入站)指的是数据从网络传输到应用程序,即数据从远程主机进入本地主机。在ChannelPipeline中,Inbound数据会依次经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的末尾。

Outbound(出站)指的是数据从应用程序传输到网络,即数据从本地主机发送到远程主机。在ChannelPipeline中,Outbound数据会从Pipeline的末尾开始,逆序经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的起始位置。

InboundOutbound的区别在于数据的流动方向。Inbound数据是从网络进入应用程序,而Outbound数据是从应用程序发送到网络。这意味着Inbound数据是应用程序接收和处理外部数据的入口,而Outbound数据是应用程序发送数据到外部的出口。

虽然InboundOutbound描述了数据的不同流动方向,但它们之间也存在联系。在ChannelPipeline中,InboundOutbound数据可以相互影响和交互。例如,一个ChannelHandler可以在处理Inbound数据时生成Outbound数据作为响应,或者在处理Outbound数据时修改Inbound数据的内容。

总结起来,InboundOutbound是描述数据在ChannelPipeline中流动方向的概念。Inbound数据是从网络进入应用程序,Outbound数据是从应用程序发送到网络。它们在ChannelPipeline中相互影响和交互,共同实现网络数据的处理和传输。


Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是headtail,当selector轮询到socketread事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandlerChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler .

ChannelHandler分为InbounHandlerOutboundHandler

  • InboundHandler用来处理接收消息
  • OutboundHandler用来处理发送消息。

headChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socketreadwritetailChannelHandler只是InboundHandlerreadPipleline处理将会最终到达tail


演示之前,我们先附一下代码

Server Code

代码语言:javascript复制
package com.artisan.pipeline.inout;

import com.artisan.pipeline.inout.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanEchoServer {

    private int port;

    public ArtisanEchoServer(int port) {
        this.port = port;
    }

    private void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new EchoOutboundHandler3());
                            ch.pipeline().addLast(new EchoOutboundHandler2());
                            ch.pipeline().addLast(new EchoOutboundHandler1());


                            ch.pipeline().addLast(new EchoInboundHandler1());
                            ch.pipeline().addLast(new EchoInboundHandler2());
                            ch.pipeline().addLast(new EchoInboundHandler3());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 10000)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            System.out.println("EchoServer正在启动...");

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("EchoServer绑定端口:"   port);

            channelFuture.channel().closeFuture().sync();
            System.out.println("EchoServer已关闭.");


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        int port = 1234;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        ArtisanEchoServer server = new ArtisanEchoServer(port);
        server.run();
    }
}

6个handler演示如下

代码语言:javascript复制
package com.artisan.pipeline.inout.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println();

        System.out.println("进入 EchoInboundHandler1.channelRead");

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler1.channelRead 收到数据:"   data);
        ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] "   data, CharsetUtil.UTF_8));

        System.out.println("退出 EchoInboundHandler1 channelRead");

        System.out.println();

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[EchoInboundHandler1.channelReadComplete]");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[EchoInboundHandler1.exceptionCaught]"   cause.toString());
    }
}
代码语言:javascript复制
package com.artisan.pipeline.inout.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println();

        System.out.println("进入 EchoInboundHandler2.channelRead");

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler2.channelRead 接收到数据:"   data);
        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] "   data, CharsetUtil.UTF_8));
        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
        ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] "   data, CharsetUtil.UTF_8));

        System.out.println("退出 EchoInboundHandler2 channelRead");

        System.out.println();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[EchoInboundHandler2.channelReadComplete]读取数据完成");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[EchoInboundHandler2.exceptionCaught]");
    }
}
代码语言:javascript复制
package com.artisan.pipeline.inout.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println();

        System.out.println("进入 EchoInboundHandler3.channelRead");

        String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler3.channelRead 接收到数据:"   data);
        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] "   data, CharsetUtil.UTF_8));
        ctx.fireChannelRead(msg);

        System.out.println("退出 EchoInboundHandler3 channelRead");

        System.out.println();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[EchoInboundHandler3.channelReadComplete]读取数据完成");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[EchoInboundHandler3.exceptionCaught]");
    }

}
代码语言:javascript复制
package com.artisan.pipeline.inout.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {


    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("进入 EchoOutboundHandler1.write");

        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
        ctx.write(msg);

        System.out.println("退出 EchoOutboundHandler1.write");
    }

}
代码语言:javascript复制
package com.artisan.pipeline.inout.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("进入 EchoOutboundHandler2.write");

        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));
        ctx.write(msg);

        System.out.println("退出 EchoOutboundHandler2.write");
    }
}
代码语言:javascript复制
package com.artisan.pipeline.inout.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("进入 EchoOutboundHandler3.write");

        ctx.write(msg);

        System.out.println("退出 EchoOutboundHandler3.write");
    }

}

Client Code

代码语言:javascript复制
package com.artisan.netty4.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author 小工匠
 * @version 1.0
 * @description: 客户端启动程序
 * @mark: show me the code , change the world
 */
public class ArtisanClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                    //设置客户端的通道实现类型
                    .channel(NioSocketChannel.class)
                    //使用匿名内部类初始化通道
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new ArtisanClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}
代码语言:javascript复制
package com.artisan.netty4.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @description: 通用handler,处理I/O事件
 * @mark: show me the code , change the world
 */
@ChannelHandler.Sharable
public class ArtisanClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("msg send from client 2 server ", CharsetUtil.UTF_8));
        System.out.println("客户端发消息给服务端结束");

        System.out.println();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端"   ctx.channel().remoteAddress()   "的消息:"   byteBuf.toString(CharsetUtil.UTF_8));
    }

}

InboundHandler和OutboundHandler的执行顺序

在InboundHandler中不触发fire方法

ArtisanEchoServer#run 中我们先进存在InboundHandler

先启动server, 在启动Client,我们测试一下

我们可以看到: InboundHandler2没有调用fire事件,InboundHandler3没有被执行

InboundHandler是通过fire事件决定是否要执行下一个InboundHandler,如果InboundHandler没有调用fire事件,那么后续的Pipeline中的Handler将不会执行。

我们来看下源码


InboundHandler和OutboundHandler的执行顺序

加入Pipeline的ChannelHandler的顺序如上。

别忘了放开EchoInboundHandler2

代码语言:javascript复制
 ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] "   data, CharsetUtil.UTF_8));

我们来验证下

执行顺序如上。

代码语言:javascript复制
InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

1、InboundHandler是按照Pipleline的加载顺序,顺序执行。

2、OutboundHandler是按照Pipeline的加载顺序,逆序执行。


如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

其中EchoInboundHandler2 先不要给客户端发送数据,先屏蔽掉。

代码语言:javascript复制
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("进入 EchoInboundHandler2.channelRead");

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler2.channelRead 接收到数据:"   data);
//        ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] "   data, CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
        ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] "   data, CharsetUtil.UTF_8));


        System.out.println("退出 EchoInboundHandler2 channelRead");

    }
.......
.......
.......

0 人点赞