golang 源码阅读之会议系统ion part II

2022-08-03 13:55:32 浏览数 (1)

golang 源码阅读之会议系统ion part I介绍了ion的系统架构和islb的代码,本篇将继续介绍ion的其他几个核心模块:

signal模块

代码位于cmd/signal/main.go,它首先解析了对应的配置文件,注册信号,然后启动了一个grpc server,配置位于:configs/sig.toml

代码语言:javascript复制
[nats]
[signal.jwt]
services = ["rtc", "room"]

main函数的代码如下:

代码语言:javascript复制
func main() {
      go func()
        err := http.ListenAndServe(paddr, nil)
      sig, err := signal.NewSignal(conf)
      err = sig.Start()
      defer sig.Close()
        srv := grpc.NewServer(
        grpc.CustomCodec(nrpc.Codec()), // nolint:staticcheck
        grpc.UnknownServiceHandler(nproxy.TransparentLongConnectionHandler(sig.Director)))

        s := util.NewWrapperedGRPCWebServer(util.NewWrapperedServerOptions(
    addr, conf.Signal.GRPC.Cert, conf.Signal.GRPC.Key, true), srv)
      if err := s.Serve();

nrpc包的Codec函数代码位于:

github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/codec.go

提供了grpc的序列化和反序列化方法,供nats使用。信号相关代码位于:pkg/node/signal/signal.go

代码语言:javascript复制
func NewSignal(conf Config) (*Signal, error) 
      nc, err := util.NewNatsConn(conf.Nats.URL)
      ndc, err := dc.NewClient(nc)

里面提供了nats 的客户端链接封装

代码语言:javascript复制
func (s *Signal) Director(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error)
      claims, err := auth.GetClaim(ctx, authConfig)
          for _, svc := range claims.Services {
      if strings.Contains(fullMethodName, "/" svc ".") {
        allowed = true
        break
      }
    }
      cli, err := s.NewNatsRPCClient(svc, "*", parameters)

TransparentLongConnectionHandler代码位于:

github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/proxy/handler.go

代码语言:javascript复制
func TransparentLongConnectionHandler(director StreamDirector) grpc.StreamHandler {
  streamer := &handler{director, false}
  return streamer.handler
}
代码语言:javascript复制
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error 
        fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
        outgoingCtx, clientConnIf, err := s.director(serverStream.Context(), fullMethodName)
          clientStream, err = nrpcClient.NewStream(clientCtx, clientStreamDescForProxying, fullMethodName)
        clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, gpcClientConn, fullMethodName)
        s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
  c2sErrChan := s.forwardClientToServer(clientStream, serverStream)

获取所有的方法做代理转发,pkg/util/wrapped.go

代码语言:javascript复制
func (s *WrapperedGRPCWebServer) Serve() error 
      grpcweb.WithOriginFunc(s.makeHTTPOriginFunc(allowedOrigins)),
      grpcweb.WithWebsocketOriginFunc(s.makeWebsocketOriginFunc(allowedOrigins)),
      grpcweb.WithWebsocketPingInterval(s.options.WebsocketPingInterval),

        wrappedServer := grpcweb.WrapServer(s.GRPCServer, options...)
  handler := func(resp http.ResponseWriter, req *http.Request) {
    wrappedServer.ServeHTTP(resp, req)
  }
      tls, err := tls.Listen("tcp", addr, config)
  g.Go(func() error { return s.GRPCServer.Serve(grpcListener) })
  g.Go(func() error { return httpServer.Serve(httpListener) })
  g.Go(m.Serve)

添加一系列middlewar,然后监听服务。

room模块

app-room模块代码位于apps/room/main.go,它注册了room server和singnal server

代码语言:javascript复制
func main() {
      node := room.New()
      err := node.Load(confFile)
      err = node.Start()
      defer node.Close()

具体实现代码位于:ion/apps/room/server/room.go

代码语言:javascript复制
func (r *RoomServer) Start() error {
       err = r.Node.Start(r.conf.Nats.URL)
        ndc, err := natsDiscoveryClient.NewClient(r.NatsConn())
        r.natsConn = r.NatsConn()
        r.RoomService = *NewRoomService(r.conf.Redis)
        r.RoomSignalService = *NewRoomSignalService(&r.RoomService)
        room.RegisterRoomServiceServer(r.Node.ServiceRegistrar(), &r.RoomService)
        room.RegisterRoomSignalServer(r.Node.ServiceRegistrar(), &r.RoomSignalService)
          go func() {
    err := r.Node.KeepAlive(node)
          go func() {
    err := r.Node.Watch(proto.ServiceALL)

会议相关的代码位于:apps/conference/main.go

代码语言:javascript复制
func main() {
    err := http.ListenAndServe(paddr, nil)
    r := runner.New(util.NewWrapperedServerOptions(addr, certFile, keyFile, true))
      err := r.AddService(
    runner.ServiceUnit{
      Service:    room.New(),
      ConfigFile: roomConfFile,
    },
    runner.ServiceUnit{
      Service:    sfu.New(),
      ConfigFile: sfuConfFile,
    },
  )

其中sfu定义在pkg/node/sfu/sfu.go

代码语言:javascript复制
type SFU struct {
  ion.Node
  s *SFUService
  runner.Service
  conf Config
}
代码语言:javascript复制
type RoomServer struct {
  // for standalone running
  runner.Service


  // grpc room service
  RoomService
  RoomSignalService


  // for distributed node running
  ion.Node
  natsConn         *nats.Conn
  natsDiscoveryCli *natsDiscoveryClient.Client


  // config
  conf Config
}

apps/room/server/room.go

文件定义了room相关的结构体

代码语言:javascript复制
type RoomServer struct {
  // for standalone running
  runner.Service


  // grpc room service
  RoomService
  RoomSignalService


  // for distributed node running
  ion.Node
  natsConn         *nats.Conn
  natsDiscoveryCli *natsDiscoveryClient.Client


  // config
  conf Config
}

sfu模块

sfu模块的代码位于cmd/sfu/main.go

代码语言:javascript复制
func main() {}
      err := conf.Load(confFile)
      err := http.ListenAndServe(paddr, nil)
      node := sfu.NewSFU()
       err := node.Start(conf);
      defer node.Close()

相关的结构体定义在 pkg/node/sfu/sfu.go

代码语言:javascript复制
type SFU struct {
  ion.Node
  s *SFUService
  runner.Service
  conf Config
}
代码语言:javascript复制
type Config struct {
  Global global   `mapstructure:"global"`
  Log    logConf  `mapstructure:"log"`
  Nats   natsConf `mapstructure:"nats"`
  isfu.Config
}

它启动了一个rtc server:

代码语言:javascript复制
func (s *SFU) Start(conf Config) error {
      err := s.Node.Start(conf.Nats.URL)
      s.s = NewSFUService(conf.Config)
      pb.RegisterRTCServer(s.Node.ServiceRegistrar(), s.s)
        go func() {
    err := s.Node.KeepAlive(node)
        go func() {
    err := s.Node.Watch(proto.ServiceALL)

其中node定义在pkg/ion/node.go:

代码语言:javascript复制
type Node struct {
  // Node ID
  NID string
  // Nats Client Conn
  nc *nats.Conn
  // gRPC Service Registrar
  nrpc *nrpc.Server
  // Service discovery client
  ndc *ndc.Client


  nodeLock sync.RWMutex
  //neighbor nodes
  neighborNodes map[string]discovery.Node


  cliLock sync.RWMutex
  clis    map[string]*nrpc.Client
}
代码语言:javascript复制
func (n *Node) ServiceRegistrar() grpc.ServiceRegistrar {
  return n.nrpc
}

而 sfu service定义在pkg/node/sfu/service.go

代码语言:javascript复制
type SFUService struct {
  rtc.UnimplementedRTCServer
  sfu   *ion_sfu.SFU
  mutex sync.RWMutex
  sigs  map[string]rtc.RTC_SignalServer
}

pkg/runner/runner.go

代码语言:javascript复制
type Service interface {
  New() Service
  ConfigBase() ConfigBase
  StartGRPC(registrar grpc.ServiceRegistrar) error
  Close()
}

avp模块和auth模块并没有实现

cmd/avp/main.go

apps/auth/main.go

以上就是会议系统服务端的核心代码。

0 人点赞