一、为什么用ion-sfu
1.简介
ion-sfu作为ion分布式架构里的核心模块,SFU是选择转发单元的简称,可以分发WebRTC的媒体流。ion-sfu从pion/ion拆分出来,经过社区打磨,是目前GO方案中最成熟且使用最广的SFU。
https://github.com/pion/ion
已经有多家开始商用了,这点国外公司比较快,比如:100ms、Screenleap和Tandem等。
100ms:https://www.100ms.live/
Screenleap:https://www.screenleap.com/
Tandem:https://tandem.chat/
2.ion-sfu优点
- 纯GO,开发效率高,且能帮你绕过很多坑
- 单进程多协程模型: - 可以利用多核 - 大大降低级联/单端口复杂度(其他SFU,可能存在本机不同worker间relay的问题;监听单端口时,存在worker间抢包的问题)
- 高并发,曾在谷歌云4核压测到单房间50方会议 (大概2500路流-0.5Mbps)
- 功能全面: - 双PeerConnection 多Track设计,有良好的浏览器兼容性,节省系统资源 - 支持多对多音视频通信 - 支持大小流Simulcast - 支持屏幕分享Screenshare - 支持发言方自动检测Audio-Level-Detect - 支持定制DataChannel - 支持节点间Relay - 支持单端口,大大降低部署难度 - 完善的抗弱网机制,抗丢包40%左右,支持TWCC/REMB PLI/FIR Nack SR/RR等
- 配套SDK完善,JS/Flutter/GO等
3.使用方式
ion-sfu使用方式有两种:
- 作为服务使用,比如编译带grpc或jsonrpc信令的ion-sfu,然后再做一个自己的信令服务(推荐ion分布式套装),远程调用即可。
- 作为包使用,import导入,然后做二次开发。此时抛弃了cmd下边的信令层,只需导入pkg/sfu下边的包即可,然后自行定制信令层,可以在sfu、session、peer层面,通过继承接口定制自己的业务,比较复杂。
import (
sfu "github.com/pion/ion-sfu/pkg/sfu"
)
二、架构与模块
上面给一个简单架构图,很多细节表示不出来,需要看代码。
1.简介
得益于GO,ion-sfu整体代码精简,拥有极高的开发效率。结合现有SDK使用,可以避免很多坑:ion-sdk-js等。
ion-sfu基于pion/webrtc,所以代码风格偏标准webrtc,比如:PeerConnection。因为是使用了标准API,熟悉了之后很容易看懂其他工程,比如:ion-sdk-go/js/flutter。
这样从前到后,整体门槛都降低了。
2.工程组织
这里给出主要模块列表:
代码语言:javascript复制
代码语言:javascript复制├── Makefile //用来编译二进制和grpc文件
├── bin //编译好的二进制目录
├── cmd
│ └── signal //包含三个主文件 grpc、jsonrpc、allrpc
├── config.toml //配置文件
├── examples //网页示例目录
├── pkg
├── buffer //buffer包,用于缓存包
├── logger //日志
├── middlewares //中间件,主要是支持自定义datachannel
├── relay //中继
├── sfu //sfu主模块,包含router、session、peer等
├── stats //状态统计
└── twcc //transport-cc
3.信令层
信令代码和主程序在一起,在cmd/signal/下边。
- 支持jsonrpc,主要处理逻辑在:
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
代码语言:javascript复制
- 支持grpc,主要处理逻辑在:
func (s *SFUServer) Signal(stream pb.SFU_SignalServer) error {
代码语言:javascript复制
而allrpc,是jsonrpc和grpc的合体封装,运行时会进入上面两个函数。
信令很简单:
- join:加入一个session。
- description:发起offer或回复answer,用于协商和重协商。
- trickle:发送trickle candidate。
另外,出于简单考虑,一些信令和事件,直接走datachannel了,比如:大小流切换、声音检测、自定义信令等。
4.媒体层
媒体层的主要模块:
代码语言:javascript复制
代码语言:javascript复制├── audioobserver.go //声音检测
├── datachannel.go //dc中间件的封装
├── downtrack.go //下行track
├── helpers.go //工具函数集
├── mediaengine.go //SDP相关codec、rtp参数设置
├── peer.go //peer封装,一个peer包含一个publisher和一个subscriber,双pc设计
├── publisher.go //publisher,封装上行pc
├── receiver.go //subscriber,封装下行pc
├── router.go //router,包含pc、session、一组receivers
├── sequencer.go //记录包的信息:序列号sn、偏移、时间戳ts等
├── session.go //会话,包含多个peer、dc
├── sfu.go //分发单元,包含多个session、dc
├── simulcast.go //大小流配置
├── subscriber.go //subscriber,封装下行pc、DownTrack、dc
└── turn.go //内置turn server
代码语言:javascript复制
相比以前版本,增加了一些interface,主要是为了作为包使用时,封装自己的类。
三、主函数与信令流程
1.主函数
这里拿jsonrpc来分析,其他rpc流程上是一样的。
代码语言:javascript复制
代码语言:javascript复制func main() {
if !parse() {
showHelp()
os.Exit(-1)
}
//创建SFU和DC,这里的DC用于Simulcast和AudioLevel
s := sfu.NewSFU(conf)
dc := s.NewDatachannel(sfu.APIChannelLabel)
dc.Use(datachannel.SubscriberAPI)
//接下来是标准websocket服务器启动的流程
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
//这里jsonrpc基于websocket,websocket从标准http upgrade过来的
http.Handle("/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
panic(err)
}
defer c.Close()
//这里创建了JSONSignal,每次真实请求到来时会新建一个Peer,进入Handle函数处理
p := server.NewJSONSignal(sfu.NewPeer(s), logger)
defer p.Close()
jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), p)
<-< span="">jc.DisconnectNotify()
}))
go startMetrics(metricsAddr)
var err error
if key != "" && cert != "" {
logger.Info("Started listening", "addr", "https://" addr)
err = http.ListenAndServeTLS(addr, cert, key, nil)
} else {
logger.Info("Started listening", "addr", "http://" addr)
err = http.ListenAndServe(addr, nil)
}
if err != nil {
panic(err)
}
}
代码语言:javascript复制
2.协商&重协商
协商(negotiate):
WebRTC对外的类是PeerConnection,简称PC,通过信令服务交换SDP给PC进行操作。协商就是指双方通过信令交换SDP,通过PC的一些接口,达到协商双方的媒体格式、传输地址端口等信息,从而实现推流和播放的目的。
一次协商完整流程:
本端CreateOffer-》本端SetLocalDescription(offer)-》本端发送offer-》对端SetRemoteDescription(offer)-》对端CreateAnswer-》SetLocalDescription(answer)-》对端对端返回answer-》本端SetRemoteDescription(answer)
重协商(renegotiate):
就是指再次协商。
为什么要重协商?
因为客户端和服务器的track都是变化的,重协商是通知对端的必要手段,比如:客户端发起屏幕分享,服务器有人进出房间等。
重协商的原则:
谁变化谁发起(offer)。
3.信令流程
首先,客户端ws连接成功:
服务端会建立一个Peer,可以参考上边代码。
然后,客户端发起第一次协商:
客户端pc.CreateOffer(一个只包含dc的offer)-》pc.SetLocalDescription(offer),然后把offer放入Join信令,发送给服务端,然后服务器协商【pc.SetRemoteDescription(offer)-》pc.CreateAnswer-》pc.SetLocalDescription(answer)】,返回answer给客户端,至此完成数据通道(datachannel)建立。首先打通dc,是为了方便audio-level/simulcast通道的建立,此时也可以创建自定dc做定制业务。
接下来,服务端发起第二次协商:
服务端pc.CreateOffer,SetLocalDescription,发送offer,此时offer会携带上此房间内的所有track信息,客户端收到后会CreateAnswer,SetLocalDescription,把answer返回来,然后服务端pc.SetRemoteDescription(answer),此时客户端可以收到服务器此房间内的所有流了。
最后,客户端publish发流时会发起第三次协商:
同第一次流程一样,不同的是同时携带了音视频的track,本次协商完成后,服务器可以收到客户端的流了,收到之后会对同房间内的其他客户端发起重协商。
往后只要客户端或服务器track有变化,都会再次发起重协商。
4.代码分析
JsonRPC所有的信令都会进入Handle函数。为了简化流程,可以暂时不看Trickle和OnIceCandidate函数,这个是开启trickle-ICE时才会有。
代码语言:javascript复制
代码语言:javascript复制// Handle incoming RPC call events like join, answer, offer and trickle
// JSONSignal是继承了LocalPeer,所以会继承一些属性和回调:OnOffer等。
// 可以在浏览器端ws网络工具里查看具体信令内容
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
replyError := func(err error) {
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{
Code: 500,
Message: fmt.Sprintf("%s", err),
})
}
switch req.Method {
case "join":// 首先客户端会发join信令过来
var join Join
err := json.Unmarshal(*req.Params, &join)
if err != nil {
p.Logger.Error(err, "connect: error parsing offer")
replyError(err)
break
}
//设置OnOffer,即SFU发起offer时(重协商),会使用这个回调,比如重协商时,因为有很多客户端peer同时连到SFU,每个Peer的Track增删时,SFU需要向其他Peer重协商来告诉Track的变更
p.OnOffer = func(offer *webrtc.SessionDescription) {
if err := conn.Notify(ctx, "offer", offer); err != nil {
p.Logger.Error(err, "error sending offer")
}
}
//设置OnIceCandidate,即SFU在ICE流程获取到新候选时,会回调这个函数,告诉客户端新增了啥候选
p.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) {
if err := conn.Notify(ctx, "trickle", Trickle{
Candidate: *candidate,
Target: target,
}); err != nil {
p.Logger.Error(err, "error sending ice candidate")
}
}
//加入某个会话(房间)
err = p.Join(join.SID, join.UID, join.Config)
if err != nil {
replyError(err)
break
}
//根据offer回复answer
answer, err := p.Answer(join.Offer)
if err != nil {
replyError(err)
break
}
_ = conn.Reply(ctx, req.ID, answer)
//如果是客户端发offer,回复answer,此时为客户端发起重协商
case "offer":
var negotiation Negotiation
err := json.Unmarshal(*req.Params, &negotiation)
if err != nil {
p.Logger.Error(err, "connect: error parsing offer")
replyError(err)
break
}
answer, err := p.Answer(negotiation.Desc)
if err != nil {
replyError(err)
break
}
_ = conn.Reply(ctx, req.ID, answer)
//如果是客户端发answer,设置SetRemoteDescription即可
case "answer":
var negotiation Negotiation
err := json.Unmarshal(*req.Params, &negotiation)
if err != nil {
p.Logger.Error(err, "connect: error parsing offer")
replyError(err)
break
}
err = p.SetRemoteDescription(negotiation.Desc)
if err != nil {
replyError(err)
}
//如果是客户端发送Trickle-ICE的候选过来,设置即可
case "trickle":
var trickle Trickle
err := json.Unmarshal(*req.Params, &trickle)
if err != nil {
p.Logger.Error(err, "connect: error parsing candidate")
replyError(err)
break
}
err = p.Trickle(trickle.Candidate, trickle.Target)
if err != nil {
replyError(err)
}
}
}
代码语言:javascript复制
注意OnOffer是服务器重协商的回调,即房间内某客户端track有变化,服务器会回调此函数通知其他客户端。
总结一句话,客户端《---》SFU的核心逻辑就是不断重协商,谁变化谁发起offer。