netty WebSocket客户端实践

2023-09-10 09:43:47 浏览数 (2)

在之前的Socket学习中,主要都是基于两个Socket客户端:WebSocketSocket.IO。在做测试的时候也是基于WebSocket消息的发送和接收为主要测试对象。但是对于超多Socket连接没有涉及。

在实践中会发现,这两个实现类都存在一个问题,为了维护1个Socket连接及其功能,通常需要创建多个线程。在计算机硬件资源有限的情况下,线程是稀缺资源,不仅仅是内存占用,也会增加CPU的负担。

之前解决这个问题的方案直接换成「Go」语言版本的Socket客户端。例如:/net/websocketgorilla/websocket

其实Java也有相对应的解决方案:「netty」。话不多说,上代码。

依赖

代码语言:javascript复制
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.85.Final</version>
</dependency>

netty WebSocket客户端

客户端主要的功能就是创建连接,然后使用一个事件处理线程池管理连接以及收发消息io.netty.channel.EventLoopGroup,然后使用一个io.netty.bootstrap.Bootstrap来作为引导程序。

代码语言:javascript复制
package com.funtester.socket.netty  
  
import com.funtester.frame.execute.ThreadPoolUtil  
import groovy.util.logging.Log4j2  
import io.netty.bootstrap.Bootstrap  
import io.netty.channel.*  
import io.netty.channel.group.ChannelGroup  
import io.netty.channel.group.DefaultChannelGroup  
import io.netty.channel.nio.NioEventLoopGroup  
import io.netty.channel.socket.SocketChannel  
import io.netty.channel.socket.nio.NioSocketChannel  
import io.netty.handler.codec.http.DefaultHttpHeaders  
import io.netty.handler.codec.http.HttpClientCodec  
import io.netty.handler.codec.http.HttpObjectAggregator  
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker  
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory  
import io.netty.handler.codec.http.websocketx.WebSocketVersion  
import io.netty.handler.stream.ChunkedWriteHandler  
import io.netty.util.concurrent.GlobalEventExecutor  
  
@Log4j2  
class WebSocketConnector {  
  
    static Bootstrap bootstrap = new Bootstrap()  
  
    /**  
     * 处理事件的线程池  
     */  
    static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))  
  
    static {  
        bootstrap.group(group).channel(NioSocketChannel.class)  
    }  
  
    /**  
     * 用于记录和管理所有客户端的channel  
     */    
     static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)  
  
    WebSocketClientHandshaker handShaker  
  
    ChannelPromise handshakeFuture  
  
    String host  
  
    int port  
  
    /**  
     * 网络通道  
     */  
    Channel channel  
  
    WebSocketIoHandler handler  
  
    /**  
     * WebSocket协议类型的模拟客户端连接器构造方法  
     *  
     * @param serverIp  
     * @param serverSocketPort  
     * @param group  
     */    WebSocketConnector(String host, int port) {  
        this.host = host  
        this.port = port  
        String URL = this.host   ":"   this.port   "/test"  
        URI uri = new URI(URL)  
        handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))  
        bootstrap.option(ChannelOption.TCP_NODELAY, true)  
                .option(ChannelOption.SO_TIMEOUT, true)  
                .option(ChannelOption.SO_BROADCAST, true)  
                .option(ChannelOption.SO_KEEPALIVE, true)  
                .handler(new ChannelInitializer<SocketChannel>() {  
  
                    @Override  
                    protected void initChannel(SocketChannel ch) throws Exception {  
                        ChannelPipeline pipeline = ch.pipeline()  
                        pipeline.addLast(new HttpClientCodec())  
                        pipeline.addLast(new ChunkedWriteHandler())  
                        pipeline.addLast(new HttpObjectAggregator(1024 * 1024))  
                        pipeline.addLast(handler)  
                    }  
                })  
    }  
  
  
    /**  
     * 连接  
     */  
    void connect() {  
        try {  
            try {  
                ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()  
                this.channel = future.channel()  
                clients.add(channel)  
            } catch (e) {  
                log.error("创建channel失败", e)  
            }  
        } catch (Exception e) {  
            log.error("连接服务失败", e)  
        } finally {  
            this.handshakeFuture = handler.handshakeFuture()  
        }  
    }  
  
    /**  
     * 关闭  
     */  
    void close() {  
        this.channel.close()  
    }  
  
}

