Tomcat中的WebSocket是如何实现的?
WebSocket是一种在客户端和服务器之间提供长期、双向、实时通信的协议
全双工通信:WebSocket允许数据同时在客户端和服务器双向通信,无需像HTTP等待请求和响应的循环
单个TCP连接:建立一次连接后,双方可在持久连接上交换任意数量的数据包,减少网络延迟、资源消耗
升级协议:WebSocket连接初始化时,通过HTTP协议进行一次握手,之后便升级到WebSocket协议进行数据传输
事件驱动:WebSocket通信基于事件,如 OnOpen
、OnMessage
、OnClose
WebSocket快速入门
SrpingBoot项目整合WebSocket
导入maven依赖
代码语言:xml复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-WebSocket</artifactId>
<version>3.0.4</version>
</dependency>
新建一个配置类 @Configuration
,将 ServerEndpointExporter
加入容器
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
新建一个类,使用注解@ServerEndpoint
标识路径,并使用注解@Component
加入容器
有一系列@OnXX
的注解可以标识在方法上,表示当遇到XX情况时调用对应的方法
@OnOpen
建立连接、@OnClose
关闭连接、@OnMessage
收到消息、@OnError
出现错误
@ServerEndpoint("/ws/{id}")
@Component
public class WebSocketServer {
private static final Map<Long, Session> map = new ConcurrentHashMap<>();
@OnOpen
public void open(@PathParam("id") Long id, Session session) {
map.put(id, session);
System.out.println(id " 建立连接");
}
@OnClose
public void close(@PathParam("id") Long id) {
map.remove(id);
System.out.println(id " 关闭连接");
}
@OnMessage
public void msg(String msg, Session session) {
session.getAsyncRemote().sendText("收到消息:" msg);
}
@OnError
public void error(String error) {
System.out.println(error);
}
}
注意:open、msg方法中的入参Session是WebSocket中的,而不是servlet规范的
配置的端口为8080,context path为/caicai
代码语言:yaml复制server:
port: 8080
servlet:
context-path: /caicai
接下来就可以开始测试了,使用ApiFox工具建立WebSocket连接,发送消息111,最终会调用msg
方法发送:收到消息:111
WebSocket原理
我们在配置类中将ServerEndpointExporter
类加入容器
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
ServerEndpointExporter实现SmartInitializingSingleton接口,在容器初始化完Bean后,调用afterSingletonsInstantiated方法
代码语言:java复制@Override
public void afterSingletonsInstantiated() {
registerEndpoints();
}
也就是单例Bean实例化之后执行,会扫描容器中的WebSocket处理类并注册
代码语言:java复制protected void registerEndpoints() {
//收集WebSocket处理类
Set<Class<?>> endpointClasses = new LinkedHashSet<>();
if (this.annotatedEndpointClasses != null) {
endpointClasses.addAll(this.annotatedEndpointClasses);
}
ApplicationContext context = getApplicationContext();
if (context != null) {
//从容器中找到@ServerEndpoint注解标识的WebSocket处理类
String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);
for (String beanName : endpointBeanNames) {
endpointClasses.add(context.getType(beanName));
}
}
//注册
for (Class<?> endpointClass : endpointClasses) {
registerEndpoint(endpointClass);
}
if (context != null) {
Map<String, ServerEndpointConfig> endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);
for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {
registerEndpoint(endpointConfig);
}
}
}
最终加入WebSocketContainer容器(ServerContainer extends WebSocketContainer)
代码语言:java复制private void registerEndpoint(Class<?> endpointClass) {
ServerContainer serverContainer = getServerContainer();
Assert.state(serverContainer != null,
"No ServerContainer set. Most likely the server's own WebSocket ServletContainerInitializer "
"has not run yet. Was the Spring ApplicationContext refreshed through a "
"org.springframework.web.context.ContextLoaderListener, "
"i.e. after the ServletContext has been fully initialized?");
try {
if (logger.isDebugEnabled()) {
logger.debug("Registering @ServerEndpoint class: " endpointClass);
}
//加入容器
serverContainer.addEndpoint(endpointClass);
}
catch (DeploymentException ex) {
throw new IllegalStateException("Failed to register @ServerEndpoint class: " endpointClass, ex);
}
}
ServerEndpointExporter在容器启用时,扫描容器中被@ServerEndpoint标识的WebSocket处理类并注册
在前文曾说过:请求由EndPoint进行网络通信,当处理完网络通信封装成SocketProcessorBase交给线程池进行执行,会先调用Http11Processor解析再调用Adapter适配器交给容器处理
作为升级协议的WebSocket前面网络通信流程不变,而调用Processor时会使用UpgradeProcessorInternal
UpgradeProcessorInternal最终会找到WebSocketContainer容器中对应的WebSocket处理类对应的方法进行调用(不会打到Container容器)
总结
WebSocket是一种长期、双向、实时通信的协议,基于HTTP协议后升级为WebSocket协议
Tomcat在处理WebSocket时与HTTP请求有所不同,处理网络通信依旧还是使用EndPoint
当请求为HTTP时会使用Http11Processor接卸请求,经过适配器最终交给Container容器处理;当请求为WebSocket时使用UpgradeProcessorInternal,路由到WebSocketContainer容器中的ServerEndPoint处理类进行处理
ServerEndpointExporter实现SmartInitializingSingleton接口,在bean实例化后找到容器中被注解ServerEndPoint标识的处理类加入WebSocketContainer容器