Netty简单介绍
还记得前面的文章「[
Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?](https://links.jianshu.com/go?to=https://mp.weixin.qq.com/s/vVyDrNaAF6QI64ZAQXVlyQ)」里我们所提到的吗?WebSocket本身只是一个应用层协议,原则上只要遵循这个协议的客户端/服务端均可使用。对于客户端,前面我们已明确采用OkHttp框架来实现了,而对于服务端,我们则计划采用Netty框架来实现。
Netty是什么?Netty是一款 异步的、基于事件驱动的网络应用程序框架 ,支持快速开发 可维护的、高性能的、面向协议的 服务端和客户端。
Netty封装了Java NIO
API的能力,把原本在高负载下繁琐且容易出错的I/O操作,隐藏在一个简单易用的API之下。这无疑对于缺少服务端编程经验的客户端开发人员是非常友好的,只要把Netty的几个核心组件弄明白了,快速搭设一个满足本项目演示需要的WebSocket服务器基本上没什么问题。
Netty核心组件
Channel
Channel是Netty传输API的核心,被用于所有的I/O操作,Channel 接口所提供的API大大降低了Java中直接使用Socket类的复杂性。
回调
Netty在内部使用了回调来处理事件,当一个回调被触发时,相关的事件可以交由一个ChannelHandler的实现处理。
Future
Future提供了一种在操作完成时通知应用程序的方式,可以看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。
Netty提供了自己的实现——ChannelFuture,由ChannelFutureListener提供的通知机制消除了手动检查对应操作是否完成的步骤。
事件和ChannelHandler
Netty使用不同的事件来通知我们状态的改变,这使得我们能够基于已经发生的事件来触发适当的动作。
每个事件都可以被分发给ChannelHandler类,ChannelHandler类中提供了自定义的业务逻辑,架构上有助于保持业务逻辑与网络处理代码的分离。
用IntelliJ IDEA运行Netty的WebSocket演示代码
众所周知,Android Studio是基于IntelliJ IDEA开发的,因此对于习惯了用Android
Studio进行开发的Android开发人员,用起IntelliJ
IDEA来也几乎没有任何障碍。本篇的目的是快速搭设WebSocket服务器,因此选择直接将Netty的WebSocket演示代码拉取下来运行。在确保项目能成功运行起来的基础上,再逐步去分析演示代码。
该演示代码展示的交互效果很简单,跟前面的官方测试服务器一样,当客户端向服务端发送一个消息,服务器都会将消息原原本本地回传给客户端(没错,又是Echo
Test。。。)。虽然看起来好像用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式。
接下来我们分别进行两端的工作:
服务端的工作:
- IntelliJ IDEA左上角New-Project-Maven创建新工程
- 拉取Netty的WebSocket演示代码到src目录下
- 按Alt Enter快捷键自动导入Netty依赖
- 运行WebSocketServer类的main()函数
当控制台输出输出语句,即表示WebSocket服务器成功运行在本机上了:
代码语言:txt复制Open your web browser and navigate to http://127.0.0.1:8080/
客户端的工作:
- 保证手机网络与服务端在同一局域网下
- 将要连接的WebSocket服务器地址更改为:ws://{服务端IP地址}:8080/websocket
- 正常发送消息
从控制台可以看到,客户端成功地与WebSocket服务器建立了连接,并在发送消息后成功收到了服务器的回传消息:
11.png
WebSocket演示代码分析
总的来说,Netty的WebSocket演示代码中包含了两部分核心工作,其分别的意义以及对应的类如下表所示:
核心工作 | 意义 | 对应的类 |
---|---|---|
提供ChannelHandler接口实现 | 服务器对从客户端接收的数据的业务逻辑处理 | WebSocketServerHandler |
ServerBootstrap实例创建 | 配置服务器的启动,将服务器绑定到它要监听连接请求的端口上 | WebSocketServer |
我们先来看看WebSocketServerHandler类核心工作的主要代码:
代码语言:txt复制public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
代码语言:txt复制 private WebSocketServerHandshaker handshaker;
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 /**
代码语言:txt复制 * 当有新的消息传入时都会回调
*
* @param ctx
* @param msg
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
代码语言:txt复制 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 // 握手
代码语言:txt复制 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
代码语言:txt复制 getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
代码语言:txt复制 handshaker = wsFactory.newHandshaker(req);
代码语言:txt复制 if (handshaker == null) {
代码语言:txt复制 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
代码语言:txt复制 } else {
代码语言:txt复制 handshaker.handshake(ctx.channel(), req);
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 // 对于文本帧和二进制数据帧,将数据简单地回送给了远程节点。
代码语言:txt复制 if (frame instanceof TextWebSocketFrame) {
代码语言:txt复制 // Echo the frame
代码语言:txt复制 ctx.write(frame.retain());
代码语言:txt复制 return;
代码语言:txt复制 }
代码语言:txt复制 if (frame instanceof BinaryWebSocketFrame) {
代码语言:txt复制 // Echo the frame
代码语言:txt复制 ctx.write(frame.retain());
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制}
如你所见,为了处理所有接收到的数据,我们重写了WebSocketServerHandler类的channelRead()方法,重写的方法中主要处理了Http请求和WebSocket帧两种类型的数据。
Http请求类型的数据主要是为了处理客户端的握手建立连接过程,详情可参考前面的文章「[
Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?](https://links.jianshu.com/go?to=https://mp.weixin.qq.com/s/vVyDrNaAF6QI64ZAQXVlyQ)」,这里就不再展开讲了。
而WebSocket帧类型的数据主要是为了处理来自客户端主动发送的消息,我们知道,当WebSocket连接建立之后,后续的数据都是以帧的形式发送。主要包含以下几种类型的帧:
- 文本帧
- 二进制帧
- Ping帧
- Pong帧
- 关闭帧
其中,文本帧与二进制帧同属于消息帧,Ping帧和Ping帧主要用于连接保活,关闭帧则用于关闭连接,我们这里主要关心对消息帧的处理,可以看到,我们只是将数据简单回传回了远端节点,从而实现Echo
Test。
然后,我们再回过头来看WebSocketServer类的核心工作的主要代码:
代码语言:txt复制ublic final class WebSocketServer {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
代码语言:txt复制 public static void main(String[] args) throws Exception {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
代码语言:txt复制 EventLoopGroup workerGroup = new NioEventLoopGroup();
代码语言:txt复制 try {
代码语言:txt复制 ServerBootstrap b = new ServerBootstrap();
代码语言:txt复制 b.group(bossGroup, workerGroup)
代码语言:txt复制 .channel(NioServerSocketChannel.class) // 指定所使用的NIO传输Channel
代码语言:txt复制 .childHandler(new WebSocketServerInitializer(sslCtx));
代码语言:txt复制 // 使用指定的端口,异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
代码语言:txt复制 Channel ch = b.bind(PORT).sync().channel();
代码语言:txt复制 System.out.println("Open your web browser and navigate to "
代码语言:txt复制 (SSL? "https" : "http") "://127.0.0.1:" PORT '/');
代码语言:txt复制 // 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
代码语言:txt复制 ch.closeFuture().sync();
代码语言:txt复制 } finally {
代码语言:txt复制 // 关闭EventLoopGroup,释放所有的资源
代码语言:txt复制 bossGroup.shutdownGracefully();
代码语言:txt复制 workerGroup.shutdownGracefully();
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制}
我们使用ServerBootstrap引导类来完成Websocket服务器的网络层配置,随后调用bind(int
inetPort)方法将进程绑定到某个指定的端口,此过程称之为引导服务器。
我们是如何将前面定义的WebSocketServerHandler与ServerBootstrap关联起来的呢?关键就在于childHandler(ChannelHandler
childHandler)方法。
每个Channel都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler的实例链。我们需要提供一个ChannelInitializer的实现,并在其initChannel()回调方法中,将包括WebSocketServerHandler在内的一组自定义的ChannelHandler安装到ChannelPipeline中:
代码语言:txt复制public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 public WebSocketServerInitializer(SslContext sslCtx) {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 }
代码语言:txt复制 @Override
代码语言:txt复制 public void initChannel(SocketChannel ch) throws Exception {
代码语言:txt复制 ChannelPipeline pipeline = ch.pipeline();
代码语言:txt复制 if (sslCtx != null) {
代码语言:txt复制 pipeline.addLast(sslCtx.newHandler(ch.alloc()));
代码语言:txt复制 }
代码语言:txt复制 pipeline.addLast(new HttpServerCodec());
代码语言:txt复制 pipeline.addLast(new HttpObjectAggregator(65536));
代码语言:txt复制 pipeline.addLast(new WebSocketServerHandler());
代码语言:txt复制 }
代码语言:txt复制}
将Echo形式改为Broadcast形式
我们之前讲过,现今主流的IM应用几乎都是采用服务器中转的方式来进行消息传输的,为了更好地实践这种设计,我们进一步来对WebSocket服务器进行改造,把Echo形式改为Broadcast形式,即:
当接收到某一客户端的一条消息之后,将该消息转发给服务端维护的、除发送方之外的其他客户端连接。
要实现这一功能我们需要用到ChannelGroup类,ChannelGroup负责跟踪所有活跃中的WebSocket连接,当有新的客户端通过握手成功建立连接后,我们就要把这个新的Channel添加到ChannelGroup中去。
当接收到了WebSocket消息帧数据后,就调用ChannelGroup的writeAndFlush()方法将消息传输给所有已经连接的WebSocket
Channel。
ChannelGroup还允许传递过滤参数,我们可以以此过滤掉发送方的Channel。
代码语言:txt复制public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 private final ChannelGroup group;
代码语言:txt复制 public WebSocketServerHandler(ChannelGroup group) {
代码语言:txt复制 this.group = group;
代码语言:txt复制 }
代码语言:txt复制 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
代码语言:txt复制 // ...省去其他代码
代码语言:txt复制 if (frame instanceof TextWebSocketFrame) {
代码语言:txt复制// ctx.write(frame.retain());
代码语言:txt复制 group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
代码语言:txt复制 return;
代码语言:txt复制 }
代码语言:txt复制 if (frame instanceof BinaryWebSocketFrame) {
代码语言:txt复制// ctx.write(frame.retain());
代码语言:txt复制 group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 @Override
代码语言:txt复制 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
代码语言:txt复制 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
代码语言:txt复制 // 将新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息
代码语言:txt复制 group.add(ctx.channel());
代码语言:txt复制 } else {
代码语言:txt复制 super.userEventTriggered(ctx, evt);
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制}
运行起来之后,让多个客户端连接到此服务器,当客户端中的一个发送了一条消息后,其他连接的客户端会收到由服务器广播的这一条消息: