Tomcat中的WebSocket是如何实现的?

2024-08-12 09:41:27 浏览数 (1)

Tomcat中的WebSocket是如何实现的?

WebSocket是一种在客户端和服务器之间提供长期、双向、实时通信的协议

全双工通信:WebSocket允许数据同时在客户端和服务器双向通信,无需像HTTP等待请求和响应的循环

单个TCP连接:建立一次连接后,双方可在持久连接上交换任意数量的数据包,减少网络延迟、资源消耗

升级协议:WebSocket连接初始化时,通过HTTP协议进行一次握手,之后便升级到WebSocket协议进行数据传输

事件驱动:WebSocket通信基于事件,如 OnOpenOnMessageOnClose

WebSocket快速入门

SrpingBoot项目整合WebSocket

导入maven依赖

代码语言:xml复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-WebSocket</artifactId>
    <version>3.0.4</version>
</dependency>

新建一个配置类 @Configuration,将 ServerEndpointExporter 加入容器

代码语言:java复制
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

新建一个类,使用注解@ServerEndpoint标识路径,并使用注解@Component加入容器

有一系列@OnXX的注解可以标识在方法上,表示当遇到XX情况时调用对应的方法

@OnOpen 建立连接、@OnClose 关闭连接、@OnMessage 收到消息、@OnError 出现错误

代码语言:java复制
@ServerEndpoint("/ws/{id}")
@Component
public class WebSocketServer {

    private static final Map<Long, Session> map = new ConcurrentHashMap<>();

    @OnOpen
    public void open(@PathParam("id") Long id, Session session) {
        map.put(id, session);
        System.out.println(id   " 建立连接");
    }

    @OnClose
    public void close(@PathParam("id") Long id) {
        map.remove(id);
        System.out.println(id   " 关闭连接");
    }

    @OnMessage
    public void msg(String msg, Session session) {
        session.getAsyncRemote().sendText("收到消息:"   msg);
    }

    @OnError
    public void error(String error) {
        System.out.println(error);
    }

}

注意:open、msg方法中的入参Session是WebSocket中的,而不是servlet规范的

配置的端口为8080,context path为/caicai

代码语言:yaml复制
server:
  port: 8080
  servlet:
    context-path: /caicai

接下来就可以开始测试了,使用ApiFox工具建立WebSocket连接,发送消息111,最终会调用msg方法发送:收到消息:111

image.pngimage.png

WebSocket原理

我们在配置类中将ServerEndpointExporter类加入容器

代码语言:java复制
@Bean
public ServerEndpointExporter serverEndpointExporter() {
    return new ServerEndpointExporter();
}

ServerEndpointExporter实现SmartInitializingSingleton接口,在容器初始化完Bean后,调用afterSingletonsInstantiated方法

代码语言:java复制
@Override
public void afterSingletonsInstantiated() {
    registerEndpoints();
}

也就是单例Bean实例化之后执行,会扫描容器中的WebSocket处理类并注册

代码语言:java复制
protected void registerEndpoints() {
    //收集WebSocket处理类
    Set<Class<?>> endpointClasses = new LinkedHashSet<>();
    if (this.annotatedEndpointClasses != null) {
       endpointClasses.addAll(this.annotatedEndpointClasses);
    }

    ApplicationContext context = getApplicationContext();
    if (context != null) {
       //从容器中找到@ServerEndpoint注解标识的WebSocket处理类
       String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);
       for (String beanName : endpointBeanNames) {
          endpointClasses.add(context.getType(beanName));
       }
    }
	//注册
    for (Class<?> endpointClass : endpointClasses) {
       registerEndpoint(endpointClass);
    }

    if (context != null) {
       Map<String, ServerEndpointConfig> endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);
       for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {
          registerEndpoint(endpointConfig);
       }
    }
}

最终加入WebSocketContainer容器(ServerContainer extends WebSocketContainer)

代码语言:java复制
private void registerEndpoint(Class<?> endpointClass) {
    ServerContainer serverContainer = getServerContainer();
    Assert.state(serverContainer != null,
          "No ServerContainer set. Most likely the server's own WebSocket ServletContainerInitializer "  
          "has not run yet. Was the Spring ApplicationContext refreshed through a "  
          "org.springframework.web.context.ContextLoaderListener, "  
          "i.e. after the ServletContext has been fully initialized?");
    try {
       if (logger.isDebugEnabled()) {
          logger.debug("Registering @ServerEndpoint class: "   endpointClass);
       }
       //加入容器 
       serverContainer.addEndpoint(endpointClass);
    }
    catch (DeploymentException ex) {
       throw new IllegalStateException("Failed to register @ServerEndpoint class: "   endpointClass, ex);
    }
}

ServerEndpointExporter在容器启用时,扫描容器中被@ServerEndpoint标识的WebSocket处理类并注册

在前文曾说过:请求由EndPoint进行网络通信,当处理完网络通信封装成SocketProcessorBase交给线程池进行执行,会先调用Http11Processor解析再调用Adapter适配器交给容器处理

image.pngimage.png

作为升级协议的WebSocket前面网络通信流程不变,而调用Processor时会使用UpgradeProcessorInternal

UpgradeProcessorInternal最终会找到WebSocketContainer容器中对应的WebSocket处理类对应的方法进行调用(不会打到Container容器)

image.pngimage.png

总结

WebSocket是一种长期、双向、实时通信的协议,基于HTTP协议后升级为WebSocket协议

Tomcat在处理WebSocket时与HTTP请求有所不同,处理网络通信依旧还是使用EndPoint

当请求为HTTP时会使用Http11Processor接卸请求,经过适配器最终交给Container容器处理;当请求为WebSocket时使用UpgradeProcessorInternal,路由到WebSocketContainer容器中的ServerEndPoint处理类进行处理

ServerEndpointExporter实现SmartInitializingSingleton接口,在bean实例化后找到容器中被注解ServerEndPoint标识的处理类加入WebSocketContainer容器

0 人点赞