Go
语言本身具备出色的性能,然而在流媒体服务器这种CPU
密集 IO
密集的双重压力下,GC
带来的性能损失是最主要的矛盾。而减少GC
的操作最直接的办法就是减少内存申请,多多复用内存。本文将围绕内存复用这个主题,把M7S
中相关技术原理讲解一遍,也是M7S
性能优化的历程。
读写内存共享
在早期我在研究过许多流媒体服务器的数据转发模式,基本都是在发送给订阅者时将内存复制一份的方式实现读写分离,虽然没有并发问题,但是内存频繁的申请和复制比较消耗资源。
在M7S v1版本中,也沿用了传统的方式。然而Go语言由于采用GC的方式管理内存,导致频繁申请内存会加大GC的压力。
在网友的启发下,从v2
版本开始,采用了基于RingBuffer
的内存共享读写方式。大大减少了内存复制。
在
Monibuca
中每一个流(Stream
)对象包含多个Track
(分为音视频Track和DataTrack)每个Track
包含一个RingBuffer
。发布者将数据填入这个RingBuffer
中,订阅者则从RingBuffer
中读取数据再封装到协议中发送出去,形成转发的核心逻辑。
下面的视频是当时开发的一个UI
,实时获取RingBuffer
的信息用SVG
绘制而成。其中发布者正在不断写入数据,订阅者紧随其后不断读取数据。
由于发布者以及订阅者不在同一个协程中,访问同一个块内存很有可能引起并发读写的问题。如何解决并发读写呢?M7S
经过不断的迭代在这块上面实践了各种方法。既要考虑到性能,还要考虑到代码的可读性和可维护性。
sync.RWMutex
这是最容易想到的,在M7S v2
中就采用了读写锁。操作步骤如下:
- 先锁住Ring中的下一个待写入单元,再将本次写完的单元释放写锁。
- 在本读写单元中等待读取的订阅者在写锁释放的同时获取到读锁,开始读取数据
有点类似人走路的方式,前脚着地后,后脚再离地。可以保证订阅者无法跑到发布者前面。
优点是可读性很强,一眼就能看懂这个原理。缺点是,锁的开销比较大,性能损失很明显。还有一个缺点,就是当订阅者阻塞,会导致发布者追上订阅者,写锁无法获取从而阻塞整个流。(后来Go出了TryLock)
WaitGroup
v3
中采用了这个,但是WaitGroup
的Wait
操作是一个无限阻塞的操作,必须用Done
操作才能结束等待,此时就会有一个问题,engine
和发布者有可能会同时去调用Done
完成释放(具体原因另开章节介绍)。因此Done
就会多调用一次导致panic
。后来通过复杂的原子操作解决了(但是大大降低了代码的可读性)。
time.Sleep
v4
中采用了伪自旋锁,所谓的伪自旋锁,就是模仿自旋锁的机制,只是用time.Sleep
代替了,runtime.Gosched
,减少了自旋次数,从而提高性能。
for r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead; r.Frame.wait() {
}
CanRead不需要原子操作,有人担心可能会有并发读写问题,其原理同前面说的人走路是一样的,即便出现了并发读写,也不影响逻辑正确运行。最多就是多等待一个周期,稍微增加一点点延迟。
sync.Cond
在v1
版本中由于使用的是简单的内存复制,于是有人给了这个方案,但是我却一直绕了一大圈,最后回到这个方案上了,也算是自作聪明。sync.Cond
之所以一开始没有选择,是因为里面包含了一个锁(标准库内部强制调用了锁)
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
所以就认为性能不高,直到绕了一大圈之后,才找到一个避免锁的方案。当然这些弯路可能必须要走,因为直到自己写了伪自旋锁,才增加了一个是否可读的属性,也就是说有了这个属性后,我们其实只需要一个唤醒的功能即可,于是想到了给sync.Cond
提供一个空的锁对象的方式避免了锁:
type emptyLocker struct{}
func (emptyLocker) Lock() {}
func (emptyLocker) Unlock() {}
var EmptyLocker emptyLocker
sync.Cond
在唤醒协程的时候使用的是Broadcast
方法,这个方法可以多次调用而无副作用(不像WaitGroup
的Done
方法)。也可以减少伪自旋锁带来的轻微延迟。
实际测试中使用Cond比伪自旋锁大概可以节省10%左右的CPU消耗
协议转换中的内存复用
协议转换可以用下面的逻辑来实现:
实际情况比这个要复杂一些。所以这里面第一步需要引入go标准库中的net.Buffers来表示“连续的内存”(实际并不一定连续)。当收到一个协议传来的数据时尽量保留,而不去复制它。
同一个协议转发
对于相同的协议,能复用的内存更多一些,举个例子:
RTMP转发到RTMP
RTMP
中传输视频帧的格式为AVCC
格式,这也是能复用的部分,在实际传输过程中这部分内存并非一个连续内存。RTMP
有chunk
机制,会把AVCC
切割一块块传输,并加上chunk header
。
chunk header | avcc part1 | chunk header | avcc part2 ······
这个分割的大小默认是128字节,通常RTMP
协议会经过协商修改这个大小,因此传入和传出的分块大小不一定相同。那如何复用AVCC
的数据呢?此时我们需要用到net.Buffers
来表示一帧AVCC
数据。
| avcc part1 | avcc part2 ······
当我们需要另一种分块大小的数据时,可以对原始数据再分割。比如说原始数据是256字节分块的:
代码语言:javascript复制| 256Bytes | 256Bytes ······
而新的分块要求是128Bytes的
代码语言:javascript复制| 128Bytes | 128Bytes | 128Bytes | 128Bytes ······
我们并没有申请新的内存,只是多了一些切片。那有人就可能会问了,如果不是正好倍数关系呢?其实无非就是多切几块。比如新的分块要求是200Bytes:
代码语言:javascript复制| 200Bytes | 56Bytes| 144Bytes | 112Byts | 88Bytes ······
用下面的图更加直观:
这样发送的时候,并不是一个连续内存,那如何发送呢?这里就用到了writev
(windows对应的是WSASend
)技术。在Go语言中通过net.Buffers类型写入数据会自动判断使用的技术。
RTSP转发到RTSP
RTSP
协议传输的媒体数据是RTP
包,RTP
包在理想状态下,可以完全复用,就是直接把RTP
包缓存起来,等需要发送的时候直接把这个RTP
数据原封不动的发出去。在m7s
中,由于需要有跳帧追帧的逻辑,所以需要修改时间戳,就无法原封不动的发送RTP
包,但是也可以复用其中的Payload
部分。
HLS转发到HLS
在纯转发模式下,可以直接将TS
切片缓存,完全复用。如果需要将HLS
转换成其他协议,则需要将TS
格式数据进行解包处理。
FLV转发到FLV
FLV
格式由于数据格式也是avcc
格式,因此处理逻辑就按照avcc
格式统一处理了,FLV
的tag
头无法复用,涉及到时间戳需要重新生成。
不同协议转发
不同协议之间转发由于两两排列组合很多,因此需要抽象出大类来处理。
协议分类
RTMP、FLV、MP4
该类协议视频是AVCC
格式,音频是裸格式(RTMP
包含一到两个字节的头)
RTSP、WebRTC
该类的视频是RTP
(Header 裸NALU
) 音频是RTP
(Header AuHeaderLen
AuHeader
xN Au
xN )
HLS、GB28181
这类使用的MPEG2-TS
、MPEG2-PS
作为传输协议 视频采用Header AnnexB
音频采用Header ADTS
AAC
内存复用
总体而言,视频格式都是前缀 NALU
这种方式,AnnexB
的前缀是00 00 00 01
,而Avcc的前缀是 CTS
、 NALU
长度等,因此将NALU
缓存起来就可以复用NALU
数据。在实际实现中,为了方便同类型的协议转换,会同时缓存Avcc
格式、RTP
格式、以及裸格式,而这三种格式的NALU
部分都共用一组内存(内存不连续)
减少发布者的GC
GC的产生
对于一个发布者,即需要不断从网络或是本地文件中读取数据的对象,在不做任何优化的情况下,都会不停的申请内存。例如使用io.ReadAll
这种操作,内部会频繁的申请内存。频繁申请内存的结果就是GC
压力很大,尤其是高并发的时候,GC
带来的消耗可以达到50%
的CPU
消耗。
sync.Pool
当然我最先想到的一定是使用内存池,也就是sync.Pool
来管理需要使用的内存,但是sync.Pool
有个缺陷,就是为了协程安全内部有锁。尽管使用了多级缓存等一些列优化手段,最终使用的时候也会消耗一定的性能(经过实测性能开销很大)。而且sync.Pool
比较通用,并不是针对特定的对象使用,我们这里是针对[]byte
类型进行复用。
自定义Pool
如果Pool
不含有锁,性能会大幅提升,那如何解决协程安全呢?答案是协程不安全,即我们只在一个协程里面去操作Pool
的取出和放回。通常情况下一个发布者的写入是在同一个协程中的,比如rtmp
协议。少数协议如rtsp
可能会有多个协程写入数据,因此最后我们是每一个Track
一个Pool,保持一个Track
一个协程写入。
下图表示的是自定义Pool
的结构:
每个Pool
是一个数组,数组的每一个元素是一个链表,链表的每一个元素是一个包含[]byte
的类型,大小是2
的数组下标次幂。
0号元素有特殊用途,由于我们需要记录每一块内存所属的链表来回收,因此需要有一个外壳,而外壳(ListItem
)也是需要回收的。而0号元素是存放的只有外壳需要回收而无需回收Value
(需要GC
的对象)的链表。
type List[T any] struct {
ListItem[T]
Length int
}
type ListItem[T any] struct {
Value T
Next, Pre *ListItem[T] `json:"-" yaml:"-"`
Pool *List[T] `json:"-" yaml:"-"` // 回收池
list *List[T]
}
type BytesPool []List[Buffer]
回收内存
当RingBuffer
中的访问单元被覆盖时,就可以将其中所有的内存对象进行放回Pool
。由此实现了从内存使用的闭环,消除了GC
。下图中红色箭头代表内存复用机制,可以有效避免申请内存操作。
后记
经过上面三板斧的优化后,整体性能提升了50%
以上。下图测试10000
路rtmp
推流的对比:m7s
内存占用较高一些,原因就是采用了内存池来减少GC
造成的。使用内存来换CPU,在这种场景下还是值得的。
流媒体服务器 | 10000路推流CPU消耗 |
---|---|
monibuca | 90%~100% |
zlm | 90%~100% |
srs | 80%~90% |
lal | 160%~200% |
由于livego的推流需要先调用一次HTTP获取密钥,所以无法使用压测工具批量推流,本次对比无法参与。
所有流媒体服务器配置均关闭了协议转换的开关,并以Release方式编译。服务器也去除了所有限制,并以完全相同的操作方式进行压测。