SRS是一个用C 开发的开源流媒体集群服务, 能够提供直播点播的功能. github链接: https://github.com/ossrs/srs, 官方架构图如下(3.0版本):
在src/main/srs_main_server.cpp中可以找到入口函数:
代码语言:javascript复制int main(int argc, char** argv) {
srs_error_t err = do_main(argc, argv);
...
}
然后是配置适配(兼容旧的配置):
代码语言:javascript复制// 解析参数, 读取配置文件, 将旧版本的配置转为新的配置
if ((err = _srs_config->parse_options(argc, argv)) != srs_success) {
return srs_error_wrap(err, "config parse options");
}
初始化日志:
代码语言:javascript复制srs_error_t SrsFastLog::initialize()
{
if (_srs_config) {
_srs_config->subscribe(this);
...
}
这里有一个配置变动订阅操作_srs_config->subscribe, 该函数接收基类为ISrsReloadHandler的对象, 在配置有变动时, 会遍历所有订阅了的对象, 调用对应的实现了的方法, 比如日志实现了on_reload_log_level方法, 在日志配置有变动时会调用.
然后是注册部分接口(版本信息/RTMP转FLV/静态页入口(如配置)/FLV/MP4拉流入口):
代码语言:javascript复制// 注册接口
if ((err = http_server->initialize()) != srs_success) {
return srs_error_wrap(err, "http server initialize");
}
用C后台常用的fork/kill后,到了主逻辑run_master:
代码语言:javascript复制srs_error_t run_master(SrsServer* svr)
{
srs_error_t err = srs_success;
// 初始化协程功能
if ((err = svr->initialize_st()) != srs_success) {
return srs_error_wrap(err, "initialize st");
}
// 初始化信号相关配置(创建信号管道)
if ((err = svr->initialize_signal()) != srs_success) {
return srs_error_wrap(err, "initialize signal");
}
// 获取句柄文件锁(防重复起进程)
if ((err = svr->acquire_pid_file()) != srs_success) {
return srs_error_wrap(err, "acquire pid file");
}
// 接口监听及处理
if ((err = svr->listen()) != srs_success) {
return srs_error_wrap(err, "listen");
}
// 注册信号
if ((err = svr->register_signal()) != srs_success) {
return srs_error_wrap(err, "register signal");
}
// 注册Http相关回调
if ((err = svr->http_handle()) != srs_success) {
return srs_error_wrap(err, "http handle");
}
// 注册MPEG提取(如需要)
if ((err = svr->ingest()) != srs_success) {
return srs_error_wrap(err, "ingest");
}
// loop: 判断信号/心跳/更新状态统计等
if ((err = svr->cycle()) != srs_success) {
return srs_error_wrap(err, "main cycle");
}
return err;
}
SRS是单进程的运行方式,使用协程来处理并发请求, 主逻辑在svr->listen里边:
代码语言:javascript复制srs_error_t SrsServer::listen()
{
srs_error_t err = srs_success;
// 监听RTMP
if ((err = listen_rtmp()) != srs_success) {
return srs_error_wrap(err, "rtmp listen");
}
// 监听Http的API
if ((err = listen_http_api()) != srs_success) {
return srs_error_wrap(err, "http api listen");
}
// 监听Http的拉流请求(HDS/HLS/DASH)
if ((err = listen_http_stream()) != srs_success) {
return srs_error_wrap(err, "http stream listen");
}
// 监听MPGE-TS/RTSP/FLV推流请求
if ((err = listen_stream_caster()) != srs_success) {
return srs_error_wrap(err, "stream caster listen");
}
// 定期清理待释放的连接
if ((err = conn_manager->start()) != srs_success) {
return srs_error_wrap(err, "connection manager");
}
return err;
}
这里边会分成四块来做监听:
1.rtmp: 推流拉流的主逻辑
2.http api: 查询/重载类请求
3.http stream: 自适性串流拉流逻辑
4.stream caster: 非rtmp格式推流逻辑
具体的逻辑如下:
主要看rtmp这块的, 层层跟进, 可以到SrsTcpListener:listen这个函数:
代码语言:javascript复制srs_error_t SrsTcpListener::listen()
{
srs_error_t err = srs_success;
// 监听端口
if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {
return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
}
srs_freep(trd);
// 注册协程, 以下是SRS里边通用的框架处理逻辑
trd = new SrsSTCoroutine("tcp", this);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start coroutine");
}
return err;
}
这是SRS里边很常见的模式, 用SrsSTCoroutine来注册协程, 然后start, start内部会判断是否重复调用,然后又会回调到本身的cycle函数, 所以只需要看cycle的实现即可:
代码语言:javascript复制srs_error_t SrsTcpListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
...
if ((err = handler->on_tcp_client(fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
}
return err;
}
cycle也是很模块化的一种实现, 都是在while循环里边, 用pull函数来判断是否出现错误了, 然后执行收包处理逻辑. 部分类的实现可能会调用do_cycle, 逻辑也和上述类似.
继续往下, 从SrsBufferListener:on_tcp_client到SrsServer:accept_client再到SrsConnection:start, 和上边一样, 直接看SrsConnection:cycle, 再到SrsRtmpConn:do_cycle进入rtmp的主逻辑:
代码语言:javascript复制srs_error_t SrsRtmpConn::do_cycle()
{
...
// 握手
if ((err = rtmp->handshake()) != srs_success) {
return srs_error_wrap(err, "rtmp handshake");
}
...
// 读取请求包, 解析内容
SrsRequest* req = info->req;
if ((err = rtmp->connect_app(req)) != srs_success) {
return srs_error_wrap(err, "rtmp connect tcUrl");
}
...
// 拉收推流或拉流请求
if ((err = service_cycle()) != srs_success) {
err = srs_error_wrap(err, "service cycle");
}
...
}
先是握手(接收C0C1, 回S0S1S2, 接收C2), 设置窗口/带宽信息, 然后进入SrsRtmpConn:stream_service_cycle, 在做完请求包解析/权限校验后, 进入类型判断:
代码语言:javascript复制switch (info->type) {
case SrsRtmpConnPlay: {
// response connection start play
// 按协议依次发送相应的包给对端
if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start play");
}
// 调用开始播放的Hook
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}
// 播放
err = playing(source);
// 调用播放结束的Hook
http_hooks_on_stop();
return err;
}
case SrsRtmpConnFMLEPublish: {
// 按标准的流程进行推流前的交互
if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start FMLE publish");
}
// 接收推流
return publishing(source);
}
case SrsRtmpConnHaivisionPublish: {
if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start HAIVISION publish");
}
return publishing(source);
}
case SrsRtmpConnFlashPublish: {
if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start FLASH publish");
}
return publishing(source);
}
default: {
return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
}
}
这里有三种推流和一种拉流, 这里先看推流:
1.start_xxx_publish是按协议格式做接收推流前的交互, FMLE/Haivision/Flash的都不一样
2.publishing则是实际的接收推流的逻辑
代码语言:javascript复制// 执行配置的Hook
if ((err = http_hooks_on_publish()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on publish");
}
// 接收推流前的准备逻辑, 还比较多, 比如创建目录文件/挂载到Http服务里/准备FFMPEG解码等
if ((err = acquire_publish(source)) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
// 接收推流的主要逻辑
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd);
rtrd.stop();
}
acquire_publish里边会执行流挂载逻辑, 以便http stream请求能够访问到, do_publish是实际接收推流的逻辑, 可以跟进到SrsRtmpConn:process_publish_message里边:
代码语言:javascript复制srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
...
// process audio packet
if (msg->header.is_audio()) {
if ((err = source->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "rtmp: consume audio");
}
return err;
}
// process video packet
if (msg->header.is_video()) {
if ((err = source->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "rtmp: consume video");
}
return err;
}
根据音频或视频分别处理, 处理逻辑类似, 这里看视频的处理on_video:
代码语言:javascript复制if (!mix_correct) {
return on_video_imp(&msg);
}
// insert msg to the queue.
mix_queue->push(msg.copy());
// fetch someone from mix queue.
SrsSharedPtrMessage* m = mix_queue->pop();
if (!m) {
return err;
}
// consume the monotonically increase message.
if (m->is_audio()) {
err = on_audio_imp(m);
} else {
err = on_video_imp(m);
}
可以看到, 如果没有配置mix_correct, 则直接处理分发了, 如果配置了的话, 则会扔到队列里, 然后再取出时间最早的处理.
代码语言:javascript复制// pure video
if (nb_videos >= SRS_MIX_CORRECT_PURE_AV && nb_audios == 0) {
mix_ok = true;
}
// pure audio
if (nb_audios >= SRS_MIX_CORRECT_PURE_AV && nb_videos == 0) {
mix_ok = true;
}
// got 1 video and 1 audio, mix ok.
if (nb_videos >= 1 && nb_audios >= 1) {
mix_ok = true;
}
if (!mix_ok) {
return NULL;
}
这个队列只有纯视频或纯音频或同时有视频和音频时,才能取到数据, 取出时由于map的特性, 会按时间戳排序取出. 最后就是on_video_imp:
代码语言:javascript复制// Copy to hub to all utilities.
// 与音频不同, 这里是hub先处理(1. 解包, 2. 写hls/dash/dvr)
if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
return srs_error_wrap(err, "hub consume video");
}
// copy to all consumer
// 塞到所有消费者队列里
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i ) {
SrsConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume video");
}
}
}
主要是遍历这个流下的所有消费者, 把流数据放到对应的队列里, 消费者就能看到直播了. 至于消费者列表如何来的, 前边的图也有列出:
1.通过rtmp的拉流接口来访问
2.通过http stream接口来访问
接下来看拉流的分支:
1.start_play是按RTMP协议的向请求方做确认拉流前的交互
2.http_hooks_on_play根据配置调用相应的开始时的hook
3.playing是拉流的主逻辑
4.http_hooks_on_stop根据配置调用相应的结束时的hook
playing的主逻辑如下:
代码语言:javascript复制// 为这个源创建一个消费者, 后续会遍历源下的消费者,进行推送
if ((err = source->create_consumer(this, consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
// 单独建一个接收队列来接收对端的控制请求
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
}
// Deliver packets to peer.
wakable = consumer;
err = do_playing(source, consumer, &trd);
首先往流里边挂个新的消费者, 然后单独创建队列来接收控制类消息(如暂停/开启/结束, 说性能能够提升33%), 主协程主要是不停地从队列里取控制消息或者数据包, 做相应的处理或发送给请求方:
代码语言:javascript复制while (true) {
...
// 处理控制命令请求, 如关闭/暂停
while (!rtrd->empty()) {
SrsCommonMessage* msg = rtrd->pump();
if ((err = process_play_control_msg(consumer, msg)) != srs_success) {
return srs_error_wrap(err, "rtmp: play control message");
}
}
...
// 从队列里边将数据包取出
int count = (send_min_interval > 0)? 1 : 0;
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
return srs_error_wrap(err, "rtmp: consumer dump packets");
}
...
// 发送数据包给对端
if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: send %d messages", count);
}
至此, RTMP的推流和拉流流程, 已经都完成了.
其余几个入口的功能:
1. Http Api监听的是查询类的请求, 也有console下的文件拉取请求
2. Http Stream监听的是自适应串流的请求, 主要是在流下添加消费者来实现.
3. Http Caster监听的是非RTMP的推流请求, 通过格式转换后, 发送给自身的RTMP监听端口来实现.