相关推荐:
【Flink】第十五篇:Redis Connector 数据保序思考
【Flink】第十八篇:Direct Memory 一箩筐
【Flink】第三十篇:Netty 之 Java NIO
上一篇【Flink】第三十篇:Netty 之 Java NIO 为大家分享了IO 中的基本概念、5种 IO 模型、IO多路复用、Reactor IO设计模式。
本篇将介绍Netty的
- 设计思想
- 核心抽象
- IO线程模型等
在上一篇中,我们介绍了在BIO阶段,IO由单线程演进到多线程,但本质都是阻塞的socket模式,
单线程:
这段代码片段将只能同时处理一个连接,要管理多个并发客户端,需要为每个新的客户端Socket 创建一个新的Thread。
多线程:
Java NIO:
Java NIO 很早就提供了非阻塞调用,可以使用操作系统的事件通知API注册一组非阻塞套接字,以确定它们中是否有任何的套接字已经有数据可供读写。(也称为I/O多路复用,该接口从最初的select和poll调用到更加高性能的实现epoll)
java.nio.channels.Selector 是Java 的非阻塞I/O 实现的关键。它使用了事件通知的方式以确定在一组非阻塞套接字中有哪些已经就绪能够进行I/O 相关的操作。因为可以在任何的时间检查任意的读操作或者写操作的完成状态
Netty是一个异步、事件驱动的网络编程框架,为了快速发展的可维护、高性能的C/S协议。
利用Java的高级网络功能,隐藏其背后的复杂性而提供一个易于使用的客户端/服务器框架。
核心设计:异步事件驱动
思想:
Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果。出于这个目的,Netty 提供了接口 ChannelFuture。ChannelFuture 的 addListener 方法注册了一个 ChannelFutureListener,当操作完成时,ChannelFutureListener可以通过 回调 的方式被异步通知。
阐述:
主调线程执行非阻塞方法,方法立即返回一个Future。然后在这个Futrue上注册一个Listener。主调线程便可以继续执行其他事情了。
当这个非阻塞方法真正执行完毕,会引起一些状态的变化,状态变化触发Futrue的一个特定事件,这个事件会使得通知注册的Listener,然后包装Listener中的operationComplete方法为一个Runable,扔到线程池里去执行这个函数。
源码层面的理解:
Futrue本质是一个被观察者,Listener是观察者,Futrue上注册若干Listener,当发生特定Event,会触发Futrue上特定的Listener。被观察者Future调用观察者Listener的operationComplete方法实现时间通知和未来逻辑的执行。
代码语言:javascript复制private void notifyListeners() {
// EventExecutor就是一个线程池
EventExecutor executor = executor();
//如果当前线程是EventExecutor中的线程,直接执行
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//如果当前线程不是EventExecutor中的线程,则放入EventExecutor中执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
EventExecutor本质就是一个线程池:
EventExecutor其实就是一个只有一个Thread的线程池,包含在线程池组中EventExecutorGroup。netty的Futrue继承了jdk的Futrue,netty的EventListener继承了jdk的EventListener,通过EventListener的operationComplete回调实现异步事件通知的原理是:
一个Future代表一个Channel的某个执行结果,可以添加若干个Listener,当有了执行结果,会发出相应事件,事件触发Future去通知注册在相应事件下的若干Listener,并且如果当前线程如果就是Channel的这个EventExecutor(调用inEventloop可以得知),则立即执行,否则放入EventExecutor的任务队列等待执行。也就是说Listener的异步逻辑还是在Channel对应的那个EventExecutor的线程中执行的。这是Netty的异步事件驱动的源码层理解。
核心抽象——Bootstrap
Bootstrap(启动socket)
一个引导程序,引导Channel供使用。Netty有两种类型的引导:
- 客户端(Bootstrap)
- 服务端(ServerBootstrap)
核心抽象——事件循环组(线程模型)
1. EventExecutorGroup
代码语言:javascript复制public interface EventExecutorGroup
extends ScheduledExecutorService, Iterable<EventExecutor>
事件执行器组:通过next()提供下一个EventExecutor。除此之外,还负责EventExecutor的生命周期,例如全局性的关闭。
2. EventExecutor
代码语言:javascript复制public interface EventExecutor extends EventExecutorGroup
事件执行器:是一个特殊的时间执行器组,提供了判断一个线程是否属于这个事件循环。除此之外它还扩展了事件执行器组,允许以一种通用的方式访问。
3. EventLoopGroup
代码语言:javascript复制public interface EventLoopGroup extends EventExecutorGroup
事件循环组:一个EventLoopGroup包含若干EventLoop,允许注册Channel。提供了一种迭代,用于在事件循环中检索下一个要处理的Channel。
4. EventLoop
代码语言:javascript复制public interface EventLoop extends OrderedEventExecutor, EventLoopGroup
事件循环:一个EventLoop被注册了若干Channel,并处理这些Channel的所有IO事件。
- 一个EventLoopGroup包含若干EventLoop
- 一个EventLoop在其生命周期内只能和一个Thread绑定,EventLoop处理的I/O事件都由它绑定的Thread处理
核心抽象——Future(异步通知)
1. Future
代码语言:javascript复制public interface Future<V> extends java.util.concurrent.Future<V>
异步操作的结果。Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。JDK预置了interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。
2. ChannelFuture
代码语言:javascript复制public interface ChannelFuture extends Future<Void>
Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果。出于这个目的,Netty 提供了接口 ChannelFuture。ChannelFuture不是未完成就是已完成。它的 addListener 方法注册了一个 ChannelFutureListener,当操作完成时,可以被异步通知(不管成功与否)。
代码语言:javascript复制{@code @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
);
3. ChannelFutureListener
代码语言:javascript复制public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture>
监听一个ChannelFuture的结果。通过调用addListener(GenericFutureListener)添加一个Listener,这个异步的IO操作结果将会被通知一次这个Listener。
迅速返回调用者控制权:operationComplete(Future)会直接被一个IO线程调用。因此,在IO期间,执行一个耗时的或者阻塞的operationComplete会发生意料之外的事情。如果需要执行一个耗时的操作请在一个不同的线程池里执行。
核心抽象——Channel(通道)
与网络套接字或能够进行I/O操作(例如读取,写入,连接和绑定)的组件。与Channel相关的概念有以下四个:
- Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
- ChannelHandler,核心处理业务就在这里,用于处理业务请求。
- ChannelHandlerContext,用于传输业务数据。
- ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
核心抽象——ChannelHandler(处理程序)
1. ChannelHandler
处理I/O事件或拦截I/O操作,并将其转发到其ChannelPipeline中的下一个处理程序。
2. Inbound/Outbound子类
ChannelHandler本身不提供许多方法,但是通常必须实现其子类型之一:
- ChannelInboundHandler处理入站I/O事件。在状态改变上添加回调函数。在状态改变时调用用户添加的钩子函数。
- ChannelOutboundHandler处理出站I/O操作。会得到IO出站操作的通知。
3. ChannelHandlerContext
ChannelHandler随ChannelHandlerContext对象一起提供。ChannelHandler应该通过上下文对象与其所属的ChannelPipeline进行交互。使用上下文对象,ChannelHandler可以在上游或下游传递事件,动态修改管道或存储特定Handler处理程序的信息(使用AttributeKeys)。
核心抽象——ChannelPipline(管道)
核心抽象——ByteBuf(字节容器)
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:
他有三种使用模式:
1)Heap Buffer 堆缓冲区
堆缓冲区是ByteBuf最常用的模式,他将数据存储在堆空间。
2)Direct Buffer 直接缓冲区
直接缓冲区是ByteBuf的另外一种常用模式,他的内存分配都不发生在堆,jdk1.4引入的nio的ByteBuffer类允许jvm通过本地方法调用分配内存,这样做有两个好处
a) 通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
b) DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.
3)Composite Buffer 复合缓冲区
复合缓冲区相当于多个不同ByteBuf的视图,这是netty提供的,jdk不提供这样的功能。
关系
- 一个EventLoopGroup包含若干EventLoop
- 一个EventLoop在其生命周期内只能和一个Thread绑定,EventLoop处理的I/O事件都由它绑定的Thread处理
- 一个Channel在其生命周期内,只能注册于一个EventLoop,一个EventLoop可能被分配处理多个Channel。也就是EventLoop与Channel是1 : n的关系
- 一个Channel上的所有ChannelHandler的事件由绑定的EventLoop中的I/O线程处理
- 不要阻塞Channel的I/O线程,可能会影响该EventLoop中其他Channel事件处理
线程模型
运行任务来处理在连接的生命周期内发生的事件是任何网络框架的基本功能。与之相应的编程上的构造通常被称为事件循环,Netty 使用了io.netty.channel.EventLoop 来抽象。
Netty的EventLoop 是协同设计的一部分,它采用了两个基本的API:并发和网络编程。首先,io.netty.util.concurrent 包构建在JDK 的java.util.concurrent 包上,用来提供线程执行器。其次,io.netty.channel 包中的类,为了与Channel 的事件进行交互,扩展了这些接口/类。
在这个模型中,一个EventLoop 将由一个永远都不会改变的Thread驱动,同时任务(Runnable 或者Callable)可以直接提交给EventLoop 实现,以立即执行或者调度执行。根据配置和可用核心的不同,可能会创建多个EventLoop 实例用以优化资源的使用,并且单个EventLoop 可能会被指派用于服务多个Channel。
Netty线程模型的卓越性能取决于对于当前执行的Thread的身份的确定(通过调用EventLoop 的inEventLoop(Thread)方法实现),也就是说,确定它是否是分配给当前Channel以及它的EventLoop的那一个线程。
如果(当前)调用线程正是支撑EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。这也就解释了任何的Thread 是如何与Channel 直接交互而无需在ChannelHandler 中进行额外同步的。注意,每个EventLoop 都有它自已的任务队列,独立于任何其他的EventLoop。
异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个Channel 所共享。这使得可以通过尽可能少量的Thread 来支撑大量的Channel,而不是每个Channel 分配一个Thread。
EventLoopGroup 负责为每个新创建的Channel 分配一个EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的EventLoop可能会被分配给多个Channel。
一旦一个Channel 被分配给一个EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的Thread)。这可以使你从担忧你的ChannelHandler 实现中的线程安全和同步问题中解脱出来。
总结
如果让我用几个关键词描述Netty,我的答案是:
- IO多路复用(Java NIO )
- mmap零拷贝(Java NIO )
- Futrue(非阻塞返回结果)
- 回调函数(异步通知)
- Reactor模式(本质是观察者设计模式)
- 单线程线程池(EventLoop与Thread对等绑定)
- 责任链模式(Handler 入站出站处理IO事件)