(三) 服务器端的程序架构介绍2

2018-04-04 15:18:28 浏览数 (1)

下面我们以pc端登录为例来具体看一个数据包在服务器端各个服务之间走过的流程:

步骤1:login_server初始化侦听socket,设置新连接到来的回调函数。8080端口,该端口是为http服务配置的。

在login_server.cpp main函数中调用:

netlib_listen调用如下:

pSocket->Listen调用:

AddBaseSocket将该socket加入hash_map中。AddEvent设置需要关注的socket上的事件,这里只关注可读和出错事件。

步骤2: 客户端调用connect()函数连接login_server的8080端口。

步骤3:login_server收到连接请求后调用OnRead方法,OnRead()方法里面调用_AcceptNewSocket(),_AcceptNewSocket()接收新连接,创建新的socket,并调用之前初始化阶段netlib_listen设置的回调函数http_callback。

[cpp] view plain copy

  1. void CBaseSocket::OnRead()
  2. {
  3. if (m_state == SOCKET_STATE_LISTENING)
  4. {
  5. _AcceptNewSocket();
  6. }
  7. else
  8. {
  9. u_long avail = 0;
  10. if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
  11. {
  12. m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
  13. }
  14. else
  15. {
  16. m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
  17. }
  18. }
  19. }

[cpp] view plain copy

  1. void CBaseSocket::_AcceptNewSocket()
  2. {
  3. SOCKET fd = 0;
  4. sockaddr_in peer_addr;
  5. socklen_t addr_len = sizeof(sockaddr_in);
  6. char ip_str[64];
  7. while ( (fd = accept(m_socket, (sockaddr*)&peer_addr, &addr_len)) != INVALID_SOCKET )
  8. {
  9. CBaseSocket* pSocket = new CBaseSocket();
  10. uint32_t ip = ntohl(peer_addr.sin_addr.s_addr);
  11. uint16_t port = ntohs(peer_addr.sin_port);
  12. snprintf(ip_str, sizeof(ip_str), "%d.%d.%d.%d", ip >> 24, (ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF);
  13. log("AcceptNewSocket, socket=%d from %s:%dn", fd, ip_str, port);
  14. pSocket->SetSocket(fd);
  15. pSocket->SetCallback(m_callback);
  16. pSocket->SetCallbackData(m_callback_data);
  17. pSocket->SetState(SOCKET_STATE_CONNECTED);
  18. pSocket->SetRemoteIP(ip_str);
  19. pSocket->SetRemotePort(port);
  20. _SetNoDelay(fd);
  21. _SetNonblock(fd);
  22. AddBaseSocket(pSocket);
  23. CEventDispatch::Instance()->AddEvent(fd, SOCKET_READ | SOCKET_EXCEP);
  24. m_callback(m_callback_data, NETLIB_MSG_CONNECT, (net_handle_t)fd, NULL);
  25. }
  26. }

[cpp] view plain copy

  1. void http_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
  2. {
  3. if (msg == NETLIB_MSG_CONNECT)
  4. {
  5. CHttpConn* pConn = new CHttpConn();
  6. pConn->OnConnect(handle);
  7. }
  8. else
  9. {
  10. log("!!!error msg: %d ", msg);
  11. }
  12. }

pConn->OnConnect(handle)中设置http数据的回调函数httpconn_callback:

[cpp] view plain copy

  1. void CHttpConn::OnConnect(net_handle_t handle)
  2. {
  3. printf("OnConnect, handle=%dn", handle);
  4. m_sock_handle = handle;
  5. m_state = CONN_STATE_CONNECTED;
  6. g_http_conn_map.insert(make_pair(m_conn_handle, this));
  7. netlib_option(handle, NETLIB_OPT_SET_CALLBACK, (void*)httpconn_callback);
  8. netlib_option(handle, NETLIB_OPT_SET_CALLBACK_DATA, reinterpret_cast<void *>(m_conn_handle) );
  9. netlib_option(handle, NETLIB_OPT_GET_REMOTE_IP, (void*)&m_peer_ip);
  10. }

httpconn_callback中处理http可读可写出错事件:

[cpp] view plain copy

  1. void httpconn_callback(void* callback_data, uint8_t msg, uint32_t handle, uint32_t uParam, void* pParam)
  2. {
  3. NOTUSED_ARG(uParam);
  4. NOTUSED_ARG(pParam);
  5. // convert void* to uint32_t, oops
  6. uint32_t conn_handle = *((uint32_t*)(&callback_data));
  7. CHttpConn* pConn = FindHttpConnByHandle(conn_handle);
  8. if (!pConn) {
  9. return;
  10. }
  11. switch (msg)
  12. {
  13. case NETLIB_MSG_READ:
  14. pConn->OnRead();
  15. break;
  16. case NETLIB_MSG_WRITE:
  17. pConn->OnWrite();
  18. break;
  19. case NETLIB_MSG_CLOSE:
  20. pConn->OnClose();
  21. break;
  22. default:
  23. log("!!!httpconn_callback error msg: %d ", msg);
  24. break;
  25. }
  26. }

步骤4:客户端连接成功以后,发送http请求,方法是get,请求url:http://192.168.226.128:8080/msg_server。(具体网址与你的机器配置的网址有关)

步骤5:login_server检测到该socket可读,调用pConn->OnRead()方法。

[cpp] view plain copy

  1. void CHttpConn::OnRead()
  2. {
  3. for (;;)
  4. {
  5. uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();
  6. if (free_buf_len < READ_BUF_SIZE 1)
  7. m_in_buf.Extend(READ_BUF_SIZE 1);
  8. int ret = netlib_recv(m_sock_handle, m_in_buf.GetBuffer() m_in_buf.GetWriteOffset(), READ_BUF_SIZE);
  9. if (ret <= 0)
  10. break;
  11. m_in_buf.IncWriteOffset(ret);
  12. m_last_recv_tick = get_tick_count();
  13. }
  14. // 每次请求对应一个HTTP连接,所以读完数据后,不用在同一个连接里面准备读取下个请求
  15. char* in_buf = (char*)m_in_buf.GetBuffer();
  16. uint32_t buf_len = m_in_buf.GetWriteOffset();
  17. in_buf[buf_len] = '';
  18. // 如果buf_len 过长可能是受到攻击,则断开连接
  19. // 正常的url最大长度为2048,我们接受的所有数据长度不得大于1K
  20. if(buf_len > 1024)
  21. {
  22. log("get too much data:%s ", in_buf);
  23. Close();
  24. return;
  25. }
  26. //log("OnRead, buf_len=%u, conn_handle=%un", buf_len, m_conn_handle); // for debug
  27. m_cHttpParser.ParseHttpContent(in_buf, buf_len);
  28. if (m_cHttpParser.IsReadAll()) {
  29. string url = m_cHttpParser.GetUrl();
  30. if (strncmp(url.c_str(), "/msg_server", 11) == 0) {
  31. string content = m_cHttpParser.GetBodyContent();
  32. _HandleMsgServRequest(url, content);
  33. } else {
  34. log("url unknown, url=%s ", url.c_str());
  35. Close();
  36. }
  37. }
  38. }

CHttpConn::OnRead()先用recv收取数据,接着解析数据,如果出错或者非法数据就关闭连接。如果客户端发送的请求的http object正好是/msg_server,则调用_HandleMsgServRequest(url, content);进行处理:

[cpp] view plain copy

  1. void CHttpConn::_HandleMsgServRequest(string& url, string& post_data)
  2. {
  3. msg_serv_info_t* pMsgServInfo;
  4. uint32_t min_user_cnt = (uint32_t)-1;
  5. map<uint32_t, msg_serv_info_t*>::iterator it_min_conn = g_msg_serv_info.end();
  6. map<uint32_t, msg_serv_info_t*>::iterator it;
  7. if(g_msg_serv_info.size() <= 0)
  8. {
  9. Json::Value value;
  10. value["code"] = 1;
  11. value["msg"] = "没有msg_server";
  12. string strContent = value.toStyledString();
  13. char* szContent = new char[HTTP_RESPONSE_HTML_MAX];
  14. snprintf(szContent, HTTP_RESPONSE_HTML_MAX, HTTP_RESPONSE_HTML, strContent.length(), strContent.c_str());
  15. Send((void*)szContent, strlen(szContent));
  16. delete [] szContent;
  17. return ;
  18. }
  19. for (it = g_msg_serv_info.begin() ; it != g_msg_serv_info.end(); it ) {
  20. pMsgServInfo = it->second;
  21. if ( (pMsgServInfo->cur_conn_cnt < pMsgServInfo->max_conn_cnt) &&
  22. (pMsgServInfo->cur_conn_cnt < min_user_cnt)) {
  23. it_min_conn = it;
  24. min_user_cnt = pMsgServInfo->cur_conn_cnt;
  25. }
  26. }
  27. if (it_min_conn == g_msg_serv_info.end()) {
  28. log("All TCP MsgServer are full ");
  29. Json::Value value;
  30. value["code"] = 2;
  31. value["msg"] = "负载过高";
  32. string strContent = value.toStyledString();
  33. char* szContent = new char[HTTP_RESPONSE_HTML_MAX];
  34. snprintf(szContent, HTTP_RESPONSE_HTML_MAX, HTTP_RESPONSE_HTML, strContent.length(), strContent.c_str());
  35. Send((void*)szContent, strlen(szContent));
  36. delete [] szContent;
  37. return;
  38. } else {
  39. Json::Value value;
  40. value["code"] = 0;
  41. value["msg"] = "";
  42. if(pIpParser->isTelcome(GetPeerIP()))
  43. {
  44. value["priorIP"] = string(it_min_conn->second->ip_addr1);
  45. value["backupIP"] = string(it_min_conn->second->ip_addr2);
  46. value["msfsPrior"] = strMsfsUrl;
  47. value["msfsBackup"] = strMsfsUrl;
  48. }
  49. else
  50. {
  51. value["priorIP"] = string(it_min_conn->second->ip_addr2);
  52. value["backupIP"] = string(it_min_conn->second->ip_addr1);
  53. value["msfsPrior"] = strMsfsUrl;
  54. value["msfsBackup"] = strMsfsUrl;
  55. }
  56. value["discovery"] = strDiscovery;
  57. value["port"] = int2string(it_min_conn->second->port);
  58. string strContent = value.toStyledString();
  59. char* szContent = new char[HTTP_RESPONSE_HTML_MAX];
  60. uint32_t nLen = strContent.length();
  61. snprintf(szContent, HTTP_RESPONSE_HTML_MAX, HTTP_RESPONSE_HTML, nLen, strContent.c_str());
  62. Send((void*)szContent, strlen(szContent));
  63. delete [] szContent;
  64. return;
  65. }
  66. }

该方法根据客户端ip地址将msg_server的地址组装成json格式,返回给客户端。json格式内容如下:

[plain] view plain copy

  1. {
  2. "backupIP" : "localhost",
  3. "code" : 0,
  4. "discovery" : "http://192.168.226.128/api/discovery",
  5. "msfsBackup" : "http://192.168.226.128:8700/",
  6. "msfsPrior" : "http://192.168.226.128:8700/",
  7. "msg" : "",
  8. "port" : "8000",
  9. "priorIP" : "localhost"
  10. }

注意,发送数据给客户端调用的是Send方法,该方法会先尝试着调用底层的send()函数去发送,如果不能全部发送出去,则将剩余数据加入到对应的写数据缓冲区内。这样这些数据会在该socket可写时再继续发送。这是也是设计网络通信库一个通用的技巧,即先试着去send,如果send不了,将数据放入待发送缓冲区内,并设置检测可写标识位,当socket可写时,从待发送缓冲区取出数据发送出去。如果还是不能全部发送出去,继续设置检测可写标识位,下次再次发送,如此循环一直到所有数据都发送出去为止。

[cpp] view plain copy

  1. int CHttpConn::Send(void* data, int len)
  2. {
  3. m_last_send_tick = get_tick_count();
  4. if (m_busy)
  5. {
  6. m_out_buf.Write(data, len);
  7. return len;
  8. }
  9. int ret = netlib_send(m_sock_handle, data, len);
  10. if (ret < 0)
  11. ret = 0;
  12. if (ret < len)
  13. {
  14. m_out_buf.Write((char*)data ret, len - ret);
  15. m_busy = true;
  16. //log("not send all, remain=%dn", m_out_buf.GetWriteOffset());
  17. }
  18. else
  19. {
  20. OnWriteComlete();
  21. }
  22. return len;
  23. }

当然,由于这里http设置成了短连接,每次应答完客户度之后立即关闭连接,在OnWriteComplete()里面:

[cpp] view plain copy

  1. void CHttpConn::OnWriteComlete()
  2. {
  3. log("write complete ");
  4. Close();
  5. }

步骤6:客户端收到http请求的应答后,根据收到的json得到msg_server的ip地址,这里是ip地址是192.168.226.128,端口号是8000。客户端开始连接这个ip地址和端口号,连接过程与msg_server接收连接过程与上面的步骤相同。接着客户端给服务器发送登录数据包。

步骤7:msg_server收到登录请求后,在CImConn::OnRead()收取数据,解包,调用子类CMsgConn重写的HandlePdu,处理登录请求,如何处理呢?处理如下:

[cpp] view plain copy

  1. //MsgConn.cpp
  2. void CMsgConn::HandlePdu(CImPdu* pPdu)
  3. {
  4. // request authorization check
  5. if (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {
  6. log("HandlePdu, wrong msg. ");
  7. throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");
  8. return;
  9. }
  10. switch (pPdu->GetCommandId()) {
  11. case CID_OTHER_HEARTBEAT:
  12. _HandleHeartBeat(pPdu);
  13. break;
  14. case CID_LOGIN_REQ_USERLOGIN:
  15. _HandleLoginRequest(pPdu );
  16. break;
  17. case CID_LOGIN_REQ_LOGINOUT:
  18. _HandleLoginOutRequest(pPdu);
  19. break;
  20. case CID_LOGIN_REQ_DEVICETOKEN:
  21. _HandleClientDeviceToken(pPdu);
  22. break;
  23. case CID_LOGIN_REQ_KICKPCCLIENT:
  24. _HandleKickPCClient(pPdu);
  25. break;
  26. case CID_LOGIN_REQ_PUSH_SHIELD:
  27. _HandlePushShieldRequest(pPdu);
  28. break;
  29. case CID_LOGIN_REQ_QUERY_PUSH_SHIELD:
  30. _HandleQueryPushShieldRequest(pPdu);
  31. break;
  32. case CID_MSG_DATA:
  33. _HandleClientMsgData(pPdu);
  34. break;
  35. case CID_MSG_DATA_ACK:
  36. _HandleClientMsgDataAck(pPdu);
  37. break;
  38. case CID_MSG_TIME_REQUEST:
  39. _HandleClientTimeRequest(pPdu);
  40. break;
  41. case CID_MSG_LIST_REQUEST:
  42. _HandleClientGetMsgListRequest(pPdu);
  43. break;
  44. case CID_MSG_GET_BY_MSG_ID_REQ:
  45. _HandleClientGetMsgByMsgIdRequest(pPdu);
  46. break;
  47. case CID_MSG_UNREAD_CNT_REQUEST:
  48. _HandleClientUnreadMsgCntRequest(pPdu );
  49. break;
  50. case CID_MSG_READ_ACK:
  51. _HandleClientMsgReadAck(pPdu);
  52. break;
  53. case CID_MSG_GET_LATEST_MSG_ID_REQ:
  54. _HandleClientGetLatestMsgIDReq(pPdu);
  55. break;
  56. case CID_SWITCH_P2P_CMD:
  57. _HandleClientP2PCmdMsg(pPdu );
  58. break;
  59. case CID_BUDDY_LIST_RECENT_CONTACT_SESSION_REQUEST:
  60. _HandleClientRecentContactSessionRequest(pPdu);
  61. break;
  62. case CID_BUDDY_LIST_USER_INFO_REQUEST:
  63. _HandleClientUserInfoRequest( pPdu );
  64. break;
  65. case CID_BUDDY_LIST_REMOVE_SESSION_REQ:
  66. _HandleClientRemoveSessionRequest( pPdu );
  67. break;
  68. case CID_BUDDY_LIST_ALL_USER_REQUEST:
  69. _HandleClientAllUserRequest(pPdu );
  70. break;
  71. case CID_BUDDY_LIST_CHANGE_AVATAR_REQUEST:
  72. _HandleChangeAvatarRequest(pPdu);
  73. break;
  74. case CID_BUDDY_LIST_CHANGE_SIGN_INFO_REQUEST:
  75. _HandleChangeSignInfoRequest(pPdu);
  76. break;
  77. case CID_BUDDY_LIST_USERS_STATUS_REQUEST:
  78. _HandleClientUsersStatusRequest(pPdu);
  79. break;
  80. case CID_BUDDY_LIST_DEPARTMENT_REQUEST:
  81. _HandleClientDepartmentRequest(pPdu);
  82. break;
  83. // for group process
  84. case CID_GROUP_NORMAL_LIST_REQUEST:
  85. s_group_chat->HandleClientGroupNormalRequest(pPdu, this);
  86. break;
  87. case CID_GROUP_INFO_REQUEST:
  88. s_group_chat->HandleClientGroupInfoRequest(pPdu, this);
  89. break;
  90. case CID_GROUP_CREATE_REQUEST:
  91. s_group_chat->HandleClientGroupCreateRequest(pPdu, this);
  92. break;
  93. case CID_GROUP_CHANGE_MEMBER_REQUEST:
  94. s_group_chat->HandleClientGroupChangeMemberRequest(pPdu, this);
  95. break;
  96. case CID_GROUP_SHIELD_GROUP_REQUEST:
  97. s_group_chat->HandleClientGroupShieldGroupRequest(pPdu, this);
  98. break;
  99. case CID_FILE_REQUEST:
  100. s_file_handler->HandleClientFileRequest(this, pPdu);
  101. break;
  102. case CID_FILE_HAS_OFFLINE_REQ:
  103. s_file_handler->HandleClientFileHasOfflineReq(this, pPdu);
  104. break;
  105. case CID_FILE_ADD_OFFLINE_REQ:
  106. s_file_handler->HandleClientFileAddOfflineReq(this, pPdu);
  107. break;
  108. case CID_FILE_DEL_OFFLINE_REQ:
  109. s_file_handler->HandleClientFileDelOfflineReq(this, pPdu);
  110. break;
  111. default:
  112. log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());
  113. break;
  114. }
  115. }

分支case CID_LOGIN_REQ_USERLOGIN即处理登录请求:

[cpp] view plain copy

  1. //在MsgConn.cpp中
  2. void CMsgConn::_HandleLoginRequest(CImPdu* pPdu)
  3. {
  4. // refuse second validate request
  5. if (m_login_name.length() != 0) {
  6. log("duplicate LoginRequest in the same conn ");
  7. return;
  8. }
  9. // check if all server connection are OK
  10. uint32_t result = 0;
  11. string result_string = "";
  12. CDBServConn* pDbConn = get_db_serv_conn_for_login();
  13. if (!pDbConn) {
  14. result = IM::BaseDefine::REFUSE_REASON_NO_DB_SERVER;
  15. result_string = "服务端异常";
  16. }
  17. else if (!is_login_server_available()) {
  18. result = IM::BaseDefine::REFUSE_REASON_NO_LOGIN_SERVER;
  19. result_string = "服务端异常";
  20. }
  21. else if (!is_route_server_available()) {
  22. result = IM::BaseDefine::REFUSE_REASON_NO_ROUTE_SERVER;
  23. result_string = "服务端异常";
  24. }
  25. if (result) {
  26. IM::Login::IMLoginRes msg;
  27. msg.set_server_time(time(NULL));
  28. msg.set_result_code((IM::BaseDefine::ResultType)result);
  29. msg.set_result_string(result_string);
  30. CImPdu pdu;
  31. pdu.SetPBMsg(&msg);
  32. pdu.SetServiceId(SID_LOGIN);
  33. pdu.SetCommandId(CID_LOGIN_RES_USERLOGIN);
  34. pdu.SetSeqNum(pPdu->GetSeqNum());
  35. SendPdu(&pdu);
  36. Close();
  37. return;
  38. }
  39. IM::Login::IMLoginReq msg;
  40. CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
  41. //假如是汉字,则转成拼音
  42. m_login_name = msg.user_name();
  43. string password = msg.password();
  44. uint32_t online_status = msg.online_status();
  45. if (online_status < IM::BaseDefine::USER_STATUS_ONLINE || online_status > IM::BaseDefine::USER_STATUS_LEAVE) {
  46. log("HandleLoginReq, online status wrong: %u ", online_status);
  47. online_status = IM::BaseDefine::USER_STATUS_ONLINE;
  48. }
  49. m_client_version = msg.client_version();
  50. m_client_type = msg.client_type();
  51. m_online_status = online_status;
  52. log("HandleLoginReq, user_name=%s, status=%u, client_type=%u, client=%s, ",
  53. m_login_name.c_str(), online_status, m_client_type, m_client_version.c_str());
  54. CImUser* pImUser = CImUserManager::GetInstance()->GetImUserByLoginName(GetLoginName());
  55. if (!pImUser) {
  56. pImUser = new CImUser(GetLoginName());
  57. CImUserManager::GetInstance()->AddImUserByLoginName(GetLoginName(), pImUser);
  58. }
  59. pImUser->AddUnValidateMsgConn(this);
  60. CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);
  61. // continue to validate if the user is OK
  62. IM::Server::IMValidateReq msg2;
  63. msg2.set_user_name(msg.user_name());
  64. msg2.set_password(password);
  65. msg2.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());
  66. CImPdu pdu;
  67. pdu.SetPBMsg(&msg2);
  68. pdu.SetServiceId(SID_OTHER);
  69. pdu.SetCommandId(CID_OTHER_VALIDATE_REQ);
  70. pdu.SetSeqNum(pPdu->GetSeqNum());
  71. pDbConn->SendPdu(&pdu);
  72. }

0 人点赞