这里用到了一个保存现在的所有的活跃channel的类io.netty.channel.group.ChannelGroup,有点就是可以自动管理所有的channel,还能自动剔除已经关闭的channel

这里还有补充2个发送消息的方法:

代码语言:javascript复制
/**  
 * 发送文本消息  
 */  
void sendText(String msg) {  
    channel.writeAndFlush(new TextWebSocketFrame(msg))  
}  
  
/**  
 * 发送ping消息  
 */  
void ping() {  
    channel.writeAndFlush(new PingWebSocketFrame())  
}

消息处理器

这里需要处理的消息各种类型,继承io.netty.channel.SimpleChannelInboundHandler实现不同的方法即可。

这里有个泛型设置可以直接设置成不同的消息类型,例如io.netty.handler.codec.http.websocketx.WebSocketFrame及其子类,如果确定服务端发来消息的类型的话,可以更加省事儿。

代码语言:javascript复制
package com.funtester.socket.netty  
  
import groovy.util.logging.Log4j2  
import io.netty.channel.*  
import io.netty.channel.group.ChannelGroup  
import io.netty.channel.group.DefaultChannelGroup  
import io.netty.handler.codec.http.FullHttpResponse  
import io.netty.handler.codec.http.websocketx.*  
import io.netty.handler.timeout.IdleState  
import io.netty.handler.timeout.IdleStateEvent  
import io.netty.util.concurrent.GlobalEventExecutor  
/**  
 * WebSocket协议类型的模拟客户端IO处理器类  
 */  
@Log4j2  
class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {  
  
    /**  
     * 用于记录和管理所有客户端的channel  
     */    private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)  
  
    private final WebSocketClientHandshaker handShaker  
  
    private ChannelPromise handshakeFuture  
  
    WebSocketIoHandler(WebSocketClientHandshaker handShaker) {  
        this.handShaker = handShaker  
    }  
  
    ChannelFuture handshakeFuture() {  
        return handshakeFuture  
    }  
  
    @Override  
    void handlerAdded(ChannelHandlerContext ctx) {  
        handshakeFuture = ctx.newPromise()  
    }  
  
    @Override  
    void channelActive(ChannelHandlerContext ctx) {  
        handShaker.handshake(ctx.channel());  
    }  
  
    @Override  
    void channelInactive(ChannelHandlerContext ctx) {  
        ctx.close()  
        try {  
            super.channelInactive(ctx)  
        } catch (Exception e) {  
            log.error("channelInactive 异常.", e)  
        }  
        log.warn("WebSocket链路与服务器连接已断开.")  
    }  
  
    @Override  
    void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        Channel ch = ctx.channel()  
        if (!handShaker.isHandshakeComplete()) {  
            try {  
                handShaker.finishHandshake(ch, (FullHttpResponse) msg)  
                handshakeFuture.setSuccess()  
            } catch (WebSocketHandshakeException e) {  
                log.warn("WebSocket Client failed to connect",e)  
                handshakeFuture.setFailure(e)  
            }  
            return  
        }  
  
        WebSocketFrame frame = (WebSocketFrame) msg  
        if (frame instanceof TextWebSocketFrame) {  
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame  
            String s = textFrame.text()  
        } else if (frame instanceof CloseWebSocketFrame) {  
            log.info("WebSocket Client closing")  
            ch.close()  
        }  
    }  
  
    @Override  
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
        log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)  
        if (!handshakeFuture.isDone()) {  
            handshakeFuture.setFailure(cause)  
        }  
        ctx.close()  
        super.exceptionCaught(ctx, cause)  
    }  
  
    @Override  
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
        if (evt instanceof IdleStateEvent) {  
            IdleStateEvent event = (IdleStateEvent) evt  
            // 如果写通道处于空闲状态,就发送心跳命令  
            if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {  
                // 发送心跳数据  
                def channel = ctx.channel()  
                channel.writeAndFlush(new TextWebSocketFrame("dsf"))  
            }  
        } else {  
            super.userEventTriggered(ctx, evt)  
        }  
    }  
}

这里处理接收到消息的时候并没有选择保存消息的功能,因为netty WebSocket使用场景就是超大量(超过1w)连接。保留返回消息,进行业务验证通常不是这类测试场景的首要目的。所以以后等用到了再说吧。

后面会对比这3种Socket客户端包括Go语言两种Socket客户端在超大量连接方面的资源占用。

0 人点赞