golang 源码阅读之会议系统ion part IV

2022-08-03 13:57:10 浏览数 (1)

在介绍了后端代码和前端的webapp之后,我们开始介绍前后端结合的部分https://github.com/pion/ion-sdk-js,它主要在proto生成的ts代码基础上做了一些封装,提供完整的接口供webapp使用。

proto 首先看下proto是如何生成的,在ion-sdk-js/Makefile文件中,我们可以看到相关代码:

代码语言:javascript复制
proto:
  mkdir -p src/_library
  #sudo npm i -g ts-protoc-gen@0.15.0
  protoc ./ion/proto/ion/ion.proto -I./ion --plugin=protoc-gen-ts=/usr/local/bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/_library --ts_out=service=grpc-web:./src/_library
  protoc ./ion/proto/rtc/rtc.proto -I./ion --plugin=protoc-gen-ts=/usr/local/bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/_library --ts_out=service=grpc-web:./src/_library
  protoc ./ion/apps/room/proto/room.proto -I./ion --plugin=protoc-gen-ts=/usr/local/bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/_library --ts_out=service=grpc-web:./src/_library
  mkdir -p lib
  cp -rf ./src/_library lib

可以看到它主要是用了3个proto文件

./ion/proto/ion/ion.proto

./ion/proto/rtc/rtc.proto

./ion/apps/room/proto/room.proto

生成的代码路径在ion-sdk-js/src/_library/proto/路径和ion-sdk-js/src/_library/apps/room/proto/路径下面下面。

我们先看下room对应的proto文件ion/apps/room/proto/room.proto

代码语言:javascript复制
syntax = "proto3";

option go_package = "github.com/pion/ion/apps/room/proto";

package room;

service RoomService {
  // Manager API
  // Room API
  rpc CreateRoom(CreateRoomRequest) returns (CreateRoomReply) {}
  rpc UpdateRoom(UpdateRoomRequest) returns (UpdateRoomReply) {}
  rpc EndRoom(EndRoomRequest) returns (EndRoomReply) {}
  rpc GetRooms(GetRoomsRequest) returns (GetRoomsReply) {}

  // Peer API
  rpc AddPeer(AddPeerRequest) returns (AddPeerReply) {}
  rpc UpdatePeer(UpdatePeerRequest) returns (UpdatePeerReply) {}
  rpc RemovePeer(RemovePeerRequest) returns (RemovePeerReply) {}
  rpc GetPeers(GetPeersRequest) returns (GetPeersReply) {}
}

service RoomSignal {
  // Signal
  rpc Signal(stream Request) returns (stream Reply) {}
}

enum ErrorType {
  None = 0;
  UnkownError = 1;
  PermissionDenied = 2;
  ServiceUnavailable = 3;
  RoomLocked = 4;
  PasswordRequired = 5;
  RoomAlreadyExist = 6;
  RoomNotExist = 7;
  InvalidParams = 8;
  PeerAlreadyExist = 9;
  PeerNotExist = 10;
}

message Error {
  ErrorType code = 1;
  string reason = 2;
}

message Request {
  oneof payload {
    JoinRequest join = 1;
    LeaveRequest leave = 2;
    SendMessageRequest sendMessage = 3;
  }
}

message Reply {
  oneof payload {
    JoinReply join = 1;
    LeaveReply leave = 2;
    SendMessageReply sendMessage = 3;
    PeerEvent Peer = 4;
    Message message = 5;
    Disconnect disconnect = 6;
    Room room = 7;
  }
}

message CreateRoomRequest {
  Room room = 1;
}

message CreateRoomReply {
  bool success = 1;
  Error error = 2;
}

message DeleteRoomRequest {
  string sid = 1;
}

message DeleteRoomReply {
  bool success = 1;
  Error error = 2;
}

message JoinRequest {
  Peer peer = 1;
  string password = 2;
}

message Room {
  string sid = 1;
  string name = 2;
  bool lock = 3;
  string password = 4;
  string description = 5;
  uint32 maxPeers = 6;
}

message JoinReply {
  bool success = 1;
  Error error = 2;
  Role role = 3;
  Room room = 4;
}

message LeaveRequest {
  string sid = 1;
  string uid = 2;
}

message LeaveReply {
  bool success = 1;
  Error error = 2;
}

enum Role {
  Host = 0;
  Guest = 1;
}

enum Protocol {
  ProtocolUnknown = 0;
  WebRTC = 1;
  SIP = 2;
  RTMP = 3;
  RTSP = 4;
}

message Peer {
  enum Direction {
    INCOMING = 0;
    OUTGOING = 1;
    BILATERAL = 2;
  }
  string sid = 1;
  string uid = 2;
  string displayName = 3;
  bytes extraInfo = 4;
  string destination = 5; // rtsp/rtmp/sip url
  Role role = 6;
  Protocol protocol = 7;
  string avatar = 8;
  Direction direction  = 9;
  string vendor = 10;
}

message AddPeerRequest {
  Peer peer = 1;
}

