「IM系列」WebSocket教程:消息持久化实现与应用

2023-12-13 11:57:56 浏览数 (2)

消息持久化

本次采用服务器端消息持久化,当服务器接收到WebSocket消息时,可以将消息保存到数据库或其他持久化存储中。这样,即使客户端断开连接,服务器仍然可以在需要时检索和处理这些消息。

注意:WebSocket消息持久化的具体实现方式取决于你的应用场景和需求。你可以根据自己的情况选择适合的方法来实现WebSocket消息的持久化。

需求分析

参考哔哩哔哩的消息列表,https://space.bilibili.com/350943383

在聊天列表页面中,会展示与曾经交流过的用户列表,在每一列中会出现的数据是用户名、用户头像、最后一条消息内容、最后一条消息的发送时间。

当点击会话列表之后,跳转到聊天列表页面中,在第一列会是当前私信的对象,若是第一次聊天,没有数据返回,同时创建一个新的回话列表。若不是第一次聊天,则会返回近期的聊天记录。

数据库设计

用户表 tinywan_user

代码语言:javascript复制
CREATE TABLE `user` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增',
  `username` varchar(32) NOT NULL COMMENT '用户名',
  `nickname` varchar(32) NOT NULL COMMENT '用户昵称',
  `avatar` varchar(200) NOT NULL COMMENT '头像',
  `create_time` int(11) unsigned NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';

会话表tinywan_message_session

代码语言:javascript复制
CREATE TABLE `tinywan_message_session` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自动递增',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户ID(存储个人登录IM后的用户ID)',
  `to_user_id` bigint(20) unsigned NOT NULL COMMENT '对方用户id',
  `name` varchar(32) NOT NULL COMMENT '会话列表名称',
  `type` tinyint(4) unsigned NOT NULL DEFAULT '1' COMMENT '消息类型(1:群对话消息,2:个人对话消息)',
  `group_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '群组id',
  `avatar` varchar(255) NOT NULL DEFAULT '' COMMENT '头像',
  `create_time` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
  `update_time` int(11) NOT NULL DEFAULT '0' COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息会话表';

消息表 tinywan_message

代码语言:javascript复制
CREATE TABLE `tinywan_message` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自动递增',
  `from_user_id` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '来自用户id',
  `from_username` varchar(64) NOT NULL DEFAULT '0' COMMENT '来自用户昵称',
  `from_avatar` varchar(128) NOT NULL DEFAULT '0' COMMENT '来自用户头像',
  `to_user_id` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '接受用户id',
  `group_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '群组ID',
  `type` tinyint(4) unsigned NOT NULL DEFAULT '1' COMMENT '消息类型(1:文字,2:图片,3:文件,4:日程)',
  `mode` tinyint(4) unsigned NOT NULL DEFAULT '1' COMMENT '消息模式(1:单聊,2:群聊)',
  `content` text NOT NULL COMMENT '内容',
  `create_time` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',
  `session_id` int(11) NOT NULL DEFAULT '0' COMMENT '会话id',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_group_id` (`group_id`) USING BTREE,
  KEY `idx_from_user_id` (`from_user_id`) USING BTREE,
  KEY `idx_to_user_id` (`to_user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息表';

代码实现

ORM类库这里使用Think-ORM是ThinkPHP官方的一个基于PHP和PDO的数据库中间层和ORM类库。

这里直接使用官方插件webman/think-orm是一个自动化安装 topthink/think-orm 的插件。它做了三个事情

  • 1、安装ThinkPHP官方的原生topthink/think-orm组件
  • 2、webman项目里自动增加配置文件config/thinkorm.php
  • 3、设置定时器定时向数据库发送select 1语句,避免数据库连接超时间空闲被数据库服务端断开。
composer安装
代码语言:javascript复制
composer require -W webman/think-orm

安装后将自动生成 config/thinkorm.php 数据库配置文件,开发者需要根据实际情况手动更改数据库配置。

模型类

MessageModel.php

代码语言:javascript复制
/**
 * @desc 消息模型
 * @author Tinywan(ShaoBo Wan)
 * @email 756684177@qq.com
 * @date 2023/12/10 10:43
 */
declare(strict_types=1);

namespace appcommonmodel;

