做转码服务的原型时,看了看MCU的实现,考虑到如果不做转码,可以将多路rtp流直接合成为一路rtmp流输出,这样就相当于实现了多人连麦,并将多人连麦的视频转发直播了,所以做了这个简单的原型实现!
DEMO只实现了接收一路rtp流,输出一路rtmp流!
同转码服务的类图设计:
基础库是ZLMediaKit,确实很方便!
直接上代码:
TranscoderTaskManager.h
可以使用linux的 nc 127.0.0.1 3500 进行客户端测试!
然后使用ffmpeg对接收到的端口进行rtp包的推流:
ffmpeg -re -i tuiliu_mp4.mp4 -vcodec libx264 -b:v 600k -s 480x320 -profile baseline -maxrate 600k -minrate 600k -g 20 -keyint_min 20 -sc_threshold 0 -an -f rtp rtp://11.12.112.42:52458
在ZLMediaKit的 ZLMediaKit/server/main.cpp 中启动TCP 3500端口的监听:
// Start the transcoder command server.
TranscoderTaskManager::getInstance().startTranscoderServer();
此结构体用来接收命令
/*
{"dest_ip":"11.12.112.10",
"dest_port":9000,
"socket_protocol":"udp",
"transport_protocol":"rtp",
"source_width":1080,
"source_height":1920,
"source_sps":"",
"source_pps":"",
"source_samplerate":2000,//kbps
"source_video_payloadtype":"rtp",
"source_video_codec":"h264",
"source_audio_codec":"aac",
"dest_video_codec":"h264",
"dest_audio_codec":"aac",
"dest_width":640,
"dest_height":480,
"dest_samplerate":800 }
*/
/**
 * State of one proxy/transcode task.
 *
 * Every member is protected; TranscoderSession fills the object in while
 * handling a command and TranscoderTaskManager stores it keyed by
 * `transactcode`, which is why both are declared as friends.
 *
 * All scalar members carry in-class initializers so a freshly constructed
 * task never exposes indeterminate values.
 */
class InputTaskInfo : public std::enable_shared_from_this<InputTaskInfo> {
    friend class TranscoderTaskManager;
    friend class TranscoderSession;

public:
    using Ptr = std::shared_ptr<InputTaskInfo>;

protected:
    // --- request description (presumably parsed from the JSON command) ---
    string dest_ip;
    string transactcode;            // key used when registering the task in TranscoderTaskManager
    string protocol;
    int dest_port = 0;              // for tcp
    int dest_audio_port = 0;
    int dest_video_port = 0;
    int socket_protocol = 0;        // 0: udp, 1: tcp
    string transport_protocol;
    int source_width = 0;
    int source_height = 0;
    int source_samplerate = 0;
    string source_video_payloadtype;
    string source_audio_payloadtype;
    string source_video_codec;
    string source_audio_codec;
    string source_sps;
    string source_pps;
    string dest_video_codec;
    string dest_audio_codec;
    int dest_width = 0;
    int dest_height = 0;
    int dest_samplerate = 0;

    // --- behaviour switches ---
    bool needTranscode = false;
    bool outputUseRTMP = false;
    bool outputNoAudio = false;
    bool bSrtp = false;
    string output_rtmp_live_name;

