通过上一节的编译与部署,我们会得到TeamTalk服务器端以下部署程序:
db_proxy_server
file_server
http_msg_server
login_server
msfs
msg_server
push_server
router_server
这些服务构成的拓扑图如下:
各个服务程序的作用描述如下:
- LoginServer (C ): 负载均衡服务器,分配一个负载小的MsgServer给客户端使用
- MsgServer (C ): 消息服务器,提供客户端大部分信令处理功能,包括私人聊天、群组聊天等
- RouteServer (C ): 路由服务器,为登录在不同MsgServer的用户提供消息转发功能
- FileServer (C ): 文件服务器,提供客户端之间得文件传输服务,支持在线以及离线文件传输
- MsfsServer (C ): 图片存储服务器,提供头像,图片传输中的图片存储服务
- DBProxy (C ): 数据库代理服务器,提供mysql以及redis的访问服务,屏蔽其他服务器与mysql与redis的直接交互
- HttpMsgServer(C ) :对外接口服务器,提供对外接口功能。(目前只是框架)
- PushServer(C ): 消息推送服务器,提供IOS系统消息推送。(IOS消息推送必须走apns)
注意:上图中并没有push_server和http_push_server。如果你不调试ios版本的客户端,可以暂且不启动push_server,另外http_push_server也可以暂不启动。
启动顺序:
一般来说,前端的服务会依赖后端的服务,所以一般先启动后端服务,再启动前端服务。建议按以下顺序启动服务:
1、启动db_proxy。 2、启动route_server,file_server,msfs 3、启动login_server 4、启动msg_server
那么我就按照服务端的启动顺序去讲解服务端的一个流程概述。 第一步:启动db_proxy后,db_proxy会去根据配置文件连接相应的MySQL实例,以及redis实例。 第二步:启动route_server,file_server,msfs后,各个服务端都会开始监听相应的端口。 第三步:启动login_server,login_server就开始监听相应的端口(8080),等待客户端的连接,而分配一个负载相对较小的msg_server给客户端。 第四步:启动msg_server(端口8000),msg_server启动后,会去主动连接route_server,login_server,db_proxy_server,会将自己的监听的端口信息注册到login_server去,同时在用户上线,下线的时候会将自己的负载情况汇报给login_server.
各个服务的端口号 (注意:如果出现部署完成后但是服务进程启动有问题或者只有部分服务进程启动了,请查看相应的log日志,请查看相应的log日志,请查看相应的log日志。)
服务 | 端口 |
---|---|
login_server | 8080/8008 |
msg_server | 8000 |
db_proxy_server | 10600 |
route_server | 8200 |
http_msg_server | 8400 |
file_server | 8600/8601 |
服务网络通信框架介绍:
上面介绍的每一个服务都使用了相同的网络通信框架,该通信框架可以单独拿出来做为一个通用的网络通信框架。该网络框架是在一个循环里面不断地检测IO事件,然后对检测到的事件进行处理。流程如下:
1. 使用IO复用技术(linux和windows平台用select、mac平台用kevent)分离网络IO。
2. 对分离出来的网络IO进行操作,分为socket句柄可读、可写和出错三种情况。
当然再加上定时器事件,即检测一个定时器事件列表,如果有定时器到期,则执行该定时器事件。
整个框架的伪码大致如下:
[cpp] view plain copy
- while (running)
- {
- //处理定时器事件
- _CheckTimer();
- //IO multiplexing
- int n = select(socket集合, ...);
- //事件处理
- if (某些socket可读)
- {
- pSocket->OnRead();
- }
- if (某些socket可写)
- {
- pSocket->OnWrite();
- }
- if (某些socket出错)
- {
- pSocket->OnClose();
- }
- }
处理定时器事件的代码如下:
[cpp] view plain copy
- void CEventDispatch::_CheckTimer()
- {
- uint64_t curr_tick = get_tick_count();
- list<TimerItem*>::iterator it;
- for (it = m_timer_list.begin(); it != m_timer_list.end(); )
- {
- TimerItem* pItem = *it;
- it ; // iterator maybe deleted in the callback, so we should increment it before callback
- if (curr_tick >= pItem->next_tick)
- {
- pItem->next_tick = pItem->interval;
- pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
- }
- }
- }
即遍历一个定时器列表,将定时器对象与当前时间(curr_tick)做比较,如果当前时间已经大于或等于定时器设置的时间,则表明定时器时间已经到了,执行定时器对象对应的回调函数。
在来看看OnRead、OnWrite和OnClose这三个函数。在TeamTalk源码中每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的类的子类,通过这种方法来实现生存期自动管理。
[cpp] view plain copy
- void CBaseSocket::OnRead()
- {
- if (m_state == SOCKET_STATE_LISTENING)
- {
- _AcceptNewSocket();
- }
- else
- {
- u_long avail = 0;
- if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
- {
- m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
- }
- else
- {
- m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
- }
- }
- }
OnRead()方法根据状态标识m_state确定一个socket是侦听的socket还是普通与客户端连接的socket,如果是侦听sokcet则接收客户端的连接;如果是与客户端连接的socket,则先检测socket上有多少字节可读,如果没有字节可读或者检测字节数时出错,则关闭socket,反之调用设置的回调函数。
[cpp] view plain copy
- void CBaseSocket::OnWrite()
- {
- #if ((defined _WIN32) || (defined __APPLE__))
- CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
- #endif
- if (m_state == SOCKET_STATE_CONNECTING)
- {
- int error = 0;
- socklen_t len = sizeof(error);
- #ifdef _WIN32
- getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
- #else
- getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
- #endif
- if (error) {
- m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
- } else {
- m_state = SOCKET_STATE_CONNECTED;
- m_callback(m_callback_data, NETLIB_MSG_CONFIRM, (net_handle_t)m_socket, NULL);
- }
- }
- else
- {
- m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
- }
- }
OnWrite()函数则根据m_state标识检测socket是否是尝试连接的socket(connect函数中的socket),用于判断socket是否已经连接成功,反之则是与客户端保持连接的socket,调用预先设置的回调函数。
[cpp] view plain copy
- void CBaseSocket::OnClose()
- {
- m_state = SOCKET_STATE_CLOSING;
- m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
- }
OnClose()方法将标识m_state设置为需要关闭状态,并调用预先设置的回调函数。
每个服务程序都使用一个stl hash_map来管理所有的socket,键是socket句柄,值是CBaseSocket对象指针:
[cpp] view plain copy
- typedef hash_map<net_handle_t, CBaseSocket*> SocketMap;
- SocketMap g_socket_map;
所以在删除或者新增socket时,实际上就是从这个hash_map中删除或者向这个hash_map中增加对象。多线程操作,需要一个锁来进行保护:
[cpp] view plain copy
- void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event)
- {
- CAutoLock func_lock(&m_lock);
- if ((socket_event & SOCKET_READ) != 0)
- {
- FD_SET(fd, &m_read_set);
- }
- if ((socket_event & SOCKET_WRITE) != 0)
- {
- FD_SET(fd, &m_write_set);
- }
- if ((socket_event & SOCKET_EXCEP) != 0)
- {
- FD_SET(fd, &m_excep_set);
- }
- }
代码CAutoLock func_lock(&m_lock);即保护该hash_map的锁对象。
而管理以上功能的是一个单例类CEventDispatch,所以不难才出CEventDispatch提供的接口:
[cpp] view plain copy
- class CEventDispatch
- {
- public:
- virtual ~CEventDispatch();
- void AddEvent(SOCKET fd, uint8_t socket_event);
- void RemoveEvent(SOCKET fd, uint8_t socket_event);
- void AddTimer(callback_t callback, void* user_data, uint64_t interval);
- void RemoveTimer(callback_t callback, void* user_data);
- void AddLoop(callback_t callback, void* user_data);
- void StartDispatch(uint32_t wait_timeout = 100);
- void StopDispatch();
- bool isRunning() {return running;}
- static CEventDispatch* Instance();
- protected:
- CEventDispatch();
- private:
- void _CheckTimer();
- void _CheckLoop();
- typedef struct {
- callback_t callback;
- void* user_data;
- uint64_t interval;
- uint64_t next_tick;
- } TimerItem;
- private:
- #ifdef _WIN32
- fd_set m_read_set;
- fd_set m_write_set;
- fd_set m_excep_set;
- #elif __APPLE__
- int m_kqfd;
- #else
- int m_epfd;
- #endif
- CLock m_lock;
- list<TimerItem*> m_timer_list;
- list<TimerItem*> m_loop_list;
- static CEventDispatch* m_pEventDispatch;
- bool running;
- };
其中StartDispatch()和StopDispatcher()分别用于启动和停止整个循环流程。一般在程序初始化的时候StartDispatch(),在程序退出时StopDispatcher()。