SpringBoot2整合WebSocket,实现后台向前端推送信息

2022-06-12 10:59:24 浏览数 (1)

背景是客户提出需要在 IOC 智能运营中心使用 Pad 控制页面进行跳转,类似于电视的遥控器一样。这样IOC的讲解员可以在 Pad 上面操作控制页面进行展示。我们的解决方案是通过使用 WebSocket 实现,前台监听,后台开放 API 给 Pad 上的页面,后台收到消息后推送给前台,前台再做出对应的反应。这样基本可以满足要求了~

什么是 WebSocket?

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,在 WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

为什么需要 WebSocket?

初次接触 WebSocket 的人,都会问同样的问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么好处?

答案很简单,因为 HTTP 协议有一个缺陷:通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。

举例来说,我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此 WebSocket 就是这样发明的。如我们出去吃饭,在公众号上排队,不需要我们自己查询,当有变动时公众号实时将消息推送给我们,我们就可以知道排队的状态了,不必每次询问服务员。

WebSocket ws 和 wss 的区别

WS 协议和 WSS 协议两个均是 WebSocket 协议的 SCHEM,两者一个是非安全的,一个是安全的。也是统一的资源标志符。就好比 HTTP 协议和 HTTPS 协议的差别。非安全的没有证书,安全的需要 SSL 证书。

其中 WSS 表示在 TLS 之上的 WebSocket。WS 一般默认是 80 端口,而 WSS 默认是 443 端口,大多数网站用的就是 80 和 433 端口。

http 和 ws 的对应关系:

代码语言:javascript复制
http -> new WebSocket('ws://xxx')

https -> new WebSocket('wss://xxx')

SpringBoot2整合WebSocket

添加 maven 依赖

pom.xml 添加如下依赖

代码语言:javascript复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

新建 WebSocket 的配置类

配置类会检测带有注解 @ServerEndpoint 的 bean 并注册它们。

记得加上 @Configuration 注解

代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author lixiaojin
 * @date 2021/11/3 14:29
 */
@Configuration
@Slf4j
public class WebSocketConfig {

    /**
     * 给spring容器注入这个ServerEndpointExporter对象
     * 相当于xml:
     * <beans>
     * <bean/>
     * </beans>
     * <p>
     * 检测所有带有@serverEndpoint注解的bean并注册他们。
     *
     * @return ServerEndpointExporter对象
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        log.info("WebSocketConfig 注入!");
        return new ServerEndpointExporter();
    }
}

新建 WebSocketServer 的处理类

代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lixiaojin
 * @date 2021/11/3 16:06
 */
@ServerEndpoint("/webSocket/{username}")
@Component
@Slf4j
public class WebSocketServer {

    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>();
    private Session session;
    private String username;

    @OnOpen
    public void onOpen(@PathParam("username") String username, Session session) throws IOException {
        this.username = username;
        this.session = session;

        clients.put(username, this);
        // 在线数加1
        int cnt = ONLINE_COUNT.incrementAndGet();
        log.info("{}加入连接,当前连接数为:{}", username, cnt);

    }

    @OnClose
    public void onClose() throws IOException {
        clients.remove(username);
        int cnt = ONLINE_COUNT.decrementAndGet();
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }

    @OnMessage
    public void onMessage(String message) throws IOException {
        log.info("来自客户端的消息:{}", message);
        sendInfo(session, message);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

    public void sendMessage(String message, String name) throws IOException {
        Session session = null;
        for (WebSocketServer item : clients.values()) {
            if (item.username.equals(name)) {
                session = item.session;
            }
        }
        if (session == null) {
            throw new BizException(CosmosResultCodeEnum.BIZ_FAIL, "找不到对应的seesion!");
        }
        sendInfo(session, message);
    }

    public void sendMessageAll(String message) throws IOException {
        for (WebSocketServer item : clients.values()) {
            sendInfo(item.session, message);
        }
    }

    private void sendInfo(Session session, String message) throws IOException {
        session.getAsyncRemote().sendText(message);
    }
}

新建 WebSocketController 控制器

提供 API 接口向前台发送消息。

代码语言:javascript复制
package cn.sibat.ioc.remote.control.controller;

import cn.lixj.test.ioc.remote.control.manager.WebSocketServer;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

/**
 * @author lixiaojin
 * @date 2021/11/3 14:39
 */
@Api(tags = "测试接口")
@RestController
@RequestMapping("/api/v1/")
public class TestController {

    @Autowired
    private WebSocketServer webSocketServer;

