简介
Netty4的主从多线程模型是全局多线程,局部单线程(事件循环),用线程封闭技术实现的无锁化设计实现并发安全。
一图理解Netty4线程模型
以使用最多的主、从多线程为例:
代码语言:javascript复制 EventLoopGroup mainGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup(10);
代码语言:javascript复制
1、有两个主要的线程池,分别为boss线程池和worker线程池。boss线程池主要处理客户端的连接事件,创建客户端连接并注册给worker线程池中某个线程。worker线程池主要处理客户端连接的读写事件。
2、线程池抽象为NioEventLoopGroup,其中的每个线程抽象为NioEventLoop。
3、每个NioEventLoop里包含了一个Selector(注册并轮询socket事件)、一个
Thread(异步事件轮询处理线程)、一个队列tailTasks(事件循环最后需要执行的任务)、一个队列taskQueue(任务队列,排队执行任务)、一个scheduledTaskQueue(任务调度)。
4、每个客户端Channel都绑定到一个NioEventLoop(线程),每个NioEventLoop可被多个Channel绑定(即由绑定的NioEventLoop异步使用Selector轮询客户端的读写事件),可以使客户端Channel的事件处理单线程串行化无并发执行(线程封闭技术实现的无锁化设计)。
5、NioEventLoop的选择默认使用轮询调度(round-robin)策略。
6、boss线程接受客户端连接后,会注册到worker线程中处理网络事件(放到任务队列,排队等待执行)。
代码语言:javascript复制io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
7、worker线程NioEventLoop负责网络包的编码解码及业务的执行(由ChannelHandler组成的ChannelPipeline职责链模式处理),不过每个
ChannelHandler可以由不同的线程池来处理(注意线程池的选择,请参考10)。
代码语言:javascript复制 ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
8、Netty的单线程串行化使得网络事件的处理有序性,不会乱序。
9、切勿阻塞线程(单线程执行,发生阻塞影响后续业务处理,redis的单线程模型也是不能阻塞的),如果必须做阻塞调用或执行时间很长的任务,需要提交到其它线程池异步执行,此时网络事件的处理可能不会有序,需要业务方负责(注意线程池的选择,请参考10)。
10、异步线程池的选择很重要,会影响网络事件的有序性,一旦无序处理,需要业务方自己处理:
(一般原则:排队,单线程执行 )
(一般原则:排队,多线程执行)
附:rocketmq 5.0.0 无序网络事件的处理:
rocketmq Broker端使用线程池隔离策略来处理不同的业务请求:
代码语言:javascript复制org.apache.rocketmq.broker.BrokerController
自定义线程池处理Netty的网络事件,脱离了Netty线程模型的有序性,rocketmq自身来处理网络事件的有序性:
1、每个请求都会携带唯一请求ID:
代码语言:javascript复制org.apache.rocketmq.remoting.protocol.RemotingCommand
2、每次请求都会保存请求ID到响应的关系:
代码语言:javascript复制org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
(1)使用ConcurrentMap来保存请求ID到响应的关系:
(2)发出请求时,获取分配的请求id,并构建响应ResponseFuture,保存其关系,等待响应返回:
3、等待响应(服务端会把请求id携带回来):
4、当客户端处理服务端的响应时,会根据返回的请求id,找到其之前保存的ResponseFuture,并设置ResponseFuture中的响应体,唤醒等待线程:
代码语言:javascript复制org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand