golang 源码阅读之会议系统ion part I介绍了ion的系统架构和islb的代码,本篇将继续介绍ion的其他几个核心模块:
signal模块
代码位于cmd/signal/main.go,它首先解析了对应的配置文件,注册信号,然后启动了一个grpc server,配置位于:configs/sig.toml
代码语言:javascript复制[nats]
[signal.jwt]
services = ["rtc", "room"]
main函数的代码如下:
代码语言:javascript复制func main() {
go func()
err := http.ListenAndServe(paddr, nil)
sig, err := signal.NewSignal(conf)
err = sig.Start()
defer sig.Close()
srv := grpc.NewServer(
grpc.CustomCodec(nrpc.Codec()), // nolint:staticcheck
grpc.UnknownServiceHandler(nproxy.TransparentLongConnectionHandler(sig.Director)))
s := util.NewWrapperedGRPCWebServer(util.NewWrapperedServerOptions(
addr, conf.Signal.GRPC.Cert, conf.Signal.GRPC.Key, true), srv)
if err := s.Serve();
nrpc包的Codec函数代码位于:
github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/codec.go
提供了grpc的序列化和反序列化方法,供nats使用。信号相关代码位于:pkg/node/signal/signal.go
代码语言:javascript复制func NewSignal(conf Config) (*Signal, error)
nc, err := util.NewNatsConn(conf.Nats.URL)
ndc, err := dc.NewClient(nc)
里面提供了nats的客户端连接封装
代码语言:javascript复制func (s *Signal) Director(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error)
claims, err := auth.GetClaim(ctx, authConfig)
for _, svc := range claims.Services {
if strings.Contains(fullMethodName, "/"+svc+".") {
allowed = true
break
}
}
cli, err := s.NewNatsRPCClient(svc, "*", parameters)
TransparentLongConnectionHandler代码位于:
github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/proxy/handler.go
代码语言:javascript复制func TransparentLongConnectionHandler(director StreamDirector) grpc.StreamHandler {
// Wrap the supplied director in a handler value and return that value's
// handler method as the grpc.StreamHandler.
// NOTE(review): the meaning of the second field (false) is not visible in
// this excerpt — confirm against the handler struct definition.
streamer := &handler{director, false}
return streamer.handler
}
代码语言:javascript复制func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
outgoingCtx, clientConnIf, err := s.director(serverStream.Context(), fullMethodName)
clientStream, err = nrpcClient.NewStream(clientCtx, clientStreamDescForProxying, fullMethodName)
clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, gpcClientConn, fullMethodName)
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
获取所有的方法做代理转发,pkg/util/wrapped.go
代码语言:javascript复制func (s *WrapperedGRPCWebServer) Serve() error
grpcweb.WithOriginFunc(s.makeHTTPOriginFunc(allowedOrigins)),
grpcweb.WithWebsocketOriginFunc(s.makeWebsocketOriginFunc(allowedOrigins)),
grpcweb.WithWebsocketPingInterval(s.options.WebsocketPingInterval),
wrappedServer := grpcweb.WrapServer(s.GRPCServer, options...)
handler := func(resp http.ResponseWriter, req *http.Request) {
wrappedServer.ServeHTTP(resp, req)
}
tls, err := tls.Listen("tcp", addr, config)
g.Go(func() error { return s.GRPCServer.Serve(grpcListener) })
g.Go(func() error { return httpServer.Serve(httpListener) })
g.Go(m.Serve)
添加一系列middleware,然后监听服务。
room模块
app-room模块代码位于apps/room/main.go,它注册了room server和signal server
代码语言:javascript复制func main() {
node := room.New()
err := node.Load(confFile)
err = node.Start()
defer node.Close()
具体实现代码位于:ion/apps/room/server/room.go
代码语言:javascript复制func (r *RoomServer) Start() error {
err = r.Node.Start(r.conf.Nats.URL)
ndc, err := natsDiscoveryClient.NewClient(r.NatsConn())
r.natsConn = r.NatsConn()
r.RoomService = *NewRoomService(r.conf.Redis)
r.RoomSignalService = *NewRoomSignalService(&r.RoomService)
room.RegisterRoomServiceServer(r.Node.ServiceRegistrar(), &r.RoomService)
room.RegisterRoomSignalServer(r.Node.ServiceRegistrar(), &r.RoomSignalService)
go func() {
err := r.Node.KeepAlive(node)
go func() {
err := r.Node.Watch(proto.ServiceALL)
会议相关的代码位于:apps/conference/main.go
代码语言:javascript复制func main() {
err := http.ListenAndServe(paddr, nil)
r := runner.New(util.NewWrapperedServerOptions(addr, certFile, keyFile, true))
err := r.AddService(
runner.ServiceUnit{
Service: room.New(),
ConfigFile: roomConfFile,
},
runner.ServiceUnit{
Service: sfu.New(),
ConfigFile: sfuConfFile,
},
)
其中sfu定义在pkg/node/sfu/sfu.go
代码语言:javascript复制type SFU struct {
// Embedded ion.Node; SFU.Start calls Start, KeepAlive and Watch on it.
ion.Node
// RTC service; SFU.Start creates it with NewSFUService and registers it
// with pb.RegisterRTCServer.
s *SFUService
// Embedded runner.Service interface.
runner.Service
// Config value held by the SFU.
// NOTE(review): where this field is assigned is not visible here — confirm.
conf Config
}
代码语言:javascript复制type RoomServer struct {
// Embedded runner.Service, used when running standalone.
runner.Service
// gRPC room services; Start assigns both and registers them on the node's
// service registrar.
RoomService
RoomSignalService
// Embedded ion.Node, used when running as a distributed node.
ion.Node
// NATS connection; Start sets it from r.NatsConn().
natsConn *nats.Conn
// NATS discovery client.
// NOTE(review): presumably the client Start creates with
// natsDiscoveryClient.NewClient — confirm.
natsDiscoveryCli *natsDiscoveryClient.Client
// Server configuration; Start reads conf.Nats.URL and conf.Redis from it.
conf Config
}
apps/room/server/room.go
文件定义了room相关的结构体
代码语言:javascript复制type RoomServer struct {
// Embedded runner.Service, used when running standalone.
runner.Service
// gRPC room services; Start assigns both and registers them on the node's
// service registrar.
RoomService
RoomSignalService
// Embedded ion.Node, used when running as a distributed node.
ion.Node
// NATS connection; Start sets it from r.NatsConn().
natsConn *nats.Conn
// NATS discovery client.
// NOTE(review): presumably the client Start creates with
// natsDiscoveryClient.NewClient — confirm.
natsDiscoveryCli *natsDiscoveryClient.Client
// Server configuration; Start reads conf.Nats.URL and conf.Redis from it.
conf Config
}
sfu模块
sfu模块的代码位于cmd/sfu/main.go
代码语言:javascript复制func main() {
err := conf.Load(confFile)
err := http.ListenAndServe(paddr, nil)
node := sfu.NewSFU()
err := node.Start(conf);
defer node.Close()
相关的结构体定义在 pkg/node/sfu/sfu.go
代码语言:javascript复制type SFU struct {
// Embedded ion.Node; SFU.Start calls Start, KeepAlive and Watch on it.
ion.Node
// RTC service; SFU.Start creates it with NewSFUService and registers it
// with pb.RegisterRTCServer.
s *SFUService
// Embedded runner.Service interface.
runner.Service
// Config value held by the SFU.
// NOTE(review): where this field is assigned is not visible here — confirm.
conf Config
}
代码语言:javascript复制type Config struct {
// Each named section carries a mapstructure tag naming its configuration key.
Global global `mapstructure:"global"`
Log logConf `mapstructure:"log"`
// SFU.Start passes Nats.URL to Node.Start.
Nats natsConf `mapstructure:"nats"`
// Embedded isfu.Config; SFU.Start passes it to NewSFUService as conf.Config.
isfu.Config
}
它启动了一个rtc server:
代码语言:javascript复制func (s *SFU) Start(conf Config) error {
err := s.Node.Start(conf.Nats.URL)
s.s = NewSFUService(conf.Config)
pb.RegisterRTCServer(s.Node.ServiceRegistrar(), s.s)
go func() {
err := s.Node.KeepAlive(node)
go func() {
err := s.Node.Watch(proto.ServiceALL)
其中node定义在pkg/ion/node.go:
代码语言:javascript复制type Node struct {
// Node ID
NID string
// NATS client connection
nc *nats.Conn
// gRPC service registrar; this is the value ServiceRegistrar returns.
nrpc *nrpc.Server
// Service discovery client
ndc *ndc.Client
// NOTE(review): presumably guards neighborNodes — confirm.
nodeLock sync.RWMutex
// Neighbor nodes.
// NOTE(review): the map key is presumably the node ID — confirm.
neighborNodes map[string]discovery.Node
// NOTE(review): presumably guards clis — confirm.
cliLock sync.RWMutex
// nrpc clients keyed by string.
// NOTE(review): the meaning of the key is not visible here — confirm.
clis map[string]*nrpc.Client
}
代码语言:javascript复制func (n *Node) ServiceRegistrar() grpc.ServiceRegistrar {
// Expose the node's nrpc server as a grpc.ServiceRegistrar so services can be
// registered on it.
return n.nrpc
}
而 sfu service定义在pkg/node/sfu/service.go
代码语言:javascript复制type SFUService struct {
// Embedded rtc.UnimplementedRTCServer.
rtc.UnimplementedRTCServer
// The ion_sfu.SFU instance this service holds.
sfu *ion_sfu.SFU
// NOTE(review): presumably guards sigs — confirm.
mutex sync.RWMutex
// RTC signal server streams keyed by string.
// NOTE(review): the meaning of the key is not visible here — confirm.
sigs map[string]rtc.RTC_SignalServer
}
pkg/runner/runner.go
代码语言:javascript复制type Service interface {
// New returns a Service.
New() Service
// ConfigBase returns the service's ConfigBase.
ConfigBase() ConfigBase
// StartGRPC takes a gRPC service registrar and reports any error.
// NOTE(review): presumably registers the service's gRPC handlers on
// registrar — confirm against the implementations.
StartGRPC(registrar grpc.ServiceRegistrar) error
// Close takes no arguments and returns nothing.
// NOTE(review): presumably releases the service's resources — confirm.
Close()
}
avp模块和auth模块并没有实现
cmd/avp/main.go
apps/auth/main.go
以上就是会议系统服务端的核心代码。