概念
我们知道当boss
线程监控到绑定端口上有accept
事件,此时会为该socket
连接实例化Pipeline
,并将InboundHandler
和OutboundHandler
按序加载到Pipeline
中,然后将该socket
连接(也就是Channel
对象)挂载到selector
上。
一个selector
对应一个线程,该线程会轮询所有挂载在他身上的socket
连接有没有read
或write
事件,然后通过线程池去执行Pipeline
的业务流。
selector
如何查询哪些socket
连接有read
或write
事件,主要取决于调用操作系统的哪种IO多路复用内核,
- 如果是
select
(注意,此处的select
是指操作系统内核的select IO多路复用
,不是netty
的seletor对象
),那么将会遍历所有socket
连接,依次询问是否有read
或write
事件,最终操作系统内核将所有IO事件的socket
连接返回给netty
进程,当有很多socket
连接时,这种方式将会大大降低性能,因为存在大量socket
连接的遍历和内核内存的拷贝。 - 如果是
epoll
,性能将会大幅提升,因为它基于完成端口事件,已经维护好有IO事件的socket
连接列表,selector
直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗
在Netty
中,Inbound
和Outbound
是两个重要的概念,用于描述数据在ChannelPipeline
中的流动方向。
Inbound
(入站)指的是数据从网络传输到应用程序,即数据从远程主机进入本地主机。在ChannelPipeline
中,Inbound
数据会依次经过Pipeline
中的每个ChannelHandler
进行处理,直到到达Pipeline
的末尾。
Outbound
(出站)指的是数据从应用程序传输到网络,即数据从本地主机发送到远程主机。在ChannelPipeline
中,Outbound
数据会从Pipeline
的末尾开始,逆序经过Pipeline
中的每个ChannelHandler
进行处理,直到到达Pipeline
的起始位置。
Inbound
和Outbound
的区别在于数据的流动方向。Inbound
数据是从网络进入应用程序,而Outbound
数据是从应用程序发送到网络。这意味着Inbound
数据是应用程序接收和处理外部数据的入口,而Outbound
数据是应用程序发送数据到外部的出口。
虽然Inbound
和Outbound
描述了数据的不同流动方向,但它们之间也存在联系。在ChannelPipeline
中,Inbound
和Outbound
数据可以相互影响和交互。例如,一个ChannelHandler
可以在处理Inbound
数据时生成Outbound
数据作为响应,或者在处理Outbound
数据时修改Inbound
数据的内容。
总结起来,Inbound
和Outbound
是描述数据在ChannelPipeline
中流动方向的概念。Inbound
数据是从网络进入应用程序,Outbound
数据是从应用程序发送到网络。它们在ChannelPipeline
中相互影响和交互,共同实现网络数据的处理和传输。
Pipeline
的责任链是通过ChannelHandlerContext
对象串联的,ChannelHandlerContext
对象里封装了ChannelHandler
对象,通过prev和next节点实现双向链表。Pipeline
的首尾节点分别是head
和tail
,当selector
轮询到socket
有read
事件时,将会触发Pipeline
责任链,从head
开始调起第一个InboundHandler
的ChannelRead
事件,接着通过fire
方法依次触发Pipeline
上的下一个ChannelHandler
.
ChannelHandler
分为InbounHandler
和OutboundHandler
InboundHandler
用来处理接收消息OutboundHandler
用来处理发送消息。
head
的ChannelHandler
既是InboundHandler
又是OutboundHandler
,无论是read
还是write
都会经过head
,所以head
封装了unsafe
方法,用来操作socket
的read
和write
。tail
的ChannelHandler
只是InboundHandler
,read
的Pipleline
处理将会最终到达tail
演示之前,我们先附一下代码
Server Code
代码语言:javascript复制package com.artisan.pipeline.inout;
import com.artisan.pipeline.inout.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class ArtisanEchoServer {
private int port;
public ArtisanEchoServer(int port) {
this.port = port;
}
private void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoOutboundHandler3());
ch.pipeline().addLast(new EchoOutboundHandler2());
ch.pipeline().addLast(new EchoOutboundHandler1());
ch.pipeline().addLast(new EchoInboundHandler1());
ch.pipeline().addLast(new EchoInboundHandler2());
ch.pipeline().addLast(new EchoInboundHandler3());
}
})
.option(ChannelOption.SO_BACKLOG, 10000)
.childOption(ChannelOption.SO_KEEPALIVE, true);
System.out.println("EchoServer正在启动...");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
System.out.println("EchoServer绑定端口:" port);
channelFuture.channel().closeFuture().sync();
System.out.println("EchoServer已关闭.");
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 1234;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
ArtisanEchoServer server = new ArtisanEchoServer(port);
server.run();
}
}
6个handler演示如下
代码语言:javascript复制package com.artisan.pipeline.inout.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println();
System.out.println("进入 EchoInboundHandler1.channelRead");
String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
System.out.println("EchoInboundHandler1.channelRead 收到数据:" data);
ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " data, CharsetUtil.UTF_8));
System.out.println("退出 EchoInboundHandler1 channelRead");
System.out.println();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("[EchoInboundHandler1.channelReadComplete]");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("[EchoInboundHandler1.exceptionCaught]" cause.toString());
}
}
代码语言:javascript复制package com.artisan.pipeline.inout.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println();
System.out.println("进入 EchoInboundHandler2.channelRead");
String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
System.out.println("EchoInboundHandler2.channelRead 接收到数据:" data);
//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " data, CharsetUtil.UTF_8));
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " data, CharsetUtil.UTF_8));
System.out.println("退出 EchoInboundHandler2 channelRead");
System.out.println();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("[EchoInboundHandler2.channelReadComplete]读取数据完成");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("[EchoInboundHandler2.exceptionCaught]");
}
}
代码语言:javascript复制package com.artisan.pipeline.inout.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println();
System.out.println("进入 EchoInboundHandler3.channelRead");
String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
System.out.println("EchoInboundHandler3.channelRead 接收到数据:" data);
//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " data, CharsetUtil.UTF_8));
ctx.fireChannelRead(msg);
System.out.println("退出 EchoInboundHandler3 channelRead");
System.out.println();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("[EchoInboundHandler3.channelReadComplete]读取数据完成");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("[EchoInboundHandler3.exceptionCaught]");
}
}
代码语言:javascript复制package com.artisan.pipeline.inout.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("进入 EchoOutboundHandler1.write");
//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
// ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
ctx.write(msg);
System.out.println("退出 EchoOutboundHandler1.write");
}
}
代码语言:javascript复制package com.artisan.pipeline.inout.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("进入 EchoOutboundHandler2.write");
//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));
ctx.write(msg);
System.out.println("退出 EchoOutboundHandler2.write");
}
}
代码语言:javascript复制package com.artisan.pipeline.inout.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("进入 EchoOutboundHandler3.write");
ctx.write(msg);
System.out.println("退出 EchoOutboundHandler3.write");
}
}
Client Code
代码语言:javascript复制package com.artisan.netty4.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author 小工匠
* @version 1.0
* @description: 客户端启动程序
* @mark: show me the code , change the world
*/
public class ArtisanClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端通道的处理器
ch.pipeline().addLast(new ArtisanClientHandler());
}
});
System.out.println("客户端准备就绪");
//连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}
代码语言:javascript复制package com.artisan.netty4.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小工匠
* @version 1.0
* @description: 通用handler,处理I/O事件
* @mark: show me the code , change the world
*/
@ChannelHandler.Sharable
public class ArtisanClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("msg send from client 2 server ", CharsetUtil.UTF_8));
System.out.println("客户端发消息给服务端结束");
System.out.println();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服务端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" ctx.channel().remoteAddress() "的消息:" byteBuf.toString(CharsetUtil.UTF_8));
}
}
InboundHandler和OutboundHandler的执行顺序
在InboundHandler中不触发fire方法
ArtisanEchoServer#run
中我们先进存在InboundHandler
先启动server, 在启动Client,我们测试一下
我们可以看到: InboundHandler2没有调用fire事件,InboundHandler3没有被执行
InboundHandler
是通过fire
事件决定是否要执行下一个InboundHandler,如果InboundHandler
没有调用fire事件,那么后续的Pipeline中的Handler将不会执行。
我们来看下源码
InboundHandler和OutboundHandler的执行顺序
加入Pipeline的ChannelHandler的顺序如上。
别忘了放开EchoInboundHandler2
ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " data, CharsetUtil.UTF_8));
我们来验证下
执行顺序如上。
代码语言:javascript复制InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3
1、InboundHandler
是按照Pipleline的加载顺序,顺序执行。
2、OutboundHandler
是按照Pipeline的加载顺序,逆序执行。
如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗
其中EchoInboundHandler2 先不要给客户端发送数据,先屏蔽掉。
代码语言:javascript复制public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("进入 EchoInboundHandler2.channelRead");
String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
System.out.println("EchoInboundHandler2.channelRead 接收到数据:" data);
// ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " data, CharsetUtil.UTF_8));
// ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " data, CharsetUtil.UTF_8));
System.out.println("退出 EchoInboundHandler2 channelRead");
}
.......
.......
.......