实时会话系统实现(2) --- express-ws改写会话系统

2019-11-25 22:40:40 浏览数 (1)

上一篇提到过实际上会话系统最简单的方式是http轮询:用户发送信息时实现一个http接口保存用户聊天信息,然后在客户端实现一个定时器,定时获取用户A与用户B的聊天信息,并且重新渲染聊天界面。我们在上一篇成功通过http轮询的方式实现绘画系统,但是我们也有提到过http轮询的缺点在于轮询中的大部分请求都是没有实际意义的,所以会极大的浪费和消耗带宽和服务器资源。所以本节课我们通过express框架支持的一个websocket库--express-ws来改写上一篇实现的会话系统。

客户端代码其实和上篇文章基本一致,只是增加了个上传视频的按钮,因为小程序没有选择文件的api,所以我们只能通过wx.chooseImage来选择图片发送,通过wx.chooseVideo来选择视频发送,实际上效果就是在上一篇的基础上加了一个视频按钮,因为上一篇没有提到图片发送和视频发送的客户端逻辑有读者私聊问到具体的逻辑,这边简单讲讲小程序客户端如何实现图片选择发送以及视频选择发送。实际上我们可以通过wx.chooseImage选择图片,这个api实际上很简单,指定最多可选择图片张数以及图片来源等,选择成功实际上会返回一个图片的临时路径tempFilePaths,然后使用form-data的方式将tempFilePaths提交到后端接口进行图片上传,图片上传成功后会返回图片的url地址,这时候再进行信息发送保存聊天记录到数据库。我们可以看一眼逻辑代码:

代码语言:javascript复制
//发送图片
  chooseImage() {
    var that = this;
    wx.chooseImage({
      count: 1, // 默认9张图片
      sizeType: ['original', 'compressed'], // 可以指定是原图还是压缩图,默认二者都有
      sourceType: ['album', 'camera'], // 可以指定来源是相册还是相机,默认二者都有
      success: function(res) {
        // 返回选定照片的本地文件路径列表,tempFilePath可以作为img标签的src属性显示图片
        var tempFilePaths = res.tempFilePaths;
        that.setData({
          loading: true,
          increase: false
        });

        wx.uploadFile({
          url: utils.basePath   '/users/upload_avatar',
          filePath: tempFilePaths[0],
          name: 'avatar',
          headers: {
            'Content-Type': 'form-data'
          },

          success: function(res) {
            that.setData({
              loading: false
            });

            var result = JSON.parse(res.data);
            if (result.status == 200) {
              //图片上传成功,将聊天记录保存数据库
              var chatInfo = that.data.chatInfo;
              chatInfo.chat_content = result.payload.avatar_path;
              chatInfo.chat_type = 1;
              chatInfo = JSON.stringify(chatInfo);
              websocket.send(chatInfo);

              //接受服务器消息
              wx.onSocketMessage(function(res) {
                var data = JSON.parse(res.data).data;
                data[0].flagtime = true;

                for (var i = 1; i < data.length; i  ) {
                  var currenttime = new Date(data[i].created_date).getTime();
                  var begintime = new Date(data[i - 1].created_date).getTime();

                  if (currenttime - begintime > 1000 * 60) {
                    data[i].flagtime = true;
                  } else {
                    data[i].flagtime = false;
                  }
                }
                that.setData({
                  newslist: data
                });

                //将聊天界面定位到最新的聊天记录
                that.bottom();
              });
            } else {
              $Toast({
                content: result.err,
                type: 'error'
              });
            }
          }
        });
      }
    });
  }

视频发送实际上和图片发送几乎一致,就是将wx.chooseImage换成wx.chooseVideo就可以,但是视频上传这里面有几个坑需要逃避一下:

  • wx.chooseVideo有个属性compressed参数可以设置视频是否需要压缩,默认是true,视频会经过压缩上传,经过实测发现视频经过压缩清晰度极低,所以可以携带compressed参数关闭视频压缩。
  • 视频大小实际上和微信是保持一致的,无法发送超过24M的视频,但是我测试的时候发现超过1M的服务器一直报413状态码提示视频过大,实际上就是我们后端没有设置body最大的长度,比如我是Nginx对上传的域名pic.niyueling.cn增加了client_max_body_size实行,设置为25M,就可以躲过413状态这个坑。