class MessageModel extends BaseModel
{
    /*** 数据表名称* @var string */
    protected $table = 'tinywan_message';
}
消息事件

Events.php

代码语言:javascript复制
/**
 * @desc 当客户端发来数据后触发的回调函数
 * @param string $clientId
 * @param string $message
 * @return false
 * @author Tinywan(ShaoBo Wan)
 */
public static function onMessage(string $clientId, string $message): bool
{
    try {
        $originMessage = json_decode($message, true);
        if (json_last_error() != JSON_ERROR_NONE) {
            Gateway::closeClient($clientId, broadcast_json(400, '无效的json数据'));
            return false;
        }
        if (!is_array($originMessage)) {
            Gateway::closeClient($clientId, broadcast_json(400, '请求数据结构无法被解析'));
            return false;
        }

        $validate = new MessageFormatValidate();
        if (false === $validate->check($originMessage)) {
            Gateway::closeClient($clientId, broadcast_json(400, $validate->getError()));
            return false;
        }
        $groupId = $originMessage['group_id'] ?? 0;
        switch ($originMessage['event']) {
            case 'join':
                /** 群聊 */
                if ($originMessage['mode'] === 2) {
                    $_SESSION['group_id'] = $groupId;
                    Gateway::joinGroup($clientId, $groupId);
                    /** 私聊 */
                } else {
                    Gateway::bindUid($clientId, $originMessage['from_user_id']);
                }
                $_SESSION['mode'] = $originMessage['mode'];
                $_SESSION['event'] = $originMessage['event'];
                $_SESSION['group_id'] = $groupId;
                $_SESSION['from_user_id'] = $originMessage['from_user_id'];
                $_SESSION['from_username'] = $originMessage['from_username'];
                Gateway::sendToCurrentClient(broadcast_json(0, 'success', $originMessage));
                break;
            case 'speak':
                MessageModel::create([
                    'from_user_id' => $originMessage['from_user_id'],
                    'from_username' => $originMessage['from_username'],
                    'from_avatar' => $originMessage['from_avatar'] ?? '',
                    'to_user_id' => $originMessage['to_user_id'],
                    'group_id' => $groupId,
                    'type' => 1,
                    'mode' => $originMessage['mode'],
                    'content' => $originMessage['content']
                ]);
                /** 私聊 */
                if ($originMessage['mode'] == 1) {
                    $msg = $originMessage['from_username'] . '[单聊对]' . $originMessage['to_user_id'] . '[说]:' . $originMessage['content'];
                    Gateway::sendToUid($originMessage['to_user_id'], broadcast_json(0, $msg, $originMessage));
                    /** 群聊 */
                } else {
                    $msg = $originMessage['from_username'] . '[群聊说]:' . $originMessage['content'];
                    Gateway::sendToGroup($groupId, broadcast_json(0, $msg, $originMessage));
                }
                break;
            default:
                Gateway::sendToCurrentClient(broadcast_json(400, 'default invalid', $originMessage));
        }
    } catch (Throwable $throwable) {
        return Gateway::sendToClient($clientId, broadcast_json(500, $throwable->getMessage()));
    }
    return true;
}

持久化结果

发送消息测验

代码语言:javascript复制
let $_content = {
  "event": "speak",
  "mode": 2,
  "group_id": 100,
  "from_user_id": "10000",
  "from_username": "拉姆才让",
  "to_user_id": "10086",
  "content": "WebSocket教程:消息持久化的实现与应用",
};
ws.send(JSON.stringify($_content));
ws.onmessage = function(evt) {
  console.log( "【拉姆才让】接受消息: "   evt.data);
};

SQL 查询

代码语言:javascript复制
select from_user_id,from_username,to_user_id,group_id,mode,content, FROM_UNIXTIME(create_time) as date_time from tinywan_message;
小结

需要注意的是,将WebSocket消息持久化到MySQL数据库可能会对数据库的性能产生一定的影响,特别是在高并发的情况下。你可以考虑使用数据库连接池、索引优化等方法来提高性能。此外,还可以根据具体需求定期清理或归档旧的消息数据,以减少数据库的负担。

源码

上一教程章节:「IM系列」WebSocket教程:私聊和群聊实现

文章相关源码地址:https://github.com/Tinywan/webman-admin

0 人点赞