使用Netty框架搭建WebSocket服务器

2021-12-15 15:20:54 浏览数 (1)

Netty简单介绍

还记得前面的文章「[

Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?](https://links.jianshu.com/go?to=https://mp.weixin.qq.com/s/vVyDrNaAF6QI64ZAQXVlyQ)」里我们所提到的吗?WebSocket本身只是一个应用层协议,原则上只要遵循这个协议的客户端/服务端均可使用。对于客户端,前面我们已明确采用OkHttp框架来实现了,而对于服务端,我们则计划采用Netty框架来实现。

Netty是什么?Netty是一款 异步的、基于事件驱动的网络应用程序框架 ,支持快速开发 可维护的、高性能的、面向协议的 服务端和客户端。

Netty封装了Java NIO

API的能力,把原本在高负载下繁琐且容易出错的I/O操作,隐藏在一个简单易用的API之下。这无疑对于缺少服务端编程经验的客户端开发人员是非常友好的,只要把Netty的几个核心组件弄明白了,快速搭设一个满足本项目演示需要的WebSocket服务器基本上没什么问题。

Netty核心组件

Channel

Channel是Netty传输API的核心,被用于所有的I/O操作,Channel 接口所提供的API大大降低了Java中直接使用Socket类的复杂性。

回调

Netty在内部使用了回调来处理事件,当一个回调被触发时,相关的事件可以交由一个ChannelHandler的实现处理。

Future

Future提供了一种在操作完成时通知应用程序的方式,可以看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。

Netty提供了自己的实现——ChannelFuture,由ChannelFutureListener提供的通知机制消除了手动检查对应操作是否完成的步骤。

事件和ChannelHandler

Netty使用不同的事件来通知我们状态的改变,这使得我们能够基于已经发生的事件来触发适当的动作。

每个事件都可以被分发给ChannelHandler类,ChannelHandler类中提供了自定义的业务逻辑,架构上有助于保持业务逻辑与网络处理代码的分离。

用IntelliJ IDEA运行Netty的WebSocket演示代码

众所周知,Android Studio是基于IntelliJ IDEA开发的,因此对于习惯了用Android

Studio进行开发的Android开发人员,用起IntelliJ

IDEA来也几乎没有任何障碍。本篇的目的是快速搭设WebSocket服务器,因此选择直接将Netty的WebSocket演示代码拉取下来运行。在确保项目能成功运行起来的基础上,再逐步去分析演示代码。

该演示代码展示的交互效果很简单,跟前面的官方测试服务器一样,当客户端向服务端发送一个消息,服务器都会将消息原原本本地回传给客户端(没错,又是Echo

Test。。。)。虽然看起来好像用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式。

接下来我们分别进行两端的工作:

服务端的工作:
  • IntelliJ IDEA左上角New-Project-Maven创建新工程
  • 拉取Netty的WebSocket演示代码到src目录下
  • 按Alt Enter快捷键自动导入Netty依赖
  • 运行WebSocketServer类的main()函数

当控制台输出输出语句,即表示WebSocket服务器成功运行在本机上了:

代码语言:txt复制
Open your web browser and navigate to http://127.0.0.1:8080/
客户端的工作:
  • 保证手机网络与服务端在同一局域网下
  • 将要连接的WebSocket服务器地址更改为:ws://{服务端IP地址}:8080/websocket
  • 正常发送消息

从控制台可以看到,客户端成功地与WebSocket服务器建立了连接,并在发送消息后成功收到了服务器的回传消息:

11.png

WebSocket演示代码分析

总的来说,Netty的WebSocket演示代码中包含了两部分核心工作,其分别的意义以及对应的类如下表所示:

核心工作

意义

对应的类

提供ChannelHandler接口实现

服务器对从客户端接收的数据的业务逻辑处理

WebSocketServerHandler

ServerBootstrap实例创建

配置服务器的启动,将服务器绑定到它要监听连接请求的端口上

WebSocketServer

我们先来看看WebSocketServerHandler类核心工作的主要代码:

代码语言:txt复制
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
代码语言:txt复制
    private WebSocketServerHandshaker handshaker;
代码语言:txt复制
    // ...省去其他代码
代码语言:txt复制
    /**
代码语言:txt复制
     * 当有新的消息传入时都会回调
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
代码语言:txt复制
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
代码语言:txt复制
        // ...省去其他代码
代码语言:txt复制
        // 握手
代码语言:txt复制
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
代码语言:txt复制
                getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
代码语言:txt复制
        handshaker = wsFactory.newHandshaker(req);
代码语言:txt复制
        if (handshaker == null) {
代码语言:txt复制
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
代码语言:txt复制
        } else {
代码语言:txt复制
            handshaker.handshake(ctx.channel(), req);
代码语言:txt复制
        }
代码语言:txt复制
    }
代码语言:txt复制
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
代码语言:txt复制
        // ...省去其他代码
代码语言:txt复制
        // 对于文本帧和二进制数据帧,将数据简单地回送给了远程节点。
代码语言:txt复制
        if (frame instanceof TextWebSocketFrame) {
代码语言:txt复制
            // Echo the frame
代码语言:txt复制
            ctx.write(frame.retain());
代码语言:txt复制
            return;
代码语言:txt复制
        }
代码语言:txt复制
        if (frame instanceof BinaryWebSocketFrame) {
代码语言:txt复制
            // Echo the frame
代码语言:txt复制
            ctx.write(frame.retain());
代码语言:txt复制
        }
代码语言:txt复制
    }
代码语言:txt复制
    // ...省去其他代码
代码语言:txt复制
}

如你所见,为了处理所有接收到的数据,我们重写了WebSocketServerHandler类的channelRead()方法,重写的方法中主要处理了Http请求和WebSocket帧两种类型的数据。

Http请求类型的数据主要是为了处理客户端的握手建立连接过程,详情可参考前面的文章「[

Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?](https://links.jianshu.com/go?to=https://mp.weixin.qq.com/s/vVyDrNaAF6QI64ZAQXVlyQ)」,这里就不再展开讲了。

而WebSocket帧类型的数据主要是为了处理来自客户端主动发送的消息,我们知道,当WebSocket连接建立之后,后续的数据都是以帧的形式发送。主要包含以下几种类型的帧:

  • 文本帧
  • 二进制帧
  • Ping帧
  • Pong帧
  • 关闭帧

其中,文本帧与二进制帧同属于消息帧,Ping帧和Ping帧主要用于连接保活,关闭帧则用于关闭连接,我们这里主要关心对消息帧的处理,可以看到,我们只是将数据简单回传回了远端节点,从而实现Echo

Test。

然后,我们再回过头来看WebSocketServer类的核心工作的主要代码:

代码语言:txt复制
ublic final class WebSocketServer {
代码语言:txt复制
    // ...省去其他代码
代码语言:txt复制
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
代码语言:txt复制
    public static void main(String[] args) throws Exception {
代码语言:txt复制
        // ...省去其他代码
代码语言:txt复制
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
代码语言:txt复制
        EventLoopGroup workerGroup = new NioEventLoopGroup();
代码语言:txt复制
        try {
代码语言:txt复制
            ServerBootstrap b = new ServerBootstrap();
代码语言:txt复制
            b.group(bossGroup, workerGroup)
代码语言:txt复制
             .channel(NioServerSocketChannel.class) // 指定所使用的NIO传输Channel
代码语言:txt复制
             .childHandler(new WebSocketServerInitializer(sslCtx));
代码语言:txt复制
            // 使用指定的端口,异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
代码语言:txt复制
            Channel ch = b.bind(PORT).sync().channel();
代码语言:txt复制
            System.out.println("Open your web browser and navigate to "  
代码语言:txt复制
                    (SSL? "https" : "http")   "://127.0.0.1:"   PORT   '/');
代码语言:txt复制
            // 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
代码语言:txt复制
            ch.closeFuture().sync();
代码语言:txt复制
        } finally {
代码语言:txt复制
            // 关闭EventLoopGroup,释放所有的资源
代码语言:txt复制
            bossGroup.shutdownGracefully();
代码语言:txt复制
            workerGroup.shutdownGracefully();
代码语言:txt复制
        }
代码语言:txt复制
    }
代码语言:txt复制
}

我们使用ServerBootstrap引导类来完成Websocket服务器的网络层配置,随后调用bind(int

inetPort)方法将进程绑定到某个指定的端口,此过程称之为引导服务器。

我们是如何将前面定义的WebSocketServerHandler与ServerBootstrap关联起来的呢?关键就在于childHandler(ChannelHandler

childHandler)方法。

每个Channel都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler的实例链。我们需要提供一个ChannelInitializer的实现,并在其initChannel()回调方法中,将包括WebSocketServerHandler在内的一组自定义的ChannelHandler安装到ChannelPipeline中:

代码语言:txt复制
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
代码语言:txt复制
    // ...省去其他代码
代码语言:txt复制
    public WebSocketServerInitializer(SslContext sslCtx) {
代码语言:txt复制
        // ...省去其他代码
代码语言:txt复制
    }
代码语言:txt复制
    @Override
代码语言:txt复制
    public void initChannel(SocketChannel ch) throws Exception {
代码语言:txt复制
        ChannelPipeline pipeline = ch.pipeline();
代码语言:txt复制
        if (sslCtx != null) {
代码语言:txt复制
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
代码语言:txt复制
        }
代码语言:txt复制
        pipeline.addLast(new HttpServerCodec());
代码语言:txt复制
        pipeline.addLast(new HttpObjectAggregator(65536));
代码语言:txt复制
        pipeline.addLast(new WebSocketServerHandler());
代码语言:txt复制
    }
代码语言:txt复制
}

将Echo形式改为Broadcast形式

我们之前讲过,现今主流的IM应用几乎都是采用服务器中转的方式来进行消息传输的,为了更好地实践这种设计,我们进一步来对WebSocket服务器进行改造,把Echo形式改为Broadcast形式,即:

当接收到某一客户端的一条消息之后,将该消息转发给服务端维护的、除发送方之外的其他客户端连接。

要实现这一功能我们需要用到ChannelGroup类,ChannelGroup负责跟踪所有活跃中的WebSocket连接,当有新的客户端通过握手成功建立连接后,我们就要把这个新的Channel添加到ChannelGroup中去。

当接收到了WebSocket消息帧数据后,就调用ChannelGroup的writeAndFlush()方法将消息传输给所有已经连接的WebSocket

Channel。

ChannelGroup还允许传递过滤参数,我们可以以此过滤掉发送方的Channel。

代码语言:txt复制
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
代码语言:txt复制
    // ...省去其他代码
代码语言:txt复制
    private final ChannelGroup group;
代码语言:txt复制
    public WebSocketServerHandler(ChannelGroup group) {
代码语言:txt复制
        this.group = group;
代码语言:txt复制
    }
代码语言:txt复制
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
代码语言:txt复制
        // ...省去其他代码
代码语言:txt复制
        if (frame instanceof TextWebSocketFrame) {
代码语言:txt复制
//            ctx.write(frame.retain());
代码语言:txt复制
            group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
代码语言:txt复制
            return;
代码语言:txt复制
        }
代码语言:txt复制
        if (frame instanceof BinaryWebSocketFrame) {
代码语言:txt复制
//            ctx.write(frame.retain());
代码语言:txt复制
            group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
代码语言:txt复制
        }
代码语言:txt复制
    }
代码语言:txt复制
    @Override
代码语言:txt复制
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
代码语言:txt复制
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
代码语言:txt复制
            // 将新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息
代码语言:txt复制
            group.add(ctx.channel());
代码语言:txt复制
        } else {
代码语言:txt复制
            super.userEventTriggered(ctx, evt);
代码语言:txt复制
        }
代码语言:txt复制
    }
代码语言:txt复制
}

运行起来之后,让多个客户端连接到此服务器,当客户端中的一个发送了一条消息后,其他连接的客户端会收到由服务器广播的这一条消息:

0 人点赞