接下来我们一样看下代码:

代码语言:javascript复制
//发送视频
  chooseVideo() {
    var that = this;
    wx.chooseVideo({
      sourceType: ['album', 'camera'],
      maxDuration: 60,
      compressed: false,
      camera: 'back',
      success: function(res) {
        var tempFilePaths = res.tempFilePath;

        that.setData({
          loading: true,
          increase: false
        });

        wx.uploadFile({
          url: utils.basePath   '/users/upload_video',
          filePath: tempFilePaths,
          name: 'mp4_url',
          headers: {
            'Content-Type': 'form-data'
          },

          success: function(res) {
            if (res.statusCode == 413) {
              that.setData({
                loading: false
              });
              $Toast({
                content: '视频过大,请重新上传',
                type: 'error'
              });
            } else {
              that.setData({
                loading: false
              });

              var result = JSON.parse(res.data);
              if (result.status == 200) {
                //上传视频操作
                var chatInfo = that.data.chatInfo;
                chatInfo.chat_content = result.payload;
                chatInfo.chat_type = 2;
                chatInfo = JSON.stringify(chatInfo);

                websocket.send(chatInfo);

                //接受服务器消息
                wx.onSocketMessage(function(res) {
                  var data = JSON.parse(res.data).data;
                  data[0].flagtime = true;

                  for (var i = 1; i < data.length; i  ) {
                    var currenttime = new Date(data[i].created_date).getTime();
                    var begintime = new Date(data[i - 1].created_date).getTime();

                    if (currenttime - begintime > 1000 * 60) {
                      data[i].flagtime = true;
                    } else {
                      data[i].flagtime = false;
                    }
                  }
                  that.setData({
                    newslist: data
                  });

                  that.bottom();
                });
              } else {
                $Toast({
                  content: result.err,
                  type: 'error'
                });
              }
            }
          }
        });
      }
    });
  }

接下来就开始正式讲讲websocket在小程序的使用了,其实websocket在小程序封装的很完美,可以让没接触过websocket开发的快速上手。我们在utils下创建一个websocket.js,在里面封装websocket的基本操作。实际上在会话系统我们目前仅仅需要websocket连接,发送消息,接受消息三个方法,所以我们在websocket.js中定义这三个方法,然后使用module.exports导出,使得在任何界面都可以调用这几个方法,我们看下代码:

代码语言:javascript复制

const util = require('./util.js');

//发起websocket连接
function connect(user, func) {
  wx.connectSocket({
    url: util.wssPath   '/chat/v1/message?friendphone='   user.friendInfo.account   '&userphone='   user.userInfo.account   '&app_sid='   user.userInfo.app_sid,
    header: { 
      'content-type': 'application/json' 
    },
    success: function (res) {
      console.log(res)
    },

    fail: function (res) {
      console.log(res);
    }
  });

  wx.onSocketOpen(function (res) {
    //接受服务器消息
    wx.onSocketMessage(func);//func回调可以拿到服务器返回的数据
  });

  wx.onSocketError(function (res) {
    wx.showToast({
      title: res.errMsg,
      icon: "none",
      duration: 1000
    });
  });
}

//发送消息
function send(msg) {
  wx.sendSocketMessage({
    data: msg
  });
}

module.exports = {
  connect: connect,
  send: send
}

然后在会话界面的onLoad方法连接websocket,连接成功接口会返回历史聊天记录,可以渲染出聊天界面。我们可以看下onLoad的关键代码:

代码语言:javascript复制
websocket.connect(this.data.chatInfo, function(res) {
        if (JSON.parse(res.data).data.length == 0) {
          that.setData({
            newslist: []
          });
        } else {
          var data = JSON.parse(res.data).data;
          data[0].flagtime = true;

          for (var i = 1; i < data.length; i  ) {
            var currenttime = new Date(data[i].created_date).getTime();
            var begintime = new Date(data[i - 1].created_date).getTime();

            if (currenttime - begintime > 1000 * 60) {
              data[i].flagtime = true;
            } else {
              data[i].flagtime = false;
            }
          }
          that.setData({
            newslist: data
          });

          that.bottom();
        }
      });

然后用户发送消息之后使用刚才封装好的方法send发送消息,消息发送成功服务端会返回新的聊天记录,动态渲染聊天界面。可以看下关键代码:

代码语言:javascript复制
//封装聊天记录参数
      var chatInfo = that.data.chatInfo;
      chatInfo.chat_content = that.data.message;
      chatInfo.chat_type = 0;
      chatInfo = JSON.stringify(chatInfo);

      websocket.send(chatInfo);

      //接受服务器消息
      wx.onSocketMessage(function(res) {
        var data = JSON.parse(res.data).data;
        data[0].flagtime = true;

        for (var i = 1; i < data.length; i  ) {
          var currenttime = new Date(data[i].created_date).getTime();
          var begintime = new Date(data[i - 1].created_date).getTime();

          if (currenttime - begintime > 1000 * 60) {
            data[i].flagtime = true;
          } else {
            data[i].flagtime = false;
          }
        }
        that.setData({
          newslist: data
        });

        that.bottom();
      });

到这里我们小程序端的websocket连接全部实现了。下一步需要在服务端实现wss接口。首先和https一样,小程序只支持wss,所以我们需要申请证书先在Nginx配置wss:

代码语言:javascript复制
upstream backend_chatws {
    server 127.0.0.1:3001 weight=10;
}

server {
  listen 443 ssl;
  server_name ws.niyueling.cn;
  ssl_certificate /etc/nginx/ctr/ws_niyueling_cn.crt;
  ssl_certificate_key /etc/nginx/ctr/ws_niyueling_cn.key;
  ssl_session_cache    shared:SSL:1m;
  ssl_session_timeout  5m;
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2; 
    ssl_prefer_server_ciphers on;
  server_tokens off;
  access_log /var/log/nginx/api.log  main;

    location / {
        client_max_body_size 100m;
        proxy_redirect off;
        proxy_pass http://backend_chatws;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_read_timeout 604800s;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
    }


    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/html;
    }
}

刚才已经说过了本篇文章使用express-ws库来封装websocket,接下来我们看看express-ws库的基本使用。因为我们正式开发一般后端不可能把所有接口写在同一个文件中,所以我们这边也通过分文件来实现。首先npm安装express-ws依赖,然后在app.js引入express-ws:

代码语言:javascript复制
var express = require('express');
var app = express();
var expressWs = require('express-ws')(app);
var chat = require('./routes/chat');
 
app.use('/chat/v1', chat);

app.listen(3001);

可以看到我们在app.js引用了chat.js文件表示我们实际上websocket接口是在chat.js中实现,接下来我们在chat.js中引用express-ws,这里需要注意如果分文件实现接口必须在app.js和具体的接口js文件都引入express-ws才可以正常使用。然后接口的实现实际上和http接口实现方法类似,我们引入express-ws后实际上router就多了一个ws方法,就是用来书写websocket接口,然后接口中实际上是存在两部分逻辑,第一次调用就等于websocket连接事件,这时候我们要查询好友的聊天记录返回,当用户发送消息时,会触发message事件,这时候先保存用户聊天记录再查询最新的聊天记录并返回。我们可以看下代码:

代码语言:javascript复制
router.ws('/message', function (ws, req) {
  var par = paramAll(req);

  if (!par.friendphone || !par.userphone || !par.app_sid) {
    return ws.send(JSON.stringify({ code: 0, msg: '参数不全!' }));
  }

  //查询用户历史记录
  chatDao.getOnlineChat(par, function (err, data) {
    if (err) {
      return ws.send(JSON.stringify({
        code: 0,
        msg: err
      }));
    }

    return ws.send(JSON.stringify({
      code: 1,
      data: data
    }));
  });

  ws.on('message', function (msg) {
    par.msg = JSON.parse(msg);
    //将记录添加到数据库,并返回最新记录列表
    chatDao.saveOnlineChat(par.msg, function (err, data) {
      if (err) {
        return ws.send(JSON.stringify({
          code: 0,
          msg: err
        }));
      }

      return ws.send(JSON.stringify({
        code: 1,
        data: data
      }));
    });
  });
});

数据库操作逻辑实际上也分别对应两部分,websocket连接时会返回两个好友间的历史聊天记录:

