理解Netty4线程模型-线程封闭技术实现的无锁化设计,单线程不要阻塞和无序事件如何处理(rocketmq源码分析无序事件处理)

2023-11-03 15:53:41 浏览数 (1)

简介


Netty4的主从多线程模型是全局多线程,局部单线程(事件循环),用线程封闭技术实现的无锁化设计实现并发安全

一图理解Netty4线程模型


以使用最多的主、从多线程为例

代码语言:javascript复制
 EventLoopGroup mainGroup = new NioEventLoopGroup(1);
 EventLoopGroup childGroup = new NioEventLoopGroup(10);
代码语言:javascript复制

1、有两个主要的线程池,分别为boss线程池和worker线程池boss线程池主要处理客户端的连接事件,创建客户端连接并注册给worker线程池中某个线程。worker线程池主要处理客户端连接的读写事件

2、线程池抽象为NioEventLoopGroup,其中的每个线程抽象为NioEventLoop。

3、每个NioEventLoop里包含了一个Selector(注册并轮询socket事件)、一个

Thread(异步事件轮询处理线程)、一个队列tailTasks(事件循环最后需要执行的任务)、一个队列taskQueue(任务队列,排队执行任务)、一个scheduledTaskQueue(任务调度)。

4、每个客户端Channel都绑定到一个NioEventLoop(线程),每个NioEventLoop可被多个Channel绑定(即由绑定的NioEventLoop异步使用Selector轮询客户端的读写事件),可以使客户端Channel的事件处理单线程串行化无并发执行(线程封闭技术实现的无锁化设计)。

5、NioEventLoop的选择默认使用轮询调度(round-robin)策略

6、boss线程接受客户端连接后,会注册到worker线程中处理网络事件(放到任务队列,排队等待执行)。

代码语言:javascript复制
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

7、worker线程NioEventLoop负责网络包的编码解码及业务的执行(由ChannelHandler组成的ChannelPipeline职责链模式处理),不过每个

ChannelHandler可以由不同的线程池来处理(注意线程池的选择,请参考10)。

代码语言:javascript复制
 ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
 ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
 ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

8、Netty的单线程串行化使得网络事件的处理有序性,不会乱序。

9、切勿阻塞线程(单线程执行,发生阻塞影响后续业务处理,redis的单线程模型也是不能阻塞的),如果必须做阻塞调用或执行时间很长的任务,需要提交到其它线程池异步执行,此时网络事件的处理可能不会有序,需要业务方负责(注意线程池的选择,请参考10)。

10、异步线程池的选择很重要,会影响网络事件的有序性,一旦无序处理,需要业务方自己处理

(一般原则:排队,单线程执行 )

(一般原则:排队,多线程执行)

附:rocketmq 5.0.0 无序网络事件的处理:

rocketmq Broker端使用线程池隔离策略来处理不同的业务请求:

代码语言:javascript复制
org.apache.rocketmq.broker.BrokerController

自定义线程池处理Netty的网络事件,脱离了Netty线程模型的有序性,rocketmq自身来处理网络事件的有序性:

1、每个请求都会携带唯一请求ID:

代码语言:javascript复制
org.apache.rocketmq.remoting.protocol.RemotingCommand

2、每次请求都会保存请求ID到响应的关系:

代码语言:javascript复制
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract

(1)使用ConcurrentMap来保存请求ID到响应的关系:

(2)发出请求时,获取分配的请求id,并构建响应ResponseFuture,保存其关系,等待响应返回:

3、等待响应(服务端会把请求id携带回来):

4、当客户端处理服务端的响应时,会根据返回的请求id,找到其之前保存的ResponseFuture,并设置ResponseFuture中的响应体,唤醒等待线程:

代码语言:javascript复制
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand

0 人点赞