一、概要
在上一篇文章讲到Dotnetty的基本认识,本文这次会讲解dotnetty非常核心的模块是属于比较硬核的干货了,然后继续往下讲解如何根据自己的需求或者自己的喜好去配置Dotnetty而不是生搬硬套官网的示例源码。如果看了本文有收获的话麻烦关注一下文章尾部的公众号和技术讨论群。各位的支持是对我莫大的帮助。
二、简介
主要讲解一下几个知识点:
- EventLoopGroup & EventLoop
- Bootstrap
- Channel
- ChannelPipeline & ChannelHandler
- ChannelHandlerContext
- ChannelHandler
三、详细内容
服务端启动引导类Bootstrap
1.EventLoopGroup & EventLoop
- 高性能RPC框架的3个要素:IO模型、数据协议、线程模型
- EventLoop好比一个线程,1个EventLoop可以服务多个Channel,1个Channel只有一个EventLoop可以创建多个 EventLoop 来优化资源利用,也就是EventLoopGroup。
- EventLoopGroup 负责分配 EventLoop 到新创建的 Channel,里面包含多个EventLoop
- EventLoopGroup →多个 EventLoop ,EventLoop →维护一个 Selector。
2.服务器启动引导类:ServerBootstrap
Group :设置线程组模型,Reactor线程模型对比EventLoopGroup
- 单线程
- 多线程
- 主从线程
Channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel
Option: 作用于每个新建立的channel,设置TCP连接中的一些参数,如下:
- ChannelOption.SO_BACKLOG: 存放已完成三次握手的请求的等待队列的最大长度;
- ChannelOption.TCP_NODELAY: 为了解决Nagle的算法问题,默认是false, 要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数,就设置为false,会累积一定大小后再发送。
- ChildOption: 作用于被accept之后的连接
- ChildHandler: 用于对每个通道里面的数据处理
3.连接通道类:Channel
Channel: 客户端和服务端建立的一个连接通道(可以理解为一个channel就是一个socket连接) ChannelHandler:负责Channel的逻辑处理 ChannelPipeline: 负责管理ChannelHandler的有序容器
关系: 一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到ChannelPipeline中 创建 Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久 性的Channel当状态出现变化,就会触发对应的事件。
生命周期:
- ChannelRegistered: channel注册到一个EventLoop
- ChannelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据
- ChannelInactive: channel处于非活跃状态,没有连接到远程主机
- ChannelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定
4.频道的内部实现 ChannelHandler & ChannelPipeline
- ChannelInboundHandler:(入站) 处理输入数据和Channel状态类型改变,适配器。
- ChannelInboundHandlerAdapter(适配器设计模式) 常用的:SimpleChannelInboundHandler
- ChannelOutboundHandler:(出站) 处理输出数据,适配器 ChannelOutboundHandlerAdapter
- ChannelPipeline:好比厂里的流水线一样,可以在上面添加多个ChannelHanler,也可看成是一串
- ChannelHandler 实例,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形 式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权。
5.频道的内部实现 ChannelHandler & ChannelPipeline
ChannelHandlerContext是连接ChannelHandler和ChannelPipeline的桥梁,ChannelHandlerContext部分方法和Channel及ChannelPipeline重合。
- 好比调用write方法Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者都会在整个管道流里 传播,而ChannelHandlerContext就只会在后续的Handler里面传播。
- AbstractChannelHandlerContext类双向链表结构,next/prev分别是后继节点,和前驱节点。
- DefaultChannelHandlerContext 是实现类,但是大部分都是父类那边完成,这个只是简单的实现一些方法 主要就是判断Handler的类型。
- ChannelInboundHandler之间的传递,主要通过调用ctx里面的FireXXX()方法来实现下个handler的调用。
6.Handler执行顺序
一般的项目中,inboundHandler和outboundHandler有多个,在Pipeline中的执行顺序?
InboundHandler顺序执行,OutboundHandler逆序执行
- InboundHandler顺序执行,OutboundHandler逆序执行
- InboundHandler之间传递数据,通过context.fireChannelRead(message)
- InboundHandler通过context.write(message),则会传递到outboundHandler
- 使用context.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行;但是使用channel.write(msg)、pipline.write(msg)情况会不一致,都会执行。
- OutBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先outbound再 inbound,服务端则相反。
四、实战环节
以上概念性的东西介绍完了之后开始编写本章实战代码(完整的案例代码将在qq群文件共享里上传,文章末尾有QQ群二维码和联系方式)。接下来我们先看一下项目结构。
Handlers - 主要存放所有处理相关类。
Initializer - 存放初始化tcp服务的相关内容。
appsetting.json - 主要存放的内容为,服务端的相关配置例如:ip地址、端口号等。
dotnetty - 安全证书
Program - 启动类
项目结构介绍完毕之后,我大致将这个demo分为5个部分来实现具体根据自己需求去设计搭建结构都是可以的,这里的内容仅供参考。
- 第一步,配置构建引导类
1 //主要工作组,设置为2个线程
2 private static readonly IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(4);
3 //子工作组,默认为内核数*2的线程数
4 private static readonly IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();
5
6 static async Task RunAsync() {
7 /*
8 *初始化服务端引导对象。
9 *声明一个服务端Bootstrap,每个Netty服务端程序,都由ServerBootstrap控制,
10 *通过链式的方式组装需要的参数
11 */
12 ServerBootstrap bootstrap = new ServerBootstrap();
13 //添加工作组
14 bootstrap.Group(bossGroup, workerGroup);
15 //初始化工作频道
16 bootstrap.Channel<TcpServerSocketChannel>();
17 bootstrap
18 //存放已完成三次握手的请求的等待队列的最大长度;
19 .Option(ChannelOption.SoBacklog, 1024)
20 //ByteBuf的分配器(重用缓冲区)大小
21 .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
22 //接收字符的长度
23 .Option(ChannelOption.RcvbufAllocator, new FixedRecvByteBufAllocator(1024 * 8))
24 //保持长连接
25 .ChildOption(ChannelOption.SoKeepalive, true)
26 //取消延迟发送
27 .ChildOption(ChannelOption.TcpNodelay, true)
28 //端口复用
29 .ChildOption(ChannelOption.SoReuseport, true)
30 //初始化日志拦截器,可以不加
31 .Handler(new LoggingHandler("SRV-LSTN"))
32 //自定义初始化Tcp服务
33 .ChildHandler(new EchoServerInitializer());
34
35 //绑定服务端,端口号。IP地址默认读取项目配置文件。
36 await bootstrap.BindAsync(ServerSettings.Port);
37 }
- 第二步,初始化Channel相关处理类
1 /// <summary>
2 /// 初始化
3 /// </summary>
4 public class EchoServerInitializer : ChannelInitializer<ISocketChannel>
5 {
6 /// <summary>
7 /// No interaction time.300s
8 /// </summary>
9 public const int AllTimeOut = 60 * 5;
10
11 /// <summary>
12 /// Read Time Out.60s
13 /// </summary>
14 public const int ReadTimeOut = 60;
15
16 /// <summary>
17 /// Recive Time Out.60s
18 /// </summary>
19 public const int WriterTimeOut = 60;
20
21 protected override void InitChannel(ISocketChannel channel)
22 {
23 /*
24 * 工作线程连接器是设置了一个频道,服务端主线程所有接收到的信息都会通过这个管道一层层往下传输
25 * 同时所有出栈的消息 也要这个频道的所有处理器进行一步步处理
26 */
27 IChannelPipeline pipeline = channel.Pipeline;
28 //初始化Dotnetty日志拦截器
29 pipeline.AddLast(new LoggingHandler("SRV-CONN"));
30 //心跳超时时间配置
31 pipeline.AddLast(new IdleStateHandler(
32 ReadTimeOut,
33 WriterTimeOut,
34 AllTimeOut));
35 //消息内容编码逻辑处理类
36 pipeline.AddLast("encoder", new EncoderHandler());
37 //解码逻辑处理类
38 pipeline.AddLast("decoder", new DecoderHandler());
39 //心跳逻辑处理
40 pipeline.AddLast(new HeartBeatHandler());
41 //每个频道请求消息处理类
42 pipeline.AddLast(new ServerHandler());
43 }
44 }
- 第三步,配置、实现心跳处理机制
1 public class HeartBeatHandler : ChannelHandlerAdapter
2 {
3 /// <summary>
4 /// 每个频道都有自己的心跳管理,如果频道长时间不操作踢掉线的逻辑可以写在这里
5 /// </summary>
6 /// <param name="context"></param>
7 /// <param name="evt"></param>
8 public override void UserEventTriggered(IChannelHandlerContext context, object evt)
9 {
10 var eventState = evt as IdleStateEvent;
11 if (eventState != null)
12 {
13 String type = string.Empty;
14 if (eventState.State == IdleState.ReaderIdle)
15 {
16 type = "read idle";//没有任何接受
17 }
18 else if (eventState.State == IdleState.WriterIdle)
19 {
20 type = "write idle";//没有任何写入
21 }
22 else if (eventState.State == IdleState.AllIdle)
23 {
24 type = "all idle";
25 context.CloseAsync();//5分钟内无任何交互则断开该客户端连接
26 }
27 }
28 else
29 {
30 base.UserEventTriggered(context, evt);
31 }
32 }
33 }
- 第四步,编码、解码
1 /// <summary>
2 /// 解码
3 /// </summary>
4 public class DecoderHandler : ByteToMessageDecoder
5 {
6 protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
7 {
8 throw new NotImplementedException();
9 }
10 }
11
12
13
14 public class EncoderHandler : MessageToByteEncoder<byte[]>
15 {
16 /// <summary>
17 /// 编码
18 /// </summary>
19 /// <param name="context"></param>
20 /// <param name="message"></param>
21 /// <param name="output"></param>
22 protected override void Encode(IChannelHandlerContext context, byte[] message, IByteBuffer output)
23 {
24 throw new NotImplementedException();
25 }
26 }
- 第五步,Channel逻辑处理实现
1 public class ServerHandler : ChannelHandlerAdapter
2 {
3 /*
4 * Channel的生命周期
5 * 1.ChannelRegistered 先注册
6 * 2.ChannelActive 再被激活
7 * 3.ChannelRead 客户端与服务端建立连接之后的会话(数据交互)
8 * 4.ChannelReadComplete 读取客户端发送的消息完成之后
9 * error. ExceptionCaught 如果在会话过程当中出现dotnetty框架内部异常都会通过Caught方法返回给开发者
10 * 5.ChannelInactive 使当前频道处于未激活状态
11 * 6.ChannelUnregistered 取消注册
12 */
13
14 /// <summary>
15 /// 频道注册
16 /// </summary>
17 /// <param name="context"></param>
18 public override void ChannelRegistered(IChannelHandlerContext context)
19 {
20 base.ChannelRegistered(context);
21 }
22
23 /// <summary>
24 /// socket client 连接到服务端的时候channel被激活的回调函数
25 /// </summary>
26 /// <param name="context"></param>
27 public override void ChannelActive(IChannelHandlerContext context)
28 {
29 //一般可用来记录连接对象信息
30 base.ChannelActive(context);
31 }
32
33 /// <summary>
34 /// socket接收消息方法具体的实现
35 /// </summary>
36 /// <param name="context">当前频道的句柄,可使用发送和接收方法</param>
37 /// <param name="message">接收到的客户端发送的内容</param>
38 public override void ChannelRead(IChannelHandlerContext context, object message)
39 {
40 var buffer = message as IByteBuffer;
41 if (buffer != null)
42 {
43 Console.WriteLine("Received from client: " buffer.ToString(Encoding.UTF8));
44 }
45 context.WriteAsync(message);//发送给客户端方法
46 }
47
48 /// <summary>
49 /// 该次会话读取完成后回调函数
50 /// </summary>
51 /// <param name="context"></param>
52 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();//
53
54 /// <summary>
55 /// 异常捕获
56 /// </summary>
57 /// <param name="context"></param>
58 /// <param name="exception"></param>
59 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
60 {
61 Console.WriteLine("Exception: " exception);
62 context.CloseAsync();
63 }
64
65 /// <summary>
66 /// 当前频道未激活状态
67 /// </summary>
68 /// <param name="context"></param>
69 public override void ChannelInactive(IChannelHandlerContext context)
70 {
71 base.ChannelInactive(context);
72 }
73
74 /// <summary>
75 /// 取消注册当前频道,可理解为销毁当前频道
76 /// </summary>
77 /// <param name="context"></param>
78 public override void ChannelUnregistered(IChannelHandlerContext context)
79 {
80 base.ChannelUnregistered(context);
81 }
82 }