作者:陈伟 导语: 关于SPP的解读已经很多,本文尝试从另外的角度解读SRF&SPP的源码。本文所涉及SRF代码皆以3.1.8版本,SPP代码皆以3.0.1版本为准。
一个请求包的处理流程
从SRF生成的Server.cpp中我们可以看到,框架帮我们实现了SPP的5个入口函数。
- spp_handle_init
- spp_handle_input
- spp_handle_route
- spp_handle_process
- spp_handle_fini
接下来深入到源码看看框架都帮我们做了什么事情。
代码语言:javascript复制extern "C" int spp_handle_init(void* arg1, void* arg2)
{
const char * etc = (const char*)arg1;
CServerBase* base = (CServerBase*)arg2;
//配置文件
ServerConfig::ConfigFile = string(etc);
//配置路径
ServerConfig::ConfigPath = TC_File::extractFilePath(etc);
//应用名
ServerConfig::Application = SRF_APP;
//服务名
ServerConfig::ServerName = SRF_SERVER_NAME;
base->log_.LOG_P_PID(LOG_DEBUG,
"spp_handle_init, config:%s, servertype:%dn",
etc, base->servertype());
if (base->servertype() == SERVER_TYPE_WORKER)
{
/* 初始化框架 */
int iRet = CSyncFrame::Instance()->InitFrame(base, 100000);
if (iRet < 0)
{
base->log_.LOG_P_PID(LOG_FATAL,
"Sync framework init failed, ret:%dn",
iRet);
return -1;
}
// SRF初始化
SRFMsg::SrfInitialize(ServerConfig::ConfigFile, base);
/* 业务自身初始化 */
SRFMsg::addServant<PointPushCenterImp>(ServerConfig::Application "."
ServerConfig::ServerName
".PointPushCenterObj");
}
return 0;
}
在SPP的初始化函数中,我们只是看到对一系列的变量赋值和一些初始化函数的调用。proxy和worker进程都会调用这个初始化函数,其中worker进程的调用做了些特殊处理。接下来我们主要看下worker进程所做的一系列初始化调用。
框架初始化
CSyncFrame::Instance()->InitFrame(base, 100000); 我们可以从SPP源码中看到这个方法的实现,这其中比较重要的是下面几条语句:
代码语言:javascript复制CAsyncFrame::Instance()->InitFrame2(_pServBase); //初始化框架需要用的管理资源
bool rc = MtFrame::Instance()->InitFrame(&s_log, max_thread_num); //初始化微线程库
_iNtfyFd = SppShmNotify(_iGroupId*2); //通过groupid侦听命名管道
MtFrame::sleep(0); //微线程主动让出执行权
这里每个函数又都做了一些更底层的操作,有兴趣的同学可以去追踪一下代码看实际都做了些什么事情。
数据包完整性检查
spp_handle_input函数主要由proxy调用来检查数据包完整性,SRF框架主要做的就是对SRFMsg结构体进行检查。其中主要的代码只是一句:
代码语言:javascript复制int ret = SRFMsg::input(blob->data, blob->len);
这个方法由SRF框架实现,我们来看看做了什么事情。
代码语言:javascript复制int SRFMsg::input(const char *data, size_t len)
{
return _proto->input(data, len);
}
这个方法主要调用了StrProtocol这个类的input方法:
代码语言:javascript复制virtual int input(const char *data, size_t len, int &packlen)
{
if(!isStrProtocol(data, len))
{
return JceProtocol::input(data, len, packlen);
}
char *pos = (char *)memchr(data, 'n', len);
if(pos == NULL)
{
return PACKET_LESS;
}
packlen = pos - data 1;
return PACKET_FULL;
}
这个方法中如果不是字符串协议直接交给了JceProtocol这个类来检查;如果是字符串协议则直接检查n字符。
JceProtocol类中:
代码语言:javascript复制virtual int input(const char *data, size_t len, int &packlen)
{
if (len < sizeof(uint32_t))
{
return PACKET_LESS;
}
// 数据包长
uint32_t iHeaderLen = 0;
::memcpy(&iHeaderLen, data, sizeof(uint32_t));
iHeaderLen = ntohl(iHeaderLen);
// 限制包长
if (iHeaderLen < sizeof(uint32_t) ||
iHeaderLen > Message::MAX_SRF_REQ_BUF)
{
return PACKET_ERR;
}
if (len < iHeaderLen)
{
return PACKET_LESS;
}
packlen = static_cast<int>(iHeaderLen);
return PACKET_FULL;
}
其中也只是对数据包的长度进行了一些基本的检查而已。
数据包路由
spp_handle_route方法实现了数据包路由,其实现在SRF中非常简单:
代码语言:javascript复制extern "C" int spp_handle_route(unsigned flow, void* arg1, void* arg2)
{
return 1;
}
这里的返回是我们建的worker组id,其实也就是proxy与worker通信所使用的命名管道的key。一般在SRF服务中我们也都只建一个worker组,其id为1。
数据包处理
spp_handle_process方法是worker进程所调用的消息处理函数:
代码语言:javascript复制extern "C" int spp_handle_process(unsigned flow, void* arg1, void* arg2)
{
blob_type * blob = (blob_type*)arg1;
TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
CServerBase* base = (CServerBase*)arg2;
CTCommu * commu = (CTCommu*)blob->owner;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_process, %d, %d, %s, %sn",
flow,
blob->len,
inet_ntoa(*(struct in_addr*)&extinfo->remoteip_),
format_time(extinfo->recvtime_));
// SeverConfigure
if (ServerConfig::LocalIp.empty())
ServerConfig::LocalIp = inet_ntoa(*(struct in_addr*)&extinfo->localip_);
/* 简单的单发单收模型示例 */
SRFMsg *msg = new SRFMsg;
if (!msg) {
blob_type respblob;
respblob.data = NULL;
respblob.len = 0;
commu->sendto(flow, &respblob, NULL);
base->log_.LOG_P_PID(LOG_ERROR, "close conn, flow:%un", flow);
return -1;
}
/* 设置msg信息 */
msg->SetServerBase(base);
msg->SetTCommu(commu);
msg->SetFlow(flow);
// TODO: 配置化
msg->SetMsgTimeout(3000);
// 设置来源地址
struct sockaddr_in from_addr;
from_addr.sin_addr.s_addr = extinfo->remoteip_;
from_addr.sin_port = extinfo->remoteport_;
msg->SetFromAddr(from_addr);
// 设置本地地址
struct sockaddr_in local_addr;
local_addr.sin_addr.s_addr = extinfo->localip_;
local_addr.sin_port = extinfo->localport_;
msg->SetLocalAddr(local_addr);
// 设置收包时间
struct timeval time_rcv;
time_rcv.tv_sec = extinfo->recvtime_;
time_rcv.tv_usec = extinfo->tv_usec;
msg->SetRcvTimestamp(time_rcv);
// 微线程有独立空间,这里要拷贝一次报文
msg->SetReqPkg(blob->data, blob->len);
CSyncFrame::Instance()->Process(msg);
return 0;
}
其中最主要的一句就只是最后的那个Process调用,这个方法的实现在SPP源码中:
代码语言:javascript复制int CSyncFrame::Process(CSyncMsg *pMsg)
{
if (MtFrame::CreateThread(ThreadEntryFunc, pMsg) == NULL)
{
MT_ATTR_API(320837, 1); // 创建失败
SF_LOG(LOG_ERROR, "Sync frame start thread failed, error");
delete pMsg;
return -1;
}
return 0;
};
这个方法主要就是创建了一个微线程来处理一个CSyncMsg结构,其中SRFMsg是CSyncMsg结构的派生类。
这时候我们进微线程框架来看一看:
代码语言:javascript复制MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
{
MtFrame* mtframe = MtFrame::Instance();
MicroThread* thread = mtframe->AllocThread();
if (NULL == thread)
{
MTLOG_ERROR("create thread failed");
return NULL;
}
thread->SetSartFunc(entry, args);
if (runable) {
mtframe->InsertRunable(thread);
}
return thread;
}
MtFrame是一个全局共享的单例对象,这里只是申请了一个微线程资源去处理这个请求包,并以ThreadStart为入口函数去执行。这里并不一定是去新建一个微线程,我们去看下AllocThread的代码:
代码语言:javascript复制MicroThread* ThreadPool::AllocThread()
{
MT_ATTR_API_SET(492069, _total_num); // 微线程池大小
MicroThread* thread = NULL;
if (!_freelist.empty())
{
thread = _freelist.front();
_freelist.pop();
ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
thread->UnsetFlag(MicroThread::FREE_LIST);
_use_num ;
return thread;
}
MT_ATTR_API(320846, 1); // pool no nore
if (_total_num >= _max_num)
{
MT_ATTR_API(361140, 1); // no more quota
return NULL;
}
thread = new MicroThread();
if ((NULL == thread) || (false == thread->Initial()))
{
MT_ATTR_API(320847, 1); // pool init fail
MTLOG_ERROR("thread alloc failed, thread: %p", thread);
if (thread) delete thread;
return NULL;
}
_total_num ;
_use_num ;
return thread;
}
这里可以看到是尝试从_freelist中取一个微线程来使用,为空的话才去创建一个新的微线程。我们回到CreateThread函数,申请一个微线程资源后会把微线程放入可运行队列里面去调度执行。
接下来我们可以去看看ThreadEntryFunc函数的实现,其主要起作用的也只有一行代码:
代码语言:javascript复制rc = msg->HandleProcess();
这里执行的是SRFMsg类中的HandleProcess方法,这个方法的实现代码比较长这里就不贴了。主要的作用就是解包然后调用SrfServant类中的dispatch方法。当然还有一些支持其他调用方式的其他逻辑,这里就不再赘述了。这里的dispatch实现也很简单:
代码语言:javascript复制virtual int dispatch(SrfCurrentPtr current, vector<char> &buffer)
{
return onDispatch(current, buffer);
}
只是直接调用了onDispatch虚函数,框架生成代码中对这一方法进行了重写。该方法其实是对请求包所调用的接口进行分发。最终调用了我们在Imp类中所实现的逻辑代码。
业务终止
代码语言:javascript复制spp_handle_fini的代码很简单只是做一些对象的析构而已。
extern "C" void spp_handle_fini(void* arg1, void* arg2)
{
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P(LOG_DEBUG, "spp_handle_finin");
if ( base->servertype() == SERVER_TYPE_WORKER )
{
CSyncFrame::Instance()->Destroy();
}
}
至此一个请求包的处理流程我们已经理清楚了。
RPC调用流程
回想我们用一个SRF服务调用另一个SRF服务的流程。首先在Imp类里需要声明所要调服务的代理指针,然后在初始化函数里getChecked一下这个指针,之后就可以在逻辑代码里直接调用其它服务接口了。我们深入代码看一下这其中到底做了什么事情。
初始化
首先,我们看一下ProxyFactory类的getChecked方法:
代码语言:javascript复制template<typename T>
T& getChecked(const string &name, T &t, bool isCoroutine = false)
{
TC_LockT<TC_ThreadMutex> lock(*this);
typedef typename T::element_type TT;
/// 协程版本和非协程版本分开存储
map<int, map<string, SrfServantPrx> > &proxy = isCoroutine
? _co_proxy : _proxy;
int iTid = getTid();
map<int, map<string, SrfServantPrx> >::iterator itMapPrx = proxy.find(iTid);
if(itMapPrx == proxy.end() || itMapPrx->second.find(name) == itMapPrx->second.end())
{
t = new TT();
t->initialize(this, name, isCoroutine);
t->SetRecvBuffLen(_default_recv_buff_len);
t->SetSendBuffLen(_default_send_buff_len);
proxy[iTid][name] = t;
L_INFO("new proxy--->"<<name<<",isCoroutine:"<<isCoroutine<<endl);
}
t = (typename T::element_type*)((proxy[iTid][name]).get());
return t;
}
其参数前面是所要调用的服务名,第二个是服务代理指针,第三个是是否需要使用微线程标记。其作用其实是将相关调用信息保存在proxy中以供后面查询使用。
同步调用过程
我们在声明代理指针时,会先将对端的jce引入到自己的项目中生成其调用的代理类。我们所声明的代理对象中包含了所有对端服务所对外提供的接口,其实现主要都是调用了一个srf_invoke方法。其实现也很简单:
代码语言:javascript复制void SrfServantProxy::srf_invoke(char cPacketType,
const string& sFuncName,
const vector<char> &buf,
const map<string, string>& context,
const map<string, string>& status,
ResponsePacket& rsp)
{
assert(cPacketType == JCENORMAL);
#ifdef CORO_SRF
if (is_co_routine_)
{
co_srf_invoke(cPacketType, sFuncName, buf, context, status, rspbuf);
return ;
}
#endif
// 框架包请求编码
Message *msg = new Message(Message::SYNC_CALL);
msg->request.iVersion = JCEVERSION;
msg->request.cPacketType = cPacketType;
msg->request.sServantName = _sObjName;
msg->request.sFuncName = sFuncName;
msg->request.sBuffer = buf;
msg->request.context = context;
msg->request.status = status;
msg->iBeginTime = TNOWMS;
msg->request.status["AppName"] = ServerConfig::Application "."
ServerConfig::ServerName;
invoke_sync(msg);
rsp = msg->response;
delete msg;
}
可以看到这里主要调用了invoke_sync方法:
代码语言:javascript复制 void SrfServantProxy::invoke_sync(Message * msg)
{
ConnectionSync *pConn = selectConnSync();
if(pConn == NULL)
{
delete msg;
throw BaseException("failed to get active address connection for " _sObjName);
}
msg->connSync = pConn;
int iRet = _pProto->enRequest(msg->request, msg->pReqData, msg->iReqDatalen);
if(iRet != E_SUCC)
{
delete msg;
throw BaseException(_sObjName " encode failed:" TC_Common::tostr(iRet));
}
if(msg->eType != Message::SYNC_CALL)
{
delete msg;
throw BaseException(_sObjName " error SYNC_CALL:" TC_Common::tostr(iRet));
}
iRet = pConn->sendRecv(msg);
L_INFO("sendRecv return:" << iRet << endl);
if(msg->response.iRet != E_SUCC)
{
iRet = msg->response.iRet;
delete msg;
throw BaseException(pConn->toString() ":error=" etos((ErrorCode)iRet));
}
reportSync(msg);
}
其实现主要是从连接池中取出一个连接,然后组包发请求上报。其调用了封装过的sendRecv方法进行收发包。这是同步调用模式的RPC过程。
微线程调用过程
从上面srf_invoke方法可以看到如果是微线程版本则调用了co_srf_invoke,这个方法跟同步方法相比只是把收发包的接口换成了微线程的版本。使用了mt_tcpsendrcv和mt_udpsendrcv这两个方法。
微线程组件
前面两个章节已经涉及到了一些微线程的创建以及微线程的收发包接口,本节我们主要看一下微线程的实现代码。
我们先从涉及到的mt_tcpsendrcv方法实现看看:
代码语言:javascript复制int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
{
if (!dst || !pkg || !rcv_buf || !func)
{
MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p]",
dst, pkg, rcv_buf, func);
return -10;
}
int ret = 0, rc = 0;
int addr_len = sizeof(struct sockaddr_in);
utime64_t start_ms = MtFrame::Instance()->GetLastClock();
utime64_t cost_time = 0;
int time_left = timeout;
// 1. 获取TCP连接池对象,挂接通知对象
int sock = -1;
TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
if ((conn == NULL) || (sock < 0))
{
MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
ret = -1;
goto EXIT_LABEL;
}
// 2. 尝试检测或新建连接
rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
if (rc < 0)
{
MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
ret = -4;
goto EXIT_LABEL;
}
// 3. 发送数据处理
cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
rc = MtFrame::send(sock, pkg, len, 0, time_left);
if (rc < 0)
{
MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
ret = -2;
goto EXIT_LABEL;
}
// 4. 接收数据处理
cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
if (rc < 0)
{
MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
ret = rc;
goto EXIT_LABEL;
}
ret = 0;
EXIT_LABEL:
// 失败则强制释放连接,否则定时保活
if (conn != NULL)
{
ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
}
return ret;
}
这里主要是调用了MtFrame类中的send方法去发送数据,这个类是一个全局单例类。这里的send方法实现为:
代码语言:javascript复制ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
{
MtFrame* mtframe = MtFrame::Instance();
utime64_t start = mtframe->GetLastClock();
MicroThread* thread = mtframe->GetActiveThread();
utime64_t now = 0;
ssize_t n = 0;
size_t send_len = 0;
while (send_len < nbyte)
{
now = mtframe->GetLastClock();
if ((int)(now - start) > timeout)
{
errno = ETIME;
return -1;
}
mt_hook_syscall(send);
n = mt_real_func(send)(fd, (char*)buf send_len, nbyte - send_len, flags);
if (n < 0)
{
if (errno == EINTR) {
continue;
}
if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
MTLOG_ERROR("write failed, errno: %d", errno);
return -2;
}
}
else
{
send_len = n;
if (send_len >= nbyte) {
return nbyte;
}
}
EpollerObj epfd;
epfd.SetOsfd(fd);
epfd.EnableOutput();
epfd.SetOwnerThread(thread);
if (!mtframe->EpollSchedule(NULL, &epfd, timeout)) {
return -3;
}
}
return nbyte;
}
其中mt_hook_syscall是一个宏主要作用是利用dlsym获取动态链接库中的函数地址。其实现为:
代码语言:javascript复制#define mt_hook_syscall(name)
do {
if (!g_mt_syscall_tab.real_##name) {
g_mt_syscall_tab.real_##name = (func_##name)dlsym(RTLD_NEXT, #name);
}
} while (0)
微线程框架实现了系统原生的网络相关接口的hook版本,其hook版本也都是在进行网络IO的时候触发微线程切换来充分利用CPU进而提高吞吐量。
本文为国庆假期期间在家里闲暇时写的一点东西,有理解错误之处还望指正。