本文介绍了如何用UDP创建一个简单的聊天室。
一. 服务端模块实现
服务端仍然沿用我们前面的思想(高内聚低耦合),因此我们用一下上一篇UDP英译汉网络词典的服务端实现(点此查看)。
代码语言:javascript复制#pragma once
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <strings.h>
#include <stdlib.h>
#include<functional>
#include "Log.hpp"
#include"InetAddr.hpp"
#include"Dict.hpp"
using namespace std;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
USAGE_ERROR
};
const static int defaultfd = -1;
using func_t=function<string(const string&,bool& ok)>;
class UdpServer
{
public:
UdpServer(uint16_t port,func_t func)
: _sockfd(defaultfd), _port(port), _func(func)
,_isrunning(false)
{}
void InitServer()
{
// 1.创建udp socket 套接字...必须要做的
_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket error,%s,%dn", strerror(errno), errno);
exit(SOCKET_ERROR);
}
LOG(INFO, "socket create success,sockfd: %dn", _sockfd);
// 2.1 填充sockaddr_in结构
struct sockaddr_in local; // struct sockaddr_in 系统提供的数据类型,local是变量,用户栈上开辟空间
bzero(&local, sizeof(local)); // 清空
local.sin_family = AF_INET;
local.sin_port = htons(_port); // port要经过网络传输给对面,即port先到网络,所以要将_port,从主机序列转化为网络序列
local.sin_addr.s_addr=INADDR_ANY;//htonl(INADDR_ANY)
// 2.2 bind sockfd和网络信息(IP(?) Port)
int n = bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if(n<0)
{
LOG(FATAL, "bind error,%s,%dn", strerror(errno), errno);
exit(BIND_ERROR);
}
LOG(INFO, "socket bind successn");
}
void Start()//所有的服务器,本质解决的是输入输出的问题!不想让网络通信模块和业务模块进行强耦合
{}
~UdpServer()
{
}
private:
int _sockfd;
uint16_t _port; // 服务器所用的端口号
bool _isrunning;
//给服务器设定回调,用来让上层进行注册业务的处理方法
func_t _func;
};
首先明确的是,初始化函数InitServer是不变的,我们再来看Start函数,也是大差不差,只需改动一捏捏,我们也可以仿照以前的思路让上层去实现这个聊天的功能,那么我们就知道了,这次的服务端也需要一个回调函数,让上层进行业务处理。我们稍作修改。
代码语言:javascript复制using handler_message_t=......
我们先定义出来处理业务的函数类型,参数部分留待下面解析。
那么我们的TcpServer类的类成员就变成了:
代码语言:javascript复制class UdpServer
{
private:
int _sockfd;
//string _ip;//不是必须的
uint16_t _port; // 服务器所用的端口号
bool _isrunning;
//给服务器设定回调,用来让上层进行注册业务的处理方法
handler_message_t _handler_message;
};
由此来编写构造函数,以及Start函数就显得水到渠成了。
代码语言:javascript复制const static int defaultfd = -1;
using handler_message_t=......
class UdpServer
{
public:
UdpServer(uint16_t port,handler_message_t handler_message)
: _sockfd(defaultfd), _port(port), _handler_message(handler_message)
,_isrunning(false)
{}
void Start()//所有的服务器,本质解决的是输入输出的问题!不想让网络通信模块和业务模块进行强耦合
{
//一直运行,直到管理者不想运行了,服务器都是死循环
_isrunning=true;
while(true)
{
char message[1024];
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
//1.我们要让server先收数据
ssize_t n=recvfrom(_sockfd,message,sizeof(message)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
message[n]=0;
InetAddr addr(peer);
LOG(DEBUG,"get message from [%s:%d]: %sn",addr.Ip().c_str(),addr.Port(),message);
_handler_message(_sockfd,message,addr);
}
}
_isrunning=false;
}
~UdpServer()
{}
private:
int _sockfd;
//string _ip;//不是必须的
uint16_t _port; // 服务器所用的端口号
bool _isrunning;
//给服务器设定回调,用来让上层进行注册业务的处理方法
handler_message_t _handler_message;
};
那好我们下面就具体来看看该如何处理业务,以补充服务端的处理方法。
二. 处理聊天消息模块实现
大家不用猜也知道该怎么办了吧。没错,仍然封装成一个类。
来看看基本框架如何写。
代码语言:javascript复制class MessageRoute
{
public:
MessageRoute()
{
pthread_mutex_init(&_mutex,nullptr);
}
~MessageRoute()
{
pthread_mutex_destroy(&_mutex);
}
private:
vector<InetAddr> _online_user;
pthread_mutex_t _mutex;
};
我们的成员有两位,首先我们想想平时我的微信、QQ,聊天的话肯定不止一个人聊天,我不聊天但是别人的消息仍然能显示到我的屏幕。所以定义一个vector结构的数组用来装聊天成员。再定义一个锁来保护临界资源,更加安全。
第一次看的朋友,可能不知道vector里面装的InetAddr是什么,其实是我们封装的一个类。
代码语言:javascript复制class InetAddr
{
private:
void GetAddress(string* ip,uint16_t* port)
{
*port=ntohs(_addr.sin_port);
*ip=inet_ntoa(_addr.sin_addr);
}
public:
InetAddr(const struct sockaddr_in &addr)
:_addr(addr)
{
GetAddress(&_ip,&_port);
}
string Ip()
{
return _ip;
}
bool operator==(const InetAddr& addr)
{
//if(_ip==addr._ip) 任何时刻只允许一个用户
if(_ip==addr._ip && _port==addr._port)//方便测试
{
return true;
}
return false;
}
struct sockaddr_in Addr()
{
return _addr;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{}
private:
struct sockaddr_in _addr;
string _ip;
uint16_t _port;
};
这样封装更便于我们的操作。
当有新用户进入聊天室进行聊天的时候,我们应该将其插入到用户数组中,而当由用户退出的时候,我们同样应该及时的将其从数组中删除。
代码语言:javascript复制bool IsExists(const InetAddr& addr)
{
for(auto a:_online_user)
{
if(a==addr) return true;
}
return false;
}
void AddUser(const InetAddr& addr)
{
LockGuard lockguard(&_mutex);
if(IsExists(addr)) return;
_online_user.push_back(addr);
}
void DelUser(const InetAddr& addr)
{
LockGuard lockguard(&_mutex);
for(auto iter=_online_user.begin();iter!=_online_user.end();iter )
{
if(*iter==addr)
{
_online_user.erase(iter);
break;
}
}
}
这里出现了一个新东西----LockGuard,这是我们按照RAII(点此查看)的思路封装的锁。
代码语言:javascript复制#ifndef __lock_GUARD_HPP__
#define __lock_GUARD_HPP__
#include<iostream>
#include<pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);//构造加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
#endif
那么正式来说该如何处理消息呢?
代码语言:javascript复制void RouteHelper(int sockfd,string message,InetAddr who)
{
LockGuard lockguard(&_mutex);
//2.进行消息转发
for(auto user:_online_user)
{
string send_message="n[" who.Ip() ":" to_string(who.Port()) "]#" message "n";
struct sockaddr_in clientaddr=user.Addr();
::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&clientaddr,sizeof(clientaddr));
}
}
void Route(int sockfd,string message,InetAddr who)
{
//1.1 我们任务:用户首次发消息,还要将自己,插入到在线用户中
AddUser(who);
//1.2 如果客户端要退出
if(message=="Q" || message=="QUIT")
{
DelUser(who);
}
//2.构建任务对象,入队列,让线程池进行转发
task_t t=bind(&MessageRoute::RouteHelper,this,sockfd,message,who);
ThreadPool<task_t>::GetInstance()->Enqueue(t);
}
我们来说说逻辑,处理方法就是将发来的消息通过线程池进行转发。
代码语言:javascript复制#pragma once
//单例模式的线程池
#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace std;
using namespace ThreadModule;
const static int gdefaultthreadnum=3;
template<typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond,&_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeAll()
{
pthread_cond_broadcast(&_cond);
}
//私有的
ThreadPool(int threadnum=gdefaultthreadnum)
:_threadnum(threadnum)
,_waitnum(0)
,_isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
LOG(INFO,"ThreadPool Construct()");
}
void Start()
{
for(auto& thread:_threads)
{
thread.Start();
}
}
void HandlerTask(string name)//类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用
{
LOG(INFO,"%s is runningn",name.c_str());
while(true)
{
//1.保证队列安全
LockQueue();
//2.队列中不一定有数据
while(_task_queue.empty() && _isrunning)
{
_waitnum ;
ThreadSleep();
_waitnum--;
}
//2.1 如果线程池已经退出了 && 任务队列是空的
if(_task_queue.empty() && !_isrunning)
{
UnLockQueue();
break;
}
//2.2 如果线程池不退出 && 任务队列不是空的
//2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后再退出
//3.一定有任务,处理任务
T t=_task_queue.front();
_task_queue.pop();
UnLockQueue();
LOG(DEBUG,"%s get a task",name.c_str());
//4.处理任务,这个任务属于线程独占的任务,所以不能放在加锁和解锁之间
t();
//LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString());
}
}
void InitThreadPool()
{
//指向构建出所有的线程,并不自动
for(int num=0;num<_threadnum;num )
{
string name="thread-" to_string(num 1);
_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
LOG(INFO,"init thread %s donen",name.c_str());
}
_isrunning=true;
}
//复制拷贝禁用
ThreadPool<T> &operator=(const ThreadPool<T>&)=delete;
ThreadPool(const ThreadPool<T> &)=delete;
public:
static ThreadPool<T> *GetInstance()
{
//如果是多线程获取线程池对象,下面的代码就有问题,所以要加锁
//双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
if(_instance==nullptr)//只有第一次会创建对象,后续都是获取,这样就不用每次都申请锁
{//保证第二次之后,所有线程,不用再加锁,直接返回_instance单例对象
LockGuard lockguard(&_lock);
if (_instance == nullptr)
{
_instance = new ThreadPool<T>();
_instance->InitThreadPool();
_instance->Start();
LOG(DEBUG, "创建线程池单例n");
return _instance;
}
}
LOG(DEBUG, "获取线程池单例n");
return _instance;
}
bool Enqueue(const T& t)
{
bool ret=false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
if(_waitnum>0)
{
ThreadWakeup();
}
LOG(DEBUG,"enqueue task successn");
ret=true;
}
UnLockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning=false;
ThreadWakeAll();
UnLockQueue();
}
void Wait()
{
for(auto& thread:_threads)
{
thread.Join();
LOG(INFO,"%s is quit",thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
vector<Thread> _threads;
queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitnum;
bool _isrunning;
//添加单例模式--懒汉
static ThreadPool<T> *_instance;
static pthread_mutex_t _lock;//保护单例的锁
};
template<typename T>
ThreadPool<T> *ThreadPool<T>::_instance=nullptr;
template<typename T>
pthread_mutex_t ThreadPool<T>::_lock=PTHREAD_MUTEX_INITIALIZER;
我们可以知道,Route函数就是我们之前在服务器说的上层处理函数。那么handler_message_t类型的上层处理函数的参数就很明确了。
代码语言:javascript复制using handler_message_t=function<void(int sockfd,const string message,const InetAddr who)>;
那么调用服务端的主函数如何写就很明确了。
此处我们还封装了原生线程库,命名文件为Thread.hpp。
代码语言:javascript复制//封装原生线程库
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<iostream>
#include<string>
#include<functional>
#include<unistd.h>
#include<pthread.h>
using namespace std;
namespace ThreadModule
{
using func_t=function<void(string&)>;
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func,const string& name="none-name")
:_func(func)
,_threadname(name)
,_stop(true)
{}
static void* threadroutine(void* args)//类成员函数,形参是有this指针的!
{
Thread *self=static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n=pthread_create(&_tid,nullptr,threadroutine,this);
if(!n)
{
_stop=false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
string name()
{
return _threadname;
}
void Stop()
{
_stop=true;
}
~Thread(){}
private:
pthread_t _tid;
string _threadname;
func_t _func;
bool _stop;
};
}
#endif
三. 调用服务端模块实现
我们只需将服务端中处理业务函数初始化为处理业务模块中的Route函数,然后依次调用InitServer函数、Start函数即可。
代码语言:javascript复制#include<iostream>
#include<memory>
#include"UdpServer.hpp"
#include"Log.hpp"
#include"MessageRoute.hpp"
using namespace std;
void Usage(string proc)
{
cout<<"Usage:nt"<<proc<<" local_portn"<<endl;
}
// ./udpserver ip
int main(int argc,char *argv[])
{
if(argc!=2)
{
Usage(argv[0]);
exit(USAGE_ERROR);
}
EnableScreen();
//string ip=argv[1];
//定义消息转发模块
MessageRoute route;
//网络模块
uint16_t port=stoi(argv[1]);
unique_ptr<UdpServer> usvr=make_unique<UdpServer>(port,
bind(&MessageRoute::Route,&route,placeholders::_1,
placeholders::_2,placeholders::_3));//C 14
usvr->InitServer();
usvr->Start();
return 0;
}
MessageRoute.hpp文件即我们的处理聊天消息模块。
四. 客户端模块实现
此处虽说大体还是发送消息,并接收服务器发送回来的消息。
但是与众不同的是:此处发送消息和接收服务器发送回来的消息应该是两个不同的线程。因为要做到不发消息的时候还是能接收到消息。
代码语言:javascript复制#include<iostream>
#include<string>
#include<cstdio>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"Thread.hpp"
#include"Comm.hpp"
using namespace std;
using namespace ThreadModule;
void recvmessage(int sockfd,string name)
{
//version 1
int fd=OpenDev("/dev/pts/0",O_WRONLY);
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
char buffer[1024];
ssize_t n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
buffer[n]=0;
write(fd,buffer,strlen(buffer));
}
}
//version 2
// while(true)
// {
// struct sockaddr_in peer;
// socklen_t len=sizeof(peer);
// char buffer[1024];
// ssize_t n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len);
// if(n>0)
// {
// buffer[n]=0;
// fprintf(stderr,"%s | %sn",name.c_str(),buffer);
// //此时运行指令变为./udpclient ip port 2>/dev/pts/2
// }
// }
}
void sendmessage(int sockfd,struct sockaddr_in& server,string name)
{
string message;
while(true)
{
printf("%s | Enter# ",name.c_str());
fflush(stdout);
getline(cin,message);
sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&server,sizeof(server));
}
}
void Usage(string proc)
{
cout<<"Usage:nt"<<proc<<" serverip serverportn"<<endl;
}
int InitClient(string& serverip,uint16_t serverport,struct sockaddr_in *server)
{
//1.创建socket
int sockfd=socket(AF_INET,SOCK_DGRAM,0);
if(sockfd<0)
{
cerr<<"socket error"<<endl;
return -1;
}
//2.client一定要bind,client也有自己的ip和port,但是不建议显示(和server一样用bind函数)bind
//a.那如何bind呢?当udp client首次发送数据的时候,os会自动随机的给client进行bind--为什么?要bind,必然要和port关联!防止client port冲突
//b.什么时候bind?首次发送数据的时候
//构建目标主机的socket信息
memset(server,0,sizeof(struct sockaddr_in));
server->sin_family=AF_INET;
server->sin_port=htons(serverport);
server->sin_addr.s_addr=inet_addr(serverip.c_str());
return sockfd;
}
// ./udpclient serverip serverport
int main(int argc,char *argv[])
{
if(argc!=3)
{
Usage(argv[0]);
exit(1);
}
string serverip=argv[1];
uint16_t serverport=stoi(argv[2]);
struct sockaddr_in serveraddr;
int sockfd=InitClient(serverip,serverport,&serveraddr);
if(sockfd==-1) return 1;
func_t r=bind(&recvmessage,sockfd,placeholders::_1);
func_t s=bind(&sendmessage,sockfd,serveraddr,placeholders::_1);
//创建两个线程,分别用来接收消息和发消息,使其两个互不受影响
Thread Recver(r,"recver");//recver在前面,还是sender在前面,都行
Thread Sender(s,"sender");
Sender.Start();
Recver.Start();
Recver.Join();
Sender.Join();
return 0;
}
同样用的是自己封装的线程。
值得注意的是这里接收消息模块有两个版本。此处的终端文件(/dev/pts)可以根据自己实际情况修改。
五. 效果展示
分别来看看两个版本都是怎么样的吧。
version 1:
version 2:
总结:
好了,到这里今天的知识就讲完了,大家有错误一点要在评论指出,我怕我一人搁这瞎bb,没人告诉我错误就寄了。