SpringBoot 中的服务端消息推送

2019-12-05 16:54:29 浏览数 (1)

代码语言:javascript复制
作者:Montos
链接:https://juejin.im/post/5ddb88b5f265da7dd07947d1

在跟基友看完《天气之子》之后,突然发觉上天与人类的通信是选择其中某些人作为连接载体,再进行单向性的通信(人类向上天)。这个时候就想到了JAVA中的服务器与客户端之间的通信方式。

WebSocket

相信有些小伙伴对此的不陌生吧,大多数接触服务器端推送消息的则选择的都是websocket。毕竟我们的业务场景中大多数都是相互通信的。

主要的代码如下:

代码语言:javascript复制
package com.montos.endpoint;

import com.montos.config.SessionMap;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;

/**
 * @Author: Montos
 * @Date:2019/11/26 3:04 下午
 * @Version 1.0
 * 服务节点
 */
@Component
@ServerEndpoint(value = "/websocket/{userId}")
public class ServiceWebSocketServer {

    // 这是要返回到界面上的会话
    private Session session;
    // 连接的用户id
    private String userId;

    /**
     * 会话开启
     *
     * @param userId
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {
        this.session = session;
        this.userId = userId;
        SessionMap.putSession(userId, session);
        try {
            this.send(session, String.format("欢迎你的加入%s", userId));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 会话关闭
     */
    @OnClose
    public void onClose() {
        SessionMap.removeSession(this.userId);
    }

    /**
     * 会话出错
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

    /**
     * 发送消息
     *
     * @param message
     * @throws IOException
     */
    @OnMessage
    public void sendMessage(String message) throws IOException {
        Map<String, Session> allSessionMap = SessionMap.getAllSessionMap();
        for (Map.Entry<String, Session> map : allSessionMap.entrySet()) {
            if (map.getKey().equalsIgnoreCase(userId)) {
                continue;
            } else {
                this.send(map.getValue(), message);
            }
        }
    }

    /**
     * 发送消息
     *
     * @param message
     * @throws IOException
     */
    public void send(Session session, String message) throws IOException {
        if (session.isOpen()) {
            session.getAsyncRemote().sendText(message);
        } else {// 有可能关闭了 此时等待5s     再次进行 发送,前端会有对应的轮询任务
            try {
                Thread.sleep(5000);
                session.getBasicRemote().sendText(message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

SpringBoot中需要注意的是,如果你是以jar包运行,则需要手动的在容器中注入ServerEndpointExporter这个Bean(不进行加载,则连接不上会话)。

上面只是贴出来主要的连接会话,关闭会话等等方法操作,里面还有的是一个集合类,存储的是对应用户的会话(websocketSession是不能被序列化的,也就意思是不能序列化在其他的存储设备上)。

SseEmitter

SseEmitter也是一个服务端推送消息给客户端的技术,这个也是需要进行连接才可以操作的,但是不可以进行直接通信服务端的。与上面的还是有一定的区别。

主要的代码如下:

代码语言:javascript复制
package com.montos.sseemitter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: Montos
 * @Date:2019/11/26 9:12 上午
 * @Version 1.0
 * * 1.请求http://localhost:8888/sse/start?userId=111接口,浏览器会阻塞,等待服务器返回结果;
 * * 2.请求http://localhost:8888/sse/send?userId=111接口,可以请求多次,并观察第1步的浏览器返回结果;
 * * 3.请求http://localhost:8888/sse/end?userId=111接口结束某个请求,第1步的浏览器将结束阻塞;
 */

@RestController
@RequestMapping("/sse")
public class SseEmitterController {
    private static final Logger logger = LoggerFactory.getLogger(SseEmitterController.class);

    // 用于保存每个请求对应的 SseEmitter
    private Map<String, Result> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 返回SseEmitter对象
     *
     * @param userId
     * @return
     */
    @RequestMapping("/start")
    public SseEmitter testSseEmitter(String userId) {
        // 默认30秒超时,设置为0L则永不超时
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitterMap.put(userId, new Result(userId, System.currentTimeMillis(), sseEmitter));
        logger.info(String.format("testSseEmitter is start"));
        return sseEmitter;
    }

    /**
     * 向SseEmitter对象发送数据
     *
     * @param userId
     * @return
     */
    @RequestMapping("/send")
    public String setSseEmitter(String userId) {
        try {
            Result result = sseEmitterMap.get(userId);
            if (result != null && result.sseEmitter != null) {
                String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                result.sseEmitter.send(format);
            }
        } catch (IOException e) {
            logger.error("IOException!", e);
            return "error";
        }
        return "Succeed!";
    }

    /**
     * 将SseEmitter对象设置成完成
     *
     * @param userId
     * @return
     */
    @RequestMapping("/end")
    public String completeSseEmitter(String userId) {
        Result result = sseEmitterMap.get(userId);
        if (result != null) {
            sseEmitterMap.remove(userId);
            result.sseEmitter.complete();
        }
        return "Succeed!";
    }

    private class Result {
        public String userId;
        public long timestamp;
        public SseEmitter sseEmitter;

        public Result(String userId, long timestamp, SseEmitter sseEmitter) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.sseEmitter = sseEmitter;
        }
    }
}

启动相关项目,然后按照上面注释操作步骤,我们就可以在第一个开始连接的浏览器页面中查看到我们推送的信息。如果服务器没有进行推送,则客户端处于等待的一种状态。

二者区别

以上两种方法都能实现服务端向客户端推送消息的情况,这两个情况还是有一定的区别。

  1. WebSocket是全双工通道,可以双向通信,功能更强;SSE是单向通道,只能服务器向浏览器端发送。
  2. WebSocket是一个新的协议,需要服务器端支持;SSE则是部署在 HTTP协议之上的,现有的服务器软件都支持。
  3. SSE是一个轻量级协议,相对简单;WebSocket是一种较重的协议,相对复杂。
  4. SSE默认支持断线重连,WebSocket则需要额外部署。
  5. SSE支持自定义发送的数据类型。
  6. SSE不支持CORS 参数url就是服务器网址,必须与当前网页的网址在同一个网域(domain),而且协议和端口都必须相同。

0 人点赞