    @PostMapping("/test/send")
    public String sendInfo(@RequestParam String name, @RequestParam String message) {
        webSocketServer.sendMessage(message, name);
        return "发送成功!";
    }

}

前台测试页面

代码语言:javascript复制
<html>
<head>
    <meta charset="UTF-8">
    <title>websocket测试</title>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
    <style type="text/css">
        h3,h4{
            text-align:center;
        }
    </style>
</head>
<body>
 
<h3>WebSocket测试,客户端接收到的消息如下:</h3>
 
<textarea id = "messageId" readonly="readonly" cols="150" rows="30" >
 
</textarea>
 
 
<script type="text/javascript">
    var socket;
    if (typeof (WebSocket) == "undefined") {
        console.log("遗憾:您的浏览器不支持WebSocket");
    } else {
        console.log("恭喜:您的浏览器支持WebSocket");
        //实现化WebSocket对象
        //指定要连接的服务器地址与端口建立连接
        //注意ws、wss使用不同的端口。我使用自签名的证书测试,
        //无法使用wss,浏览器打开WebSocket时报错
        //ws对应http、wss对应https。
        socket = new WebSocket("ws://localhost:8080/webSocket/lixj");
        //连接打开事件
        socket.onopen = function() {
            //console.log("Socket 已打开");
            socket.send("消息发送测试(From Client)");
            
        };
        //收到消息事件
        socket.onmessage = function(msg) {
            $("#messageId").append(msg.data  "n");
            console.log(msg.data  );
        };
        //连接关闭事件
        socket.onclose = function() {
            console.log("Socket已关闭");
        };
        //发生了错误事件
        socket.onerror = function() {
            alert("Socket发生了错误");
        }
        //窗口关闭时,关闭连接
        window.unload=function() {
            socket.close();
        };
    }
    function hello(){
        setTimeout(socket.onopen, 1000);
    }
    //重复执行保持socket存活
    var t1 = window.setInterval(hello, 30000);
    
</script>
 
</body>
</html>

联调测试

  1. 启动 springboot 项目,启动 WebSocket 服务端;

  1. 打开前台页面,测试连接; 前台调用链接为:ws://localhost:8080/webSocket/lixj

  1. 调用后台接口发送消息;
代码语言:javascript复制
curl -X POST -H  "Accept:*/*" -H  "Request-Origion:Knife4j" -H  "Content-Type:application/x-www-form-urlencoded" --data-urlencode  "message=你看到我的小熊了吗&name=lixj" "http://localhost:8080/api/v1/test/send"

  1. 前台接收到消息;

需要注意的地方

关于超时自动断开连接的问题

使用 Nginx 代理 WebSocket 时,客户端与服务器握手成功后,如果在 60 秒内没有数据交互,就会自动断开连接。因为 Nginx 默认的断开链接时间为 60 秒,为保持长连接,可有两种解决方法。

  1. 修改 Nginx 的超时时间;
  2. 前端在超时时间内做心跳保活机制(如上的 html 加了定时保活任务)

WebSocket 的 Nginx 配置问题

如果需要 Nginx 来进行代理,api 接口和 webSocket 需要分开配置,示例如下。

代码语言:javascript复制
         location ^~ /api/ {
              proxy_pass http://localhost:8080;
         }
 
         location ^~ /webSocket/ {
             proxy_pass http://localhost:8080;
             proxy_http_version 1.1;
             proxy_set_header Upgrade $http_upgrade;
             proxy_set_header Connection upgrade;
         }

如果不需要指定 name 的话可以用下面的 WebSocketServer 类

根据 sessionId 进行匹配

代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lixiaojin
 * @date 2021/11/3 14:30
 */
@ServerEndpoint(value = "/ws/asset")
@Component
@Slf4j
public class WebSocketServer {

    private Session session;

    @PostConstruct
    public void init() {
        System.out.println("websocket 加载");
    }

    private static final AtomicInteger OnlineCount = new AtomicInteger(0);
    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();


    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        SessionSet.add(session);
        // 在线数加1
        int cnt = OnlineCount.incrementAndGet();
        log.info("有连接加入,当前连接数为:{}", cnt);
        SendMessage(session, "连接成功");
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        SessionSet.remove(session);
        int cnt = OnlineCount.decrementAndGet();
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自客户端的消息:{}", message);
        SendMessage(session, "收到消息,消息内容:"   message);

    }

    /**
     * 出现错误
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:{},Session ID:{}", error.getMessage(), session.getId());
        error.printStackTrace();
    }

    /**
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
     *
     * @param session
     * @param message
     */
    public static void SendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
//            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("发送消息出错:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 群发消息
     *
     * @param message
     * @throws IOException
     */
    public void BroadCastInfo(String message) throws IOException {
        for (Session session : SessionSet) {
            if (session.isOpen()) {
                SendMessage(session, message);
            }
        }
    }

    /**
     * 指定Session发送消息
     *
     * @param sessionId
     * @param message
     * @throws IOException
     */
    public void SendMessage(String message, String sessionId) throws IOException {
        Session session = null;
        for (Session s : SessionSet) {
            if (s.getId().equals(sessionId)) {
                session = s;
                break;
            }
        }
        if (session != null) {
            SendMessage(session, message);
        } else {
            log.warn("没有找到你指定ID的会话:{}", sessionId);
        }
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

}

End.

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可 Links: https://cloud.tencent.com/developer/article/2020779

0 人点赞