    // --- runtime state ---
    int proxy_recv_audio_port = 0;  // local port returned by the audio listener
    int proxy_recv_video_port = 0;  // local port returned by the video listener
    RcvUDPDataTask::Ptr rcvVideoUDPTask;
    RcvUDPDataTask::Ptr rcvAudioUDPTask;
    Timer::Ptr _muteAudiotimer;
    unsigned long lastTimeStamp = 0;
    unsigned long lastVideoTimeStamp = 0;
    unsigned long lastAudioTimeStamp = 0;
    Timer::Ptr _timer;
    MuteAudioMaker::Ptr _audioMaker;
    MultiMediaSourceMuxer::Ptr _mediaMuxer = nullptr;  // receives the frames written to the tracks
    std::shared_ptr<FrameMerger> _merger;
    AudioTrack::Ptr _audioTrack = nullptr;
    VideoTrack::Ptr _videoTrack = nullptr;
    void *_rtp_decoder = nullptr;   // NOTE(review): ownership of this raw pointer is not visible here
    BufferRaw::Ptr _buffer;
};
class TranscoderTaskManager : public std::enable_shared_from_this<TranscoderTaskManager>
{
public:
typedef std::shared_ptr<TranscoderTaskManager> Ptr;
static TranscoderTaskManager& getInstance() {
static TranscoderTaskManager taskManager;
return taskManager;
}
void startTranscoderServer();
void addTask(const string &transcode, const InputTaskInfo::Ptr &inputInfo);
void removeTask(const string &transcode) {
lock_guard<mutex> lck(_mtxTranscodeClient);
_userTranscoderClientInfoMap.erase(transcode);
}
InputTaskInfo::Ptr getTask(string &transcode);
void removeTask(string &transcode);
protected:
TranscoderTaskManager();
~TranscoderTaskManager();
private:
TcpServer::Ptr _transcoderSrv;
unordered_map<string, InputTaskInfo::Ptr> _userTranscoderClientInfoMap;
mutex _mtxTranscodeClient;
};
//////////// Transcoder configuration ///////////
namespace Transcoder {
#define TRANSCODER_FIELD "transcoder."
// INI key under which the command server's TCP port is stored.
const string kPort = TRANSCODER_FIELD "port";
// Stores 3500 under kPort; the callable is handed to onceToken as its
// first (construction-time) argument, nothing is registered for destruction.
onceToken token1([] {
    mINI::Instance()[kPort] = 3500;
}, nullptr);
} // namespace Transcoder
// Creates the TCP server object; it is not started until
// startTranscoderServer() is called.
TranscoderTaskManager::TranscoderTaskManager()
    : _transcoderSrv(std::make_shared<TcpServer>()) {}
// Nothing to release by hand: every member cleans up after itself.
TranscoderTaskManager::~TranscoderTaskManager() = default;
void TranscoderTaskManager::startTranscoderServer() {
uint16_t transcoderPort = mINI::Instance()[Transcoder::kPort];
_transcoderSrv->start<TranscoderSession>(transcoderPort);
}
// Stores `inputInfo` under `transcode`, replacing any task already
// registered with that key. The map mutex is held for the update.
//
// Original author's outline for this function (none of it happens here):
//   - create the TranscoderTask object
//   - create the receiving socket
//   - start the receive and transcode jobs
void TranscoderTaskManager::addTask(const string &transcode, const InputTaskInfo::Ptr &inputInfo) {
    lock_guard<mutex> guard(_mtxTranscodeClient);
    auto &slot = _userTranscoderClientInfoMap[transcode];
    slot = inputInfo;
}
// Returns the task registered under `transcode`, or an empty pointer when
// no such task exists.
//
// The lookup holds the same mutex as addTask()/removeTask(const string &):
// reading the unordered_map while another thread inserts or erases would be
// a data race. A single find() replaces the former find + operator[] pair.
InputTaskInfo::Ptr TranscoderTaskManager::getTask(string &transcode) {
    lock_guard<mutex> guard(_mtxTranscodeClient);
    auto it = _userTranscoderClientInfoMap.find(transcode);
    if (it == _userTranscoderClientInfoMap.end()) {
        return nullptr;
    }
    return it->second;
}
// Removes the task registered under `transcode`; a missing key is a no-op.
//
// The map is a value member, so it is accessed with `.` (the former `->`
// did not compile), and the erase is performed under the map mutex like the
// const-reference overload declared in the class.
void TranscoderTaskManager::removeTask(string &transcode) {
    lock_guard<mutex> guard(_mtxTranscodeClient);
    _userTranscoderClientInfoMap.erase(transcode);
}
TranscoderSession.h
代码语言:javascript复制 class TranscoderSession :
public TcpSession
{
public:
TranscoderSession(const Socket::Ptr &pSock);
virtual ~TranscoderSession();
////TcpSession override////
void onRecv(const Buffer::Ptr &pBuf) override;
void onError(const SockException &err) override;
void onManager() override;
private:
string _transcoder;
string _strRecvBuf;
Ticker _beatTicker;
string _strUserName;
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
};
TranscoderSession.cpp
/** Constant definitions: command codes carried in the JSON request **/
#define START_TRANSCODE_CMD "1001"
#define STOP_TRANSCODE_CMD "1002"
#define START_PROXY_CMD "2001"
#define STOP_PROXY_CMD "2002"
/** 函数 **/
// Wraps the accepted socket and logs the session's creation.
TranscoderSession::TranscoderSession(const Socket::Ptr &pSock)
    : TcpSession(pSock) {
    DebugP(this);
    // Greeting on connect is disabled:
    //send("hello.");
}
// Size in bytes of the fixed RTP header (no CSRC entries, no extension).
static const size_t kRtpFixedHeaderSize = 12;

