实现一个接收多路RTP流,输出一路RTMP流的简单MCU

2023-05-02 15:42:49 浏览数 (2)

做转码服务的原型时,看了看MCU的实现,考虑到如果不做转码,可以将多路rtp流直接合成为一路rtmp流输出,这样就相当于实现了多人连麦,并将多人连麦的视频转发直播了,所以做了这个简单的原型实现!

DEMO只实现了接收一路rtp流,输出一路rtmp流!

同转码服务的类图设计:

基础库是ZLMediaKit,确实很方便!

直接上代码:

TranscoderTaskManager.h

代码语言:javascript复制
可以使用linux的nc 127.0.0.1 3500 进行客户端测试!
然后使用ffmpeg对接收到的端口进行rtp包的推流:
 
ffmpeg -re -i tuiliu_mp4.mp4 -vcodec libx264 -b:v 600k -s 480x320 -profile baseline  -maxrate 600k -minrate 600k -g 20 -keyint_min 20  -sc_threshold 0 -an -f rtp rtp://11.12.112.42:52458

在ZLMediaKit的ZLMediaKitservermain.cpp中启动TCP 3500端口的监听:

//启动转码服务
TranscoderTaskManager::getInstance().startTranscoderServer();

此结构体用来接收命令
/*
{“dest_ip”:11.12.112.10,
“dest_port”:9000,
“socket_protocol”:”udp”,
“transport_protocol”:”rtp”,
“source_width”:1080,
“source_height”:1920,
"source_sps":"";
"source_pps":"";
“source_samplerate”:2000,//kbps
“source_video_payloadtype”:”rtp”,
“source_video_codec”:”h264”,
“source_audio_codec”:”aac”,
“dest_video_codec”:”h264”,
“dest_audio_codec”:”aac”,
“dest_width”:640,
“dest_height”:480,
“dest_samplerate”:800 }
*/
class InputTaskInfo : public std::enable_shared_from_this<InputTaskInfo> {
	friend class TranscoderTaskManager;
	friend class TranscoderSession;
public:
	typedef std::shared_ptr<InputTaskInfo> Ptr;
protected:
	string dest_ip;
	string transactcode;
	string protocol;
	int dest_port;//for tcp
	int dest_audio_port;
	int dest_video_port;
	int socket_protocol;//0:udp, 1:tcp
	string transport_protocol;
	int source_width;
	int source_height;
	int source_samplerate;
	string source_video_payloadtype;
	string source_audio_payloadtype;
	string source_video_codec;
	string source_audio_codec;
	string source_sps;
	string source_pps;

	string dest_video_codec;
	string dest_audio_codec;
	int dest_width;
	int dest_height;
	int dest_samplerate; 
	bool needTranscode;
	bool outputUseRTMP;
	bool outputNoAudio;
	bool bSrtp;
	string output_rtmp_live_name;
	int proxy_recv_audio_port;
	int proxy_recv_video_port;
	RcvUDPDataTask::Ptr rcvVideoUDPTask;
	RcvUDPDataTask::Ptr rcvAudioUDPTask;

	Timer::Ptr _muteAudiotimer;
	unsigned long lastTimeStamp;
	unsigned long lastVideoTimeStamp;
	unsigned long lastAudioTimeStamp;
	Timer::Ptr _timer;
	MuteAudioMaker::Ptr _audioMaker;
	MultiMediaSourceMuxer::Ptr _mediaMuxer = NULL;
	std::shared_ptr<FrameMerger> _merger;

	AudioTrack::Ptr _audioTrack = NULL;
	VideoTrack::Ptr _videoTrack = NULL;
	void *_rtp_decoder = nullptr;
	BufferRaw::Ptr _buffer;
};


class TranscoderTaskManager : public std::enable_shared_from_this<TranscoderTaskManager>
{
public:
	typedef std::shared_ptr<TranscoderTaskManager> Ptr;
	static TranscoderTaskManager& getInstance() {
		static TranscoderTaskManager taskManager;
		return taskManager;
	}

	void startTranscoderServer();


	void addTask(const string &transcode, const InputTaskInfo::Ptr &inputInfo);
	void removeTask(const string &transcode) {

		lock_guard<mutex> lck(_mtxTranscodeClient);
		_userTranscoderClientInfoMap.erase(transcode);
	}

	InputTaskInfo::Ptr getTask(string &transcode);
	void removeTask(string &transcode);
protected:
	TranscoderTaskManager();
	~TranscoderTaskManager();
	 
private:
	TcpServer::Ptr _transcoderSrv;
	unordered_map<string, InputTaskInfo::Ptr> _userTranscoderClientInfoMap;
	mutex _mtxTranscodeClient; 
};



////////////TRANSCODER 配置///////////
namespace Transcoder {
#define TRANSCODER_FIELD "transcoder."
	const string kPort = TRANSCODER_FIELD"port";
	onceToken token1([]() {
		mINI::Instance()[kPort] = 3500;
	}, nullptr);
} //namespace Shell

TranscoderTaskManager::TranscoderTaskManager():_transcoderSrv(new TcpServer())
{ 
}


TranscoderTaskManager::~TranscoderTaskManager()
{
}

void TranscoderTaskManager::startTranscoderServer() {

	uint16_t transcoderPort = mINI::Instance()[Transcoder::kPort];
	_transcoderSrv->start<TranscoderSession>(transcoderPort);
}

void TranscoderTaskManager::addTask(const string &transcode, const InputTaskInfo::Ptr &inputInfo) {
	//创建转码对象TranscoderTask

	//创建接收socket

	//开始监听接收任务和转码任务
	lock_guard<mutex> lck(_mtxTranscodeClient);
	_userTranscoderClientInfoMap[transcode] = inputInfo;

}

InputTaskInfo::Ptr TranscoderTaskManager::getTask(string &transcode) {
	if (_userTranscoderClientInfoMap.find(transcode) != _userTranscoderClientInfoMap.end()) {
		return _userTranscoderClientInfoMap[transcode];
	}
	return NULL;
}

void TranscoderTaskManager::removeTask(string &transcode) {

	_userTranscoderClientInfoMap->erase(transcode);
}

TranscoderSession.h

代码语言:javascript复制
	class TranscoderSession :
		public TcpSession
	{
	public:
		TranscoderSession(const Socket::Ptr &pSock);
		virtual ~TranscoderSession();

		////TcpSession override////
		void onRecv(const Buffer::Ptr &pBuf) override;
		void onError(const SockException &err) override;
		void onManager() override;

	private:
		string _transcoder;
		string _strRecvBuf;
		Ticker _beatTicker;
		string _strUserName;
		//消耗的总流量
		uint64_t _ui64TotalBytes = 0;
	};

TranscoderSession.cpp

代码语言:javascript复制
/** 常量定义 **/
#define START_TRANSCODE_CMD "1001"
#define STOP_TRANSCODE_CMD "1002"

#define START_PROXY_CMD "2001"
#define STOP_PROXY_CMD "2002"