代码语言:javascript复制
async.waterfall([
            function (callback) {
                connection.beginTransaction(function (err) {
                    return callback(err);
                });
            },
            //通过friendphone查询好友信息
            function (callback) {
                var sql = 'select username, avatar from users where account = ? and app_sid = ?';
                var value = [data.friendphone, data.app_sid];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    if (!result[0]) {
                        return callback('用户不存在!');
                    }
                    data.friendname = result[0].username;
                    data.friendavatar = result[0].avatar;

                    return callback(null, 200);
                });
            },
            //通过userphone查询好友信息
            function (info, callback) {
                var sql = 'select username, avatar from users where account = ? and app_sid = ?';
                var value = [data.userphone, data.app_sid];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    if (!result[0]) {
                        return callback('用户不存在!');
                    }
                    data.username = result[0].username;
                    data.useravatar = result[0].avatar;

                    return callback(null, 200);
                });
            },
            function (release_info, callback) {
                var sql = 'select id, friendphone, friendname, friendavatar, app_sid, DATE_FORMAT(created_date, "%Y-%m-%d %H:%i:%s") as created_date, userphone, username, useravatar, content, chat_type from online_chat '  
                    'where (friendphone = ? and userphone = ?) or (friendphone = ? and userphone = ?)';
                var value = [data.friendphone, data.userphone, data.userphone, data.friendphone];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    var del_info = result && result.length > 0 ? result : null;

                    if (!del_info) {
                        return callback(null, true, []);
                    }

                    return callback(null, true, del_info);
                });
            }
        ], function (DbErr, isSuccess, uidOrInfo) {
            if (DbErr || !isSuccess) {
                connection.rollback(function () {
                    connection.release();
                });

                return cb(DbErr);
            }

            connection.commit(function (e) {
                if (e) {
                    connection.rollback(function () {
                        connection.release();
                    });

                    return cb(e);
                }

                connection.release();
                cb(null, uidOrInfo);
            });
        });

当客户端用户发送消息就会触发message事件,这时候保存用户聊天信息并返回最新的聊天记录:

代码语言:javascript复制
async.waterfall([
            function (callback) {
                connection.beginTransaction(function (err) {
                    return callback(err);
                });
            },
            function (callback) {
                var sql = 'insert into online_chat set ?';
                var value = {
                    friendphone: data.friendInfo.account,
                    friendname: data.friendInfo.username,
                    friendavatar: data.friendInfo.avatar,
                    app_sid: data.friendInfo.app_sid,
                    userphone: data.userInfo.account,
                    username: data.userInfo.username,
                    useravatar: data.userInfo.avatar,
                    created_date: new Date(),
                    status: 1,
                    content: data.chat_content,
                    chat_type: data.chat_type
                };

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    if (result.affectedRows == 0) {
                        return callback('聊天出现故障!');
                    }

                    return callback(null, '保存聊天记录成功!');
                });
            },
            function (release_info, callback) {
                var sql = 'select id, friendphone, friendname, friendavatar, app_sid, DATE_FORMAT(created_date, "%Y-%m-%d %H:%i:%s") as created_date, userphone, username, useravatar, content, chat_type from online_chat '  
                    'where (friendphone = ? and userphone = ?) or (friendphone = ? and userphone = ?)';
                var value = [data.friendInfo.account, data.userInfo.account, data.userInfo.account, data.friendInfo.account];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    var del_info = result && result.length > 0 ? result : null;

                    if (!del_info) {
                        return callback(null, true, []);
                    }

                    return callback(null, true, del_info);
                });
            }
        ], function (DbErr, isSuccess, uidOrInfo) {
            if (DbErr || !isSuccess) {
                connection.rollback(function () {
                    connection.release();
                });

                return cb(DbErr);
            }

            connection.commit(function (e) {
                if (e) {
                    connection.rollback(function () {
                        connection.release();
                    });

                    return cb(e);
                }

                connection.release();
                cb(null, uidOrInfo);
            });
        });

到这里我们使用express-ws改写会话系统就完成了,我们可以测试下:

可以发现我们使用websocket可以开启一个长连接成功实现实时会话系统,有消息送达马上接收渲染,而不用像http轮询一样不断地重复请求接口造成贷款和服务器资源的浪费。目前整个项目前后端已开源于码云,欢迎来一个star。源码地址:

https://gitee.com/mqzuimeng_admin/wx_blog.git

0 人点赞