文章目录
-
- 环境配置与基本知识
- redis.hpp
- redis.cpp
- chatservice修改
-
- 从redis消息队列中获取订阅的消息
环境配置与基本知识
C 搭建集群聊天室(十七):ngnix简介及tcp负载均衡配置
Redis环境搭建与配置
hiredis从安装到实操,一条龙服务
redis事务处理机制,但当涉猎
了解更多 redis 相关知识:我的redis专栏
上面该看的看完了,咱往下可就直接上码啦!!!
这次的改动会有有点大。
redis.hpp
愿意放哪儿放哪儿,我觉着吧,怎么说redis也是个数据库,就放 db 文件夹下吧。
代码语言:javascript复制#ifndef REDIS_H
#define REDIS_H
#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;
class Redis
{
public:
Redis();
~Redis();
// 连接redis服务器
bool connect();
// 向redis指定的通道channel发布消息
bool publish(int channel, string message);
// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);
// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);
// 在独立线程中接收订阅通道中的消息
void observer_channel_message();
// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);
private:
// hiredis同步上下文对象,负责publish消息
redisContext *_publish_context;
// hiredis同步上下文对象,负责subscribe消息
redisContext *_subcribe_context;
// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
};
#endif
redis.cpp
这一套,可以在做轻量级集群服务器间通信用,封装好了的。
代码语言:javascript复制#include "redis.hpp"
#include <iostream>
using namespace std;
Redis::Redis():_publish_context(nullptr), _subcribe_context(nullptr){}
Redis::~Redis(){
if (_publish_context != nullptr){
redisFree(_publish_context);
}
if (_subcribe_context != nullptr){
redisFree(_subcribe_context);
}
}
bool Redis::connect(){
// 负责publish发布消息的上下文连接
_publish_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _publish_context){
cerr << "connect redis failed!" << endl;
return false;
}
// 负责subscribe订阅消息的上下文连接
_subcribe_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _subcribe_context){
cerr << "connect redis failed!" << endl;
return false;
}
// 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
thread t([&]() {
observer_channel_message();
});
t.detach();
cout << "connect redis-server success!" << endl;
return true;
}
// 向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message){
redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
if (nullptr == reply){
cerr << "publish command failed!" << endl;
return false;
}
freeReplyObject(reply);
return true;
}
// 向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel){
// SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
// 通道消息的接收专门在observer_channel_message函数中的独立线程中进行
// 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){
cerr << "subscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done){
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
cerr << "subscribe command failed!" << endl;
return false;
}
}
// redisGetReply
return true;
}
// 向redis指定的通道unsubscribe取消订阅消息
bool Redis::unsubscribe(int channel){
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){
cerr << "unsubscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done){
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
cerr << "unsubscribe command failed!" << endl;
return false;
}
}
return true;
}
// 在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){
redisReply *reply = nullptr;
while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)){
// 订阅收到的消息是一个带三元素的数组
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
{
// 给业务层上报通道上发生的消息
_notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str);
}
freeReplyObject(reply);
}
cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
void Redis::init_notify_handler(function<void(int,string)> fn){
this->_notify_message_handler = fn;
}
chatservice修改
头文件里面自行修改吧,这里放出源文件的修改范围。
构造函数中连接上redis:
代码语言:javascript复制// 连接redis服务器
if (_redis.connect()){
// 设置上报消息的回调
_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));
}
登录成功后,向redis消息队列进行订阅:
代码语言:javascript复制 // id用户登录成功后,向redis订阅channel(id)
_redis.subscribe(id);
用户注销之后,取消订阅:
代码语言:javascript复制 // 用户注销,相当于就是下线,在redis中取消订阅通道
_redis.unsubscribe(userid);
(客户端里以外掉线也给它来上这么一下)
单聊:
代码语言:javascript复制//一对一聊天
void ChatService::onechat(const TcpConnectionPtr &conn,json &js,Timestamp time){
// cout<<js<<endl;
int toid = js["toid"].get<int>(); //这里bug
// bool userstate = false;
//开辟锁的作用域
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(toid);
if(it != _userConnMap.end()){
//用户在线,转发消息
it->second->send(js.dump());
return;
}
}
// 查询toid是否在线
User user = _usermodel.query(toid);
if (user.getstate() == "online"){
_redis.publish(toid, js.dump());
return;
}
// toid不在线,存储离线消息
_offlineMsgmodel.insert(toid, js.dump());
}
群聊:
代码语言:javascript复制// 群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
int groupid = js["groupid"].get<int>();
vector<int> useridVec = _groupModel.queryGroupUsers(userid, groupid);
lock_guard<mutex> lock(_connMutex);
for (int id : useridVec){
auto it = _userConnMap.find(id);
if (it != _userConnMap.end()){
// 转发群消息
it->second->send(js.dump());
}
else{
User user = _usermodel.query(id);
if (user.getstate() == "online"){
_redis.publish(id, js.dump());
}
else{
// 存储离线群消息
_offlineMsgmodel.insert(id, js.dump());
}
}
}
}
从redis消息队列中获取订阅的消息
代码语言:javascript复制void ChatService::handleRedisSubscribeMessage(int userid, string msg){
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(userid);
if (it != _userConnMap.end())
{
it->second->send(msg);
return;
}
// 存储该用户的离线消息
_offlineMsgmodel.insert(userid, msg);
}