// Reads the 32-bit big-endian timestamp stored in bytes 4..7 of an RTP header.
static uint32_t readRtpTimestamp(const uint8_t *hdr) {
    return (static_cast<uint32_t>(hdr[4]) << 24) |
           (static_cast<uint32_t>(hdr[5]) << 16) |
           (static_cast<uint32_t>(hdr[6]) << 8) |
           static_cast<uint32_t>(hdr[7]);
}

// Handles every message arriving on the command port.
//
// The received bytes are appended to _strRecvBuf and a JSON command is
// parsed. For START_PROXY_CMD the function
//   1. creates the RTMP muxer (playable as rtmp://127.0.0.1/live/chn_00),
//   2. creates the video/audio tracks and wires them to the muxer,
//   3. starts one UDP listener per medium whose callback strips the fixed
//      RTP header and feeds the payload to the matching track,
//   4. registers the task and replies with the two listening ports.
//
// Changes relative to the published snippet:
//   - `buf->data() 12` lost its `+` during extraction; restored.
//   - Packets no longer than the fixed header are dropped; previously
//     `size() - 12` could wrap around for short datagrams.
//   - The callbacks hold a weak_ptr to the task. The former by-value capture
//     was a strong shared_ptr stored inside a socket owned by the task
//     itself, i.e. an ownership cycle that kept the task and its sockets
//     alive after removeTask().
//   - Tracks are created before the listeners start so a callback can never
//     observe a null track.
//   - Unused locals (payload type, marker bit, weakSelf) were removed.
//
// NOTE(review): strValue, value, inputInfo and retJson are not declared in
// the visible code; they belong to the JSON handling the article elided.
// NOTE(review): the raw RTP timestamp is forwarded as dts/pts and the payload
// is forwarded as-is after a fixed 12-byte header; confirm the time base and
// payload format the tracks expect.
void TranscoderSession::onRecv(const Buffer::Ptr &buf) {
    //DebugL << hexdump(buf->data(), buf->size());
    _beatTicker.resetTime();
    _strRecvBuf.append(buf->data(), buf->size());

    Json::Reader reader;
    Json::Value root;
    if (reader.parse(strValue, root)) {
        // ... JSON field extraction omitted in the original article ...
        if (value.compare(START_PROXY_CMD) == 0) {
            const std::weak_ptr<InputTaskInfo> weakInfo = inputInfo;

            if (!inputInfo->_mediaMuxer) {
                // Play with rtmp://127.0.0.1/live/chn_00
                inputInfo->_mediaMuxer.reset(new MultiMediaSourceMuxer(DEFAULT_VHOST, "live", "chn_00", 0, true, true, false, false));
            }

            // Video track: register it with the muxer and forward its frames there.
            inputInfo->_videoTrack = std::make_shared<H264Track>();
            inputInfo->_mediaMuxer->addTrack(inputInfo->_videoTrack);
            inputInfo->_videoTrack->addDelegate(inputInfo->_mediaMuxer);
            // Used to merge RTP packets.
            inputInfo->_merger = std::make_shared<FrameMerger>();

            // Audio track: same wiring as video.
            inputInfo->_audioTrack = std::make_shared<AACTrack>();
            inputInfo->_mediaMuxer->addTrack(inputInfo->_audioTrack);
            inputInfo->_audioTrack->addDelegate(inputInfo->_mediaMuxer);

            // Video listener: hand each RTP payload to the video track.
            inputInfo->rcvVideoUDPTask = make_shared<RcvUDPDataTask>();
            inputInfo->proxy_recv_video_port = inputInfo->rcvVideoUDPTask->startListener(
                [weakInfo](const Buffer::Ptr &pkt, struct sockaddr *addr, int len) {
                    auto info = weakInfo.lock();
                    if (!info || !info->_videoTrack || pkt->size() <= kRtpFixedHeaderSize) {
                        return;
                    }
                    uint32_t timestamp = readRtpTimestamp(reinterpret_cast<const uint8_t *>(pkt->data()));
                    auto frame = std::make_shared<H264FrameNoCacheAble>(
                        (char *)(pkt->data() + kRtpFixedHeaderSize),
                        pkt->size() - kRtpFixedHeaderSize, timestamp, timestamp, 0);
                    // Forward the received data to the muxer through the track.
                    info->_videoTrack->inputFrame(frame);
                });

            // Audio listener: hand each RTP payload to the audio track.
            inputInfo->rcvAudioUDPTask = make_shared<RcvUDPDataTask>();
            inputInfo->proxy_recv_audio_port = inputInfo->rcvAudioUDPTask->startListener(
                [weakInfo](const Buffer::Ptr &pkt, struct sockaddr *addr, int len) {
                    auto info = weakInfo.lock();
                    if (!info || !info->_audioTrack || pkt->size() <= kRtpFixedHeaderSize) {
                        return;
                    }
                    uint32_t timestamp = readRtpTimestamp(reinterpret_cast<const uint8_t *>(pkt->data()));
                    auto frame = std::make_shared<AACFrameNoCacheAble>(
                        (char *)(pkt->data() + kRtpFixedHeaderSize),
                        pkt->size() - kRtpFixedHeaderSize, timestamp, timestamp);
                    info->_audioTrack->inputFrame(frame);
                    info->_timer.reset();
                });

            retJson["proxy_recv_video_port"] = inputInfo->proxy_recv_video_port;
            retJson["proxy_recv_audio_port"] = inputInfo->proxy_recv_audio_port;
            TranscoderTaskManager::getInstance().addTask(inputInfo->transactcode, inputInfo);

            // Return the video and audio receive ports to the client.
            std::string out = retJson.toStyledString();
            send(out);
        }
    }
}
// Logs the teardown and unregisters this session's task.
//
// The published snippet passed `_transactcode`, which TranscoderSession does
// not declare; `_transcoder` is the string member the class actually has.
// NOTE(review): nothing in the visible code assigns _transcoder - confirm
// that the command handling stores the task key (InputTaskInfo::transactcode)
// in it, otherwise this call removes nothing.
TranscoderSession::~TranscoderSession() {
    DebugP(this);
    TranscoderTaskManager::getInstance().removeTask(_transcoder);
}
// Connection-level error callback: the error text is logged as a warning,
// no other action is taken here.
void TranscoderSession::onError(const SockException &err)
{
    WarnP(this) << err.what();
}
// Session management hook. Intended for session timeout handling, which is
// not implemented: the body is deliberately empty.
void TranscoderSession::onManager()
{
}
代码语言:javascript复制class RcvUDPDataTask : public std::enable_shared_from_this<RcvUDPDataTask>
{
public:
//接收数据回调
typedef function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)> onReadCB;
enum MediaType {VIDEO = 0, AUDIO};
typedef std::shared_ptr<RcvUDPDataTask> Ptr;
public:
RcvUDPDataTask();
virtual ~RcvUDPDataTask();
int startListener(string peerAddr, int peerPort);
int startListener(onReadCB cb);
int stopListener();
private:
//RTP端口,trackid idx 为数组下标
Socket::Ptr _rcvSock;
Socket::Ptr _sendSock;
uint64_t _ui64TotalBytes = 0;
MediaType _mediaType;
};
// Creates the send and receive socket objects (in that order), each with a
// null first argument and `false` as the second one. Nothing is bound yet.
RcvUDPDataTask::RcvUDPDataTask() {
    _sendSock = std::make_shared<Socket>(nullptr, false);
    _rcvSock = std::make_shared<Socket>(nullptr, false);
}
// Closes the receive socket. The return value is always 0.
int RcvUDPDataTask::stopListener() {
    _rcvSock->closeSock();
    return 0;
}
// Starts receiving datagrams on a system-chosen UDP port of 0.0.0.0.
//
// @param cb  invoked for every received packet
// @return    the local port the socket was bound to, or -1 if binding failed
//
// The 4 MiB receive buffer is applied right after a successful bind. The
// previous version deferred it to scope exit, so it also ran on the failure
// path, against a socket that was never bound.
int RcvUDPDataTask::startListener(onReadCB cb) {
    // Every received packet goes straight to cb.
    _rcvSock->setOnRead(cb);
    // Socket errors are deliberately ignored (best effort receiver).
    _rcvSock->setOnErr([](const SockException &) {});

    if (!_rcvSock->bindUdpSock(0, "0.0.0.0")) {
        return -1;
    }

    SockUtil::setRecvBuf(_rcvSock->rawFD(), 4 * 1024 * 1024);
    //SockUtil::setSendBuf(_sendSock->rawFD(), 4 * 1024 * 1024);
    return _rcvSock->get_local_port();
}