背景是客户提出需要在 IOC 智能运营中心使用 Pad 控制页面进行跳转,类似于电视的遥控器一样。这样IOC的讲解员可以在 Pad 上面操作控制页面进行展示。我们的解决方案是通过使用 WebSocket 实现,前台监听,后台开放 API 给 Pad 上的页面,后台收到消息后推送给前台,前台再做出对应的反应。这样基本可以满足要求了~
什么是 WebSocket?
WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,在 WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
为什么需要 WebSocket?
初次接触 WebSocket 的人,都会问同样的问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么好处?
答案很简单,因为 HTTP 协议有一个缺陷:通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
举例来说,我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此 WebSocket 就是这样发明的。如我们出去吃饭,在公众号上排队,不需要我们自己查询,当有变动时公众号实时将消息推送给我们,我们就可以知道排队的状态了,不必每次询问服务员。
WebSocket ws 和 wss 的区别
WS 协议和 WSS 协议两个均是 WebSocket 协议的 SCHEM,两者一个是非安全的,一个是安全的。也是统一的资源标志符。就好比 HTTP 协议和 HTTPS 协议的差别。非安全的没有证书,安全的需要 SSL 证书。
其中 WSS 表示在 TLS 之上的 WebSocket。WS 一般默认是 80 端口,而 WSS 默认是 443 端口,大多数网站用的就是 80 和 433 端口。
http 和 ws 的对应关系:
代码语言:javascript复制http -> new WebSocket('ws://xxx')
https -> new WebSocket('wss://xxx')
SpringBoot2整合WebSocket
添加 maven 依赖
在 pom.xml
添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
新建 WebSocket 的配置类
配置类会检测带有注解 @ServerEndpoint
的 bean 并注册它们。
代码语言:javascript复制记得加上
@Configuration
注解
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author lixiaojin
* @date 2021/11/3 14:29
*/
@Configuration
@Slf4j
public class WebSocketConfig {
/**
* 给spring容器注入这个ServerEndpointExporter对象
* 相当于xml:
* <beans>
* <bean/>
* </beans>
* <p>
* 检测所有带有@serverEndpoint注解的bean并注册他们。
*
* @return ServerEndpointExporter对象
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
log.info("WebSocketConfig 注入!");
return new ServerEndpointExporter();
}
}
新建 WebSocketServer 的处理类
代码语言:javascript复制import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author lixiaojin
* @date 2021/11/3 16:06
*/
@ServerEndpoint("/webSocket/{username}")
@Component
@Slf4j
public class WebSocketServer {
private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>();
private Session session;
private String username;
@OnOpen
public void onOpen(@PathParam("username") String username, Session session) throws IOException {
this.username = username;
this.session = session;
clients.put(username, this);
// 在线数加1
int cnt = ONLINE_COUNT.incrementAndGet();
log.info("{}加入连接,当前连接数为:{}", username, cnt);
}
@OnClose
public void onClose() throws IOException {
clients.remove(username);
int cnt = ONLINE_COUNT.decrementAndGet();
log.info("有连接关闭,当前连接数为:{}", cnt);
}
@OnMessage
public void onMessage(String message) throws IOException {
log.info("来自客户端的消息:{}", message);
sendInfo(session, message);
}
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
public void sendMessage(String message, String name) throws IOException {
Session session = null;
for (WebSocketServer item : clients.values()) {
if (item.username.equals(name)) {
session = item.session;
}
}
if (session == null) {
throw new BizException(CosmosResultCodeEnum.BIZ_FAIL, "找不到对应的seesion!");
}
sendInfo(session, message);
}
public void sendMessageAll(String message) throws IOException {
for (WebSocketServer item : clients.values()) {
sendInfo(item.session, message);
}
}
private void sendInfo(Session session, String message) throws IOException {
session.getAsyncRemote().sendText(message);
}
}
新建 WebSocketController 控制器
提供 API 接口向前台发送消息。
代码语言:javascript复制package cn.sibat.ioc.remote.control.controller;
import cn.lixj.test.ioc.remote.control.manager.WebSocketServer;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
/**
* @author lixiaojin
* @date 2021/11/3 14:39
*/
@Api(tags = "测试接口")
@RestController
@RequestMapping("/api/v1/")
public class TestController {
@Autowired
private WebSocketServer webSocketServer;
@PostMapping("/test/send")
public String sendInfo(@RequestParam String name, @RequestParam String message) {
webSocketServer.sendMessage(message, name);
return "发送成功!";
}
}
前台测试页面
代码语言:javascript复制<html>
<head>
<meta charset="UTF-8">
<title>websocket测试</title>
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
<style type="text/css">
h3,h4{
text-align:center;
}
</style>
</head>
<body>
<h3>WebSocket测试,客户端接收到的消息如下:</h3>
<textarea id = "messageId" readonly="readonly" cols="150" rows="30" >
</textarea>
<script type="text/javascript">
var socket;
if (typeof (WebSocket) == "undefined") {
console.log("遗憾:您的浏览器不支持WebSocket");
} else {
console.log("恭喜:您的浏览器支持WebSocket");
//实现化WebSocket对象
//指定要连接的服务器地址与端口建立连接
//注意ws、wss使用不同的端口。我使用自签名的证书测试,
//无法使用wss,浏览器打开WebSocket时报错
//ws对应http、wss对应https。
socket = new WebSocket("ws://localhost:8080/webSocket/lixj");
//连接打开事件
socket.onopen = function() {
//console.log("Socket 已打开");
socket.send("消息发送测试(From Client)");
};
//收到消息事件
socket.onmessage = function(msg) {
$("#messageId").append(msg.data "n");
console.log(msg.data );
};
//连接关闭事件
socket.onclose = function() {
console.log("Socket已关闭");
};
//发生了错误事件
socket.onerror = function() {
alert("Socket发生了错误");
}
//窗口关闭时,关闭连接
window.unload=function() {
socket.close();
};
}
function hello(){
setTimeout(socket.onopen, 1000);
}
//重复执行保持socket存活
var t1 = window.setInterval(hello, 30000);
</script>
</body>
</html>
联调测试
- 启动 springboot 项目,启动 WebSocket 服务端;
- 打开前台页面,测试连接; 前台调用链接为:ws://localhost:8080/webSocket/lixj
- 调用后台接口发送消息;
curl -X POST -H "Accept:*/*" -H "Request-Origion:Knife4j" -H "Content-Type:application/x-www-form-urlencoded" --data-urlencode "message=你看到我的小熊了吗&name=lixj" "http://localhost:8080/api/v1/test/send"
- 前台接收到消息;
需要注意的地方
关于超时自动断开连接的问题
使用 Nginx 代理 WebSocket 时,客户端与服务器握手成功后,如果在 60 秒内没有数据交互,就会自动断开连接。因为 Nginx 默认的断开链接时间为 60 秒,为保持长连接,可有两种解决方法。
- 修改 Nginx 的超时时间;
- 前端在超时时间内做心跳保活机制(如上的 html 加了定时保活任务)
WebSocket 的 Nginx 配置问题
如果需要 Nginx 来进行代理,api 接口和 webSocket 需要分开配置,示例如下。
代码语言:javascript复制 location ^~ /api/ {
proxy_pass http://localhost:8080;
}
location ^~ /webSocket/ {
proxy_pass http://localhost:8080;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection upgrade;
}
如果不需要指定 name 的话可以用下面的 WebSocketServer 类
代码语言:javascript复制根据 sessionId 进行匹配
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author lixiaojin
* @date 2021/11/3 14:30
*/
@ServerEndpoint(value = "/ws/asset")
@Component
@Slf4j
public class WebSocketServer {
private Session session;
@PostConstruct
public void init() {
System.out.println("websocket 加载");
}
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
// concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
SessionSet.add(session);
// 在线数加1
int cnt = OnlineCount.incrementAndGet();
log.info("有连接加入,当前连接数为:{}", cnt);
SendMessage(session, "连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
SessionSet.remove(session);
int cnt = OnlineCount.decrementAndGet();
log.info("有连接关闭,当前连接数为:{}", cnt);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息:{}", message);
SendMessage(session, "收到消息,消息内容:" message);
}
/**
* 出现错误
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误:{},Session ID:{}", error.getMessage(), session.getId());
error.printStackTrace();
}
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
*
* @param session
* @param message
*/
public static void SendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
// session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("发送消息出错:{}", e.getMessage());
e.printStackTrace();
}
}
/**
* 群发消息
*
* @param message
* @throws IOException
*/
public void BroadCastInfo(String message) throws IOException {
for (Session session : SessionSet) {
if (session.isOpen()) {
SendMessage(session, message);
}
}
}
/**
* 指定Session发送消息
*
* @param sessionId
* @param message
* @throws IOException
*/
public void SendMessage(String message, String sessionId) throws IOException {
Session session = null;
for (Session s : SessionSet) {
if (s.getId().equals(sessionId)) {
session = s;
break;
}
}
if (session != null) {
SendMessage(session, message);
} else {
log.warn("没有找到你指定ID的会话:{}", sessionId);
}
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
}
End.
Copyright: 采用 知识共享署名4.0 国际许可协议进行许可 Links: https://cloud.tencent.com/developer/article/2020779