/** 函数 **/
	TranscoderSession::TranscoderSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
		DebugP(this);
		//send("hello.");
	}

	void TranscoderSession::onRecv(const Buffer::Ptr&buf) {
		//DebugL << hexdump(buf->data(), buf->size());  
		_beatTicker.resetTime();
		
		//所有3500的输入消息会回调到这个方法:
		//使用json解析出命令START_PROXY_CMD, 然后启动一个UDP的接收任务:
		_strRecvBuf.append(buf->data(), buf->size());
		
		Json::Reader reader;
		Json::Value root;
		if (reader.parse(strValue, root))
		{
		        //..此处省略解析json字符串的代码
		        
			if (value.compare(START_PROXY_CMD) == 0) {
				const weak_ptr<TcpSession> weakSelf = shared_from_this();
				auto &weak1 = inputInfo;
				if (inputInfo->_mediaMuxer == NULL) {
				    //使用rtmp://127.0.0.1/live/chn_00 点播就可以了
			        inputInfo->_mediaMuxer.reset(new MultiMediaSourceMuxer(DEFAULT_VHOST, "live", "chn_00", 0, true, true, false, false));
				} 
				inputInfo->rcvVideoUDPTask = make_shared<RcvUDPDataTask>();

				inputInfo->proxy_recv_video_port = inputInfo->rcvVideoUDPTask->startListener([weakSelf, weak1](const Buffer::Ptr &buf, struct sockaddr *addr, int len) {
			            
				    uint8_t * data = (uint8_t *)buf->data();
				    uint8_t rtp_type = 0x7F & data[1];
				    uint8_t rtp_mark = 0x1 & data[2];
				    uint32_t timestamp = (((uint32_t)data[4]) << 24) | (((uint32_t)data[5]) << 16) | (((uint32_t)data[6]) << 8) | data[7];
			        auto frame = std::make_shared<H264FrameNoCacheAble>((char *)(buf->data()   12), buf->size() - 12, timestamp, timestamp, 0);
			        //这里就是把收到的rtp流转发给mediamuxer,用于混合成rtmp流
			        weak1->_videoTrack->inputFrame(frame);	
				});
				
				inputInfo->_videoTrack = std::make_shared<H264Track>();
				//添加视频
				inputInfo->_mediaMuxer->addTrack(inputInfo->_videoTrack);
				//视频数据写入_mediaMuxer
				inputInfo->_videoTrack->addDelegate(inputInfo->_mediaMuxer);
				//用来合并rtp包
				inputInfo->_merger = std::make_shared<FrameMerger>();
				
				
				inputInfo->rcvAudioUDPTask = make_shared<RcvUDPDataTask>();

				inputInfo->proxy_recv_audio_port = inputInfo->rcvAudioUDPTask->startListener([weakSelf, weak1](const Buffer::Ptr &buf, struct sockaddr *addr, int len) {
					//  
					uint8_t * data = (uint8_t *)buf->data();
					uint8_t rtp_type = 0x7F & data[1];
					uint8_t rtp_mark = 0x1 & data[2];
					uint32_t timestamp = (((uint32_t)data[4]) << 24) | (((uint32_t)data[5]) << 16) | (((uint32_t)data[6]) << 8) | data[7];
					auto frame = std::make_shared<AACFrameNoCacheAble>((char *)(buf->data()   12), buf->size() - 12, timestamp, timestamp); 
					weak1->_audioTrack->inputFrame(frame);
					weak1->_timer.reset();
				}
				);

				inputInfo->_audioTrack = std::make_shared<AACTrack>();
				//添加音频
				inputInfo->_mediaMuxer->addTrack(inputInfo->_audioTrack);
				inputInfo->_audioTrack->addDelegate(inputInfo->_mediaMuxer); 
				
				retJson["proxy_recv_video_port"] = inputInfo->proxy_recv_video_port;
				retJson["proxy_recv_audio_port"] = inputInfo->proxy_recv_audio_port;
				
				TranscoderTaskManager::getInstance().addTask(inputInfo->transactcode, inputInfo);
				//将接收video和audio的端口返回给客户端				
				std::string out = retJson.toStyledString();
				send(out);
			}
					
		}
			
	}	
	
	TranscoderSession::~TranscoderSession()
	{
		DebugP(this);
		TranscoderTaskManager::getInstance().removeTask(_transactcode);
	}

	void TranscoderSession::onError(const SockException &err) {
		WarnP(this) << err.what();
	}

	void TranscoderSession::onManager() {
		//session 超时管理

	}
代码语言:javascript复制
class RcvUDPDataTask : public std::enable_shared_from_this<RcvUDPDataTask>
{
public:
	//接收数据回调
	typedef function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)> onReadCB;
	enum MediaType {VIDEO = 0, AUDIO};
	typedef std::shared_ptr<RcvUDPDataTask> Ptr;
public:
	RcvUDPDataTask();
	virtual ~RcvUDPDataTask();
	int startListener(string peerAddr, int peerPort);
	int startListener(onReadCB cb);
	int stopListener();
private:
	//RTP端口,trackid idx 为数组下标
	Socket::Ptr _rcvSock;
	Socket::Ptr _sendSock;
	uint64_t _ui64TotalBytes = 0;
	MediaType _mediaType;
};


RcvUDPDataTask::RcvUDPDataTask()
{ 
	_sendSock.reset(new Socket(nullptr, false));
	_rcvSock.reset(new Socket(nullptr, false));
} 

int RcvUDPDataTask::stopListener()
{ 
	_rcvSock->closeSock();
	return 0;
}
int RcvUDPDataTask::startListener(onReadCB cb) {
	//设置接收socket
	onceToken token(nullptr, [&]() {
		SockUtil::setRecvBuf(_rcvSock->rawFD(), 4 * 1024 * 1024);
		//SockUtil::setSendBuf(_sendSock->rawFD(), 4 * 1024 * 1024);
	});
 
        //所有收到的包直接回调到cb方法
	_rcvSock->setOnRead(cb);

	_rcvSock->setOnErr([this](const SockException &err) { });
	if (!_rcvSock->bindUdpSock(0, "0.0.0.0")) {
		return -1;
	}
	return _rcvSock->get_local_port();
}

0 人点赞