message AddPeerReply {
  bool success = 1;
  Error error = 2;
}

message GetPeersRequest { string sid = 1; }

message GetPeersReply { 
  bool success = 1;
  Error error = 2;
  repeated Peer Peers = 3; 
}

message Message {
  string from = 1;   // UUID of the sending Peer.
  string to = 2;     // UUID of the receiving Peer.
  string type = 3;   // MIME content-type of the message, usually text/plain.
  bytes payload = 4; // Payload message contents.
}

message SendMessageRequest {
  string sid = 1;
  Message message = 2;
}

message SendMessageReply {
  bool success = 1;
  Error error = 2;
}

message Disconnect {
  string sid = 1;
  string reason = 2;
}

enum PeerState {
  JOIN = 0;
  UPDATE = 1;
  LEAVE = 2;
}

message PeerEvent {
  Peer Peer = 1;
  PeerState state = 2;
}

message UpdateRoomRequest {
  Room room = 1;
}

message UpdateRoomReply {
  bool success = 1;
  Error error = 2;
}

message EndRoomRequest {
  string sid = 1;
  string reason = 2;
  bool delete = 3;
}

message EndRoomReply {
  bool success = 1;
  Error error = 2;
}

message GetRoomsRequest {

}

message GetRoomsReply {
  bool success = 1;
  Error error = 2;
  repeated Room rooms = 3;
}

message UpdatePeerRequest {
  Peer peer = 1;
}

message UpdatePeerReply {
  bool success = 1;
  Error error = 2;
}

message RemovePeerRequest {
  string sid = 1;
  string uid = 2;
}

message RemovePeerReply {
  bool success = 1;
  Error error = 2;
}

可以看到主要包含了房间的增删改查和peer的增删改查。其他两个类似。

client封装

查看client封装了哪些接口,我们从webpack.config.js这个文件开始,它包含了4个入口

代码语言:javascript复制
./src/index.ts
./src/connector/index.ts
./src/signal/grpc-web-impl.ts
./src/signal/json-rpc-impl.ts

其中信号处理封装了grpc和json-rpc两个版本的实现。

./src/index.ts文件很简单,它引入了client stream 和 signal并暴露出去。

代码语言:javascript复制
import Client from './client';
import { LocalStream, RemoteStream, Constraints, Layer } from './stream';
import { Signal, Trickle } from './signal';
export { Client, LocalStream, RemoteStream, Constraints, Signal, Trickle, Layer };

在ion-sdk-js/src/client.ts文件里定义了client类,它包含了我们需要的大部份函数

代码语言:javascript复制
export default class Client {
  transports?: Transports<Role, Transport>;
  private config: Configuration;
  private signal: Signal;

  ontrack?: (track: MediaStreamTrack, stream: RemoteStream) => void;
  ondatachannel?: (ev: RTCDataChannelEvent) => void;
  onspeaker?: (ev: string[]) => void;
  onerrnegotiate?: (
    role: Role,
    err: Error,
    offer?: RTCSessionDescriptionInit,
    answer?: RTCSessionDescriptionInit,
  ) => void;
  onactivelayer?: (al: ActiveLayer) => void;

  constructor(
    signal: Signal,
    config: Configuration = {
      codec: 'vp8',
      iceServers: [
        {
          urls: ['stun:stun.l.google.com:19302', 'stun:stun1.l.google.com:19302'],
        },
      ],
    },
) {
    this.signal = signal;
    this.config = config;

    signal.onnegotiate = this.negotiate.bind(this);
    signal.ontrickle = this.trickle.bind(this);
  }

  async join(sid: string, uid: string) {
    this.transports = {
      [Role.pub]: new Transport(Role.pub, this.signal, this.config),
      [Role.sub]: new Transport(Role.sub, this.signal, this.config),
    };

    this.transports[Role.sub].pc.ontrack = (ev: RTCTrackEvent) => {
      const stream = ev.streams[0];
      const remote = makeRemote(stream, this.transports![Role.sub]);

      if (this.ontrack) {
        this.ontrack(ev.track, remote);
      }
    };

    const apiReady = new Promise<void>((resolve) => {
      this.transports![Role.sub].pc.ondatachannel = (ev: RTCDataChannelEvent) => {
        if (ev.channel.label === API_CHANNEL) {
          this.transports![Role.sub].api = ev.channel;
          this.transports![Role.pub].api = ev.channel;
          ev.channel.onmessage = (e) => {
            try {
              const msg = JSON.parse(e.data);
              this.processChannelMessage(msg);
            } catch (err) {
              /* tslint:disable-next-line:no-console */
              console.error(err);
            }
          };
          resolve();
          return;
        }

        if (this.ondatachannel) {
          this.ondatachannel(ev);
        }
      };
    });

    const offer = await this.transports[Role.pub].pc.createOffer();
    await this.transports[Role.pub].pc.setLocalDescription(offer);
    const answer = await this.signal.join(sid, uid, offer);
    await this.transports[Role.pub].pc.setRemoteDescription(answer);
    this.transports[Role.pub].candidates.forEach((c) => this.transports![Role.pub].pc.addIceCandidate(c));
    this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);

    return apiReady;
  }

  leave() {
    if (this.transports) {
      Object.values(this.transports).forEach((t) => t.pc.close());
      delete this.transports;
    }
  }

  getPubStats(selector?: MediaStreamTrack) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }
    return this.transports[Role.pub].pc.getStats(selector);
  }

  getSubStats(selector?: MediaStreamTrack) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }
    return this.transports[Role.sub].pc.getStats(selector);
  }

  publish(stream: LocalStream, encodingParams?: RTCRtpEncodingParameters[]) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }
    stream.publish(this.transports[Role.pub], encodingParams);
  }

  restartIce() {
    this.renegotiate(true);
  }

  createDataChannel(label: string) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }
    return this.transports[Role.pub].pc.createDataChannel(label);
  }

  close() {
    if (this.transports) {
      Object.values(this.transports).forEach((t) => t.pc.close());
    }
    this.signal.close();
  }

  private trickle({ candidate, target }: Trickle) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }
    if (this.transports[target].pc.remoteDescription) {
      this.transports[target].pc.addIceCandidate(candidate);
    } else {
      this.transports[target].candidates.push(candidate);
    }
  }

  private async negotiate(description: RTCSessionDescriptionInit) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }

    let answer: RTCSessionDescriptionInit | undefined;
    try {
      await this.transports[Role.sub].pc.setRemoteDescription(description);
      this.transports[Role.sub].candidates.forEach((c) => this.transports![Role.sub].pc.addIceCandidate(c));
      this.transports[Role.sub].candidates = [];
      answer = await this.transports[Role.sub].pc.createAnswer();
      await this.transports[Role.sub].pc.setLocalDescription(answer);
      this.signal.answer(answer);
    } catch (err) {
      /* tslint:disable-next-line:no-console */
      console.error(err);
      if (this.onerrnegotiate) this.onerrnegotiate(Role.sub, err, description, answer);
    }
  }

  private onNegotiationNeeded() {
    this.renegotiate(false);
  }

  private async renegotiate(iceRestart: boolean) {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }

    let offer: RTCSessionDescriptionInit | undefined;
    let answer: RTCSessionDescriptionInit | undefined;
    try {
      offer = await this.transports[Role.pub].pc.createOffer({ iceRestart });
      await this.transports[Role.pub].pc.setLocalDescription(offer);
      answer = await this.signal.offer(offer);
      await this.transports[Role.pub].pc.setRemoteDescription(answer);
    } catch (err) {
      /* tslint:disable-next-line:no-console */
      console.error(err);
      if (this.onerrnegotiate) this.onerrnegotiate(Role.pub, err, offer, answer);
    }
  }

  private processChannelMessage(msg: any) {
    if (msg.method !== undefined && msg.params !== undefined) {
      switch (msg.method) {
        case 'audioLevels':
          if (this.onspeaker) {
            this.onspeaker(msg.params);
          }
          break;
        case 'activeLayer':
          if (this.onactivelayer) {
            this.onactivelayer(msg.params);
          }
          break;
        default:
        // do nothing
      }
    } else {
      // legacy channel message - payload contains audio levels
      if (this.onspeaker) {
        this.onspeaker(msg);
      }
    }
  }
}

在ion-sdk-js/src/connector/index.ts里面包装了后端的各种链接方式,包括ion,room和rtc。

代码语言:javascript复制
import { Configuration } from '../client';
import { LocalStream, RemoteStream, Constraints } from '../stream';
import { Connector, Service } from './ion';
import { RTC, Subscription } from './rtc';
import { Room, JoinResult, Peer, PeerState, PeerEvent, Role, Protocol, Direction, Message, RoomInfo, Disconnect } from './room';

export {
    Configuration,
    LocalStream,
    RemoteStream,
    Constraints,
    Connector,
    Service,
    RTC,
    Room,
    JoinResult,
    Peer,
    PeerState,
    PeerEvent,
    Role,
    Protocol,
    Direction,
    Message,
    RoomInfo,
    Disconnect,
    Subscription
};

在ion-sdk-js/src/signal/index.ts里定义了singal接口,同一文件夹下定义了grpc和json-rpc两个实现

代码语言:javascript复制
import { Trickle } from '../client';
export { Trickle };

export interface Signal {
  onnegotiate?: (jsep: RTCSessionDescriptionInit) => void;
  ontrickle?: (trickle: Trickle) => void;

  join(sid: string, uid: null | string, offer: RTCSessionDescriptionInit): Promise<RTCSessionDescriptionInit>;
  offer(offer: RTCSessionDescriptionInit): Promise<RTCSessionDescriptionInit>;
  answer(answer: RTCSessionDescriptionInit): void;
  trickle(trickle: Trickle): void;
  close(): void;
}

总的来说,整个sdk还是比较轻量的,仅仅包含了对proto文件生成的ts的一个简单的包装。

0 人点赞