Golang source code analysis: the gRPC connection pool (2)

2023-03-01 16:18:29

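This post continues from the previous one, Golang source code analysis: the gRPC connection pool (1). We walk through the source for three flows: setting up the connection pool, obtaining a connection when a request is issued, and finally closing the connections.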

1. Creating the connection

The entry point is in google.golang.org/grpc@v1.46.0/clientconn.go

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  return DialContext(context.Background(), target, opts...)
}

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  cc := &ClientConn{
    target:            target,
    csMgr:             &connectivityStateManager{},
    conns:             make(map[*addrConn]struct{}),
    dopts:             defaultDialOptions(),
    blockingpicker:    newPickerWrapper(),
    czData:            new(channelzData),
    firstResolveEvent: grpcsync.NewEvent(),
  }
  // ...
  resolverBuilder, err := cc.parseTargetAndFindResolver()
  // ...
  cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
    DialCreds:        credsClone,
    CredsBundle:      cc.dopts.copts.CredsBundle,
    Dialer:           cc.dopts.copts.Dialer,
    Authority:        cc.authority,
    CustomUserAgent:  cc.dopts.copts.UserAgent,
    ChannelzParentID: cc.channelzID,
    Target:           cc.parsedTarget,
  })
  // ...
  rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
  // ...
  if cc.dopts.block {
    for {
      cc.Connect()
      // ... (wait for a state change; the loop exits once the state is Ready)
    }
  }
  // ...
}

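First, it looks up the builder for the name resolver (for example, a custom DNS resolver); then it creates the balancer wrapper, cc.balancerWrapper; finally it builds the resolver wrapper.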

google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go

func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  ccr := &ccResolverWrapper{
    cc:   cc,
    done: grpcsync.NewEvent(),
  }
  // ...
  ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
  // ...
}

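If grpc.WithBlock() is passed to Dial, the for loop keeps trying to connect until the connection is ready; otherwise Dial returns the ClientConn immediately and the actual connection is established in the background. The Connect function is defined as follows; it moves the connection out of the idle state: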

func (cc *ClientConn) Connect() {
  cc.balancerWrapper.exitIdle()
}
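
To make the difference between a blocking and a non-blocking dial concrete, here is a minimal standard-library sketch. It is not gRPC code: clientConn, snapshot, connect and dial are hypothetical names, the 10 ms "handshake" and the one-second deadline are assumptions, and only three of the states are modelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// State is a cut-down stand-in for connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
)

// clientConn is a hypothetical, heavily simplified model of grpc.ClientConn.
type clientConn struct {
	mu      sync.Mutex
	state   State
	changed chan struct{} // closed and replaced on every state change
}

// setState records the new state and wakes every waiter.
func (cc *clientConn) setState(s State) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.state = s
	close(cc.changed)
	cc.changed = make(chan struct{})
}

// snapshot returns the current state and the channel that signals the next change.
func (cc *clientConn) snapshot() (State, <-chan struct{}) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.state, cc.changed
}

// connect leaves Idle and completes a fake handshake in the background.
// It is only ever called from the dialing goroutine in this sketch.
func (cc *clientConn) connect() {
	if s, _ := cc.snapshot(); s != Idle {
		return
	}
	cc.setState(Connecting)
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the real handshake
		cc.setState(Ready)
	}()
}

// dial returns at once when block is false; otherwise it loops until Ready
// or until the (assumed) one-second deadline expires.
func dial(block bool) (*clientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	cc := &clientConn{changed: make(chan struct{})}
	cc.connect() // the handshake itself never blocks the caller
	if !block {
		return cc, nil
	}
	for {
		cc.connect()
		s, changed := cc.snapshot()
		if s == Ready {
			return cc, nil
		}
		select {
		case <-changed: // the state moved on; re-check it
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

func main() {
	cc, err := dial(true)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	s, _ := cc.snapshot()
	fmt.Println("blocking dial returned in Ready state:", s == Ready)
}
```

With block set to false the same dial call hands back a connection that is most likely still in the Connecting state, which is why the first RPC on a non-blocking connection may have to wait for it to become ready.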

The connection states are defined in google.golang.org/grpc@v1.46.0/connectivity/connectivity.go. There are five of them (anything else is reported as INVALID_STATE):

func (s State) String() string {
  switch s {
  case Idle:
    return "IDLE"
  case Connecting:
    return "CONNECTING"
  case Ready:
    return "READY"
  case TransientFailure:
    return "TRANSIENT_FAILURE"
  case Shutdown:
    return "SHUTDOWN"
  default:
    logger.Errorf("unknown connectivity state: %d", s)
    return "INVALID_STATE"
  }
}

The balancer wrapper starts a watcher goroutine that handles changes in the state of the connections to the server:

func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
  ccb := &ccBalancerWrapper{
    cc:       cc,
    updateCh: buffer.NewUnbounded(),
    resultCh: buffer.NewUnbounded(),
    closed:   grpcsync.NewEvent(),
    done:     grpcsync.NewEvent(),
  }
  go ccb.watcher()
  ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
  return ccb
}

Each update read from the channel is dispatched to the matching handler according to its type. The code is in google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go

type ccBalancerWrapper struct {
  cc *ClientConn
  // Since these fields are accessed only from handleXxx() methods which are
  // synchronized by the watcher goroutine, we do not need a mutex to protect
  // these fields.
  balancer        *gracefulswitch.Balancer
  curBalancerName string
  
  updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
  resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
  closed   *grpcsync.Event   // Indicates if close has been called.
  done     *grpcsync.Event   // Indicates if close has completed its work.
}

It holds two channels: updateCh carries update events, and resultCh carries the results of UpdateClientConnState() calls.

func (ccb *ccBalancerWrapper) watcher() {
  for {
    select {
    case u := <-ccb.updateCh.Get():
      ccb.updateCh.Load()
      if ccb.closed.HasFired() {
        break
      }
      switch update := u.(type) {
      case *ccStateUpdate:
        ccb.handleClientConnStateChange(update.ccs)
      // ... (other update types)
      }
    // ...
    }
  }
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
  ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
}

func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
  // ...
  ccb.updateCh.Put(&scStateUpdate{sc: sc, state: s, err: err})
}

Coming back to the connection attempt, we can now see what exitIdle actually does: it simply puts an update event on the channel:

func (ccb *ccBalancerWrapper) exitIdle() {
  ccb.updateCh.Put(&exitIdleUpdate{})
}

The corresponding handler is shown below. If the state is Idle, it calls ExitIdle to leave the idle state; otherwise it does nothing:

func (ccb *ccBalancerWrapper) handleExitIdle() {
  if ccb.cc.GetState() != connectivity.Idle {
    return
  }
  ccb.balancer.ExitIdle()
}
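
The pattern here (producers only enqueue updates, a single watcher goroutine applies them) can be sketched with the standard library alone. This is an illustrative model, not the gRPC implementation: wrapper, its fields and the log strings are hypothetical, and a plain buffered channel stands in for buffer.Unbounded.

```go
package main

import "fmt"

type state int

const (
	idle state = iota
	connecting
)

// Two hypothetical update types, loosely modelled on scStateUpdate and
// exitIdleUpdate.
type scStateUpdate struct{ s state }
type exitIdleUpdate struct{}

type wrapper struct {
	updateCh chan interface{} // a buffered channel stands in for buffer.Unbounded
	done     chan struct{}    // closed when the watcher goroutine exits
	state    state            // touched only by the watcher, so no mutex is needed
	log      []string         // what the handlers did, in order
}

func newWrapper() *wrapper {
	w := &wrapper{updateCh: make(chan interface{}, 16), done: make(chan struct{})}
	go w.watcher()
	return w
}

// watcher is the single goroutine that applies every update, one at a time.
func (w *wrapper) watcher() {
	defer close(w.done)
	for u := range w.updateCh {
		switch update := u.(type) {
		case *scStateUpdate:
			w.state = update.s
			w.log = append(w.log, "state updated")
		case *exitIdleUpdate:
			w.handleExitIdle()
		}
	}
}

// handleExitIdle acts only when the connection is still idle.
func (w *wrapper) handleExitIdle() {
	if w.state != idle {
		w.log = append(w.log, "exitIdle ignored")
		return
	}
	w.state = connecting
	w.log = append(w.log, "exitIdle handled")
}

// exitIdle never touches the state itself; it only enqueues an update.
func (w *wrapper) exitIdle() { w.updateCh <- &exitIdleUpdate{} }

// close stops the watcher and returns the handler log.
func (w *wrapper) close() []string {
	close(w.updateCh)
	<-w.done
	return w.log
}

func main() {
	w := newWrapper()
	w.exitIdle()
	w.exitIdle()
	fmt.Println(w.close()) // prints [exitIdle handled exitIdle ignored]
}
```

Because only the watcher reads and writes the state, the handlers need no locking, which matches the comment on the ccBalancerWrapper fields.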

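The channel type is an unbounded queue. The head element is handed to the consumer first; anything that cannot be delivered yet is appended to the tail of the backlog, much like nginx's backlog queue: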

google.golang.org/grpc@v1.46.0/internal/buffer/unbounded.go

type Unbounded struct {
  c       chan interface{}
  mu      sync.Mutex
  backlog []interface{}
}

// Abridged below: Put and Load run with b.mu held; the locking is omitted here.
func (b *Unbounded) Put(t interface{}) {
  if len(b.backlog) == 0 {
    select {
    case b.c <- t: // fast path: the channel has room
      return
    default:
    }
  }
  b.backlog = append(b.backlog, t) // otherwise queue at the tail
}

func (b *Unbounded) Load() {
  if len(b.backlog) > 0 {
    select {
    case b.c <- b.backlog[0]:
      b.backlog[0] = nil
      b.backlog = b.backlog[1:]
    default:
    }
  }
}

func (b *Unbounded) Get() <-chan interface{} {
  return b.c
}
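
To see how Put, Get and Load cooperate, here is a self-contained sketch of the same idea that can be run on its own. The lower-case names (unbounded, put, load, get, fifo) are chosen for this example, and the one-slot channel capacity is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// unbounded is a queue whose put never blocks: one element sits in the
// channel, the rest wait in the backlog slice.
type unbounded struct {
	c       chan interface{}
	mu      sync.Mutex
	backlog []interface{}
}

func newUnbounded() *unbounded {
	return &unbounded{c: make(chan interface{}, 1)}
}

// put delivers straight to the channel when nothing is queued and the
// channel has room; otherwise it appends to the tail of the backlog.
func (b *unbounded) put(t interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) == 0 {
		select {
		case b.c <- t:
			return
		default:
		}
	}
	b.backlog = append(b.backlog, t)
}

// load moves the head of the backlog into the channel. The consumer calls
// it after every successful read from get().
func (b *unbounded) load() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) > 0 {
		select {
		case b.c <- b.backlog[0]:
			b.backlog[0] = nil
			b.backlog = b.backlog[1:]
		default:
		}
	}
}

func (b *unbounded) get() <-chan interface{} { return b.c }

// fifo enqueues 1..n without a consumer running, then drains the queue.
func fifo(n int) []int {
	b := newUnbounded()
	for i := 1; i <= n; i++ {
		b.put(i) // never blocks, however large n is
	}
	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		v := <-b.get()
		b.load() // refill the channel from the backlog
		out = append(out, v.(int))
	}
	return out
}

func main() {
	fmt.Println(fifo(5)) // prints [1 2 3 4 5]
}
```

The read-then-load pairing is the same one the watcher goroutine uses: every read from Get() is followed by Load() so the next queued update becomes visible.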

With the queue covered, let's look at what ExitIdle does. The source is in:

google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go

func (gsb *Balancer) ExitIdle() {
  // ...
  for sc := range balToUpdate.subconns {
    sc.Connect()
  }
}

It tries to connect every SubConn; this is where the connection to the server is really initiated. The subconns are stored in balancerWrapper.

type balancerWrapper struct {
  balancer.Balancer
  gsb *Balancer

  lastState balancer.State
  subconns  map[balancer.SubConn]bool // subconns created by this balancer
}

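When a connection's state needs to be updated, the wrapper acts on the state passed in (a SubConn that has shut down is removed from the map) and forwards the update to the wrapped balancer: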

func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  if state.ConnectivityState == connectivity.Shutdown {
    delete(bw.subconns, sc)
  }
  bw.Balancer.UpdateSubConnState(sc, state)
}

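The update itself is driven by the graceful-switch Balancer, which forwards it to whichever balancer (current or pending) owns the SubConn: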

func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  // ...
  if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
    balToUpdate = gsb.balancerCurrent
  } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
    balToUpdate = gsb.balancerPending
  }
  // ...
  balToUpdate.UpdateSubConnState(sc, state)
}
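
The routing rule (look the SubConn up in the current balancer first, then in the pending one, and forget it once it shuts down) can be illustrated with a small model. Everything below is a simplified assumption: SubConns are plain strings, states are plain strings, and gracefulSwitch, newSwitch and the returned owner name exist only in this sketch.

```go
package main

import "fmt"

type subConn string

// balancerWrapper remembers which SubConns it created.
type balancerWrapper struct {
	name     string
	subconns map[subConn]bool
	seen     []string // updates delivered to the wrapped balancer
}

func (bw *balancerWrapper) updateSubConnState(sc subConn, st string) {
	if st == "SHUTDOWN" {
		delete(bw.subconns, sc) // a closed SubConn no longer belongs to anyone
	}
	bw.seen = append(bw.seen, string(sc)+"="+st)
}

// gracefulSwitch holds the balancer in use and the one being switched to.
type gracefulSwitch struct {
	current, pending *balancerWrapper
}

// updateSubConnState forwards the update to the owner of sc and returns the
// owner's name, or "" when no balancer owns it and the update is dropped.
func (g *gracefulSwitch) updateSubConnState(sc subConn, st string) string {
	var target *balancerWrapper
	if g.current != nil && g.current.subconns[sc] {
		target = g.current
	} else if g.pending != nil && g.pending.subconns[sc] {
		target = g.pending
	}
	if target == nil {
		return ""
	}
	target.updateSubConnState(sc, st)
	return target.name
}

// newSwitch builds a switch in which each balancer owns one SubConn.
func newSwitch() *gracefulSwitch {
	return &gracefulSwitch{
		current: &balancerWrapper{name: "current", subconns: map[subConn]bool{"sc-old": true}},
		pending: &balancerWrapper{name: "pending", subconns: map[subConn]bool{"sc-new": true}},
	}
}

func main() {
	g := newSwitch()
	fmt.Println(g.updateSubConnState("sc-new", "READY"))    // prints pending
	fmt.Println(g.updateSubConnState("sc-old", "SHUTDOWN")) // prints current
	fmt.Println(g.updateSubConnState("sc-old", "READY") == "") // prints true
}
```

The last call shows why the map matters: after the Shutdown update the SubConn has been deleted, so later updates for it have no owner and are ignored.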

SubConn is defined in google.golang.org/grpc@v1.46.0/balancer/balancer.go

type SubConn interface {
  // UpdateAddresses updates the addresses used in this SubConn.
  // gRPC checks if currently-connected address is still in the new list.
  // If it's in the list, the connection will be kept.
  // If it's not in the list, the connection will gracefully closed, and
  // a new connection will be created.
  //
  // This will trigger a state transition for the SubConn.
  //
  // Deprecated: This method is now part of the ClientConn interface and will
  // eventually be removed from here.
  UpdateAddresses([]resolver.Address)
  // Connect starts the connecting for this SubConn.
  Connect()
}

The same file defines the Picker interface, which selects a usable connection from the pool:

type Picker interface {
  // Pick returns the connection to use for this RPC and related information.
  //
  // Pick should not block.  If the balancer needs to do I/O or any blocking
  // or time-consuming work to service this call, it should return
  // ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
  // the Picker is updated (using ClientConn.UpdateState).
  //
  // If an error is returned:
  //
  // - If the error is ErrNoSubConnAvailable, gRPC will block until a new
  //   Picker is provided by the balancer (using ClientConn.UpdateState).
  //
  // - If the error is a status error (implemented by the grpc/status
  //   package), gRPC will terminate the RPC with the code and message
  //   provided.
  //
  // - For all other errors, wait for ready RPCs will wait, but non-wait for
  //   ready RPCs will be terminated with this error's Error() string and
  //   status code Unavailable.
  Pick(info PickInfo) (PickResult, error)
}
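
As an illustration of what a Pick implementation might look like, here is a minimal round-robin picker written against simplified types. It is a sketch under assumptions: subConn, rrPicker, newPicker and pickAddrs are hypothetical, the real interface takes a PickInfo and returns a PickResult, and errNoSubConnAvailable only mimics the role of balancer.ErrNoSubConnAvailable.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoSubConnAvailable mimics the role of balancer.ErrNoSubConnAvailable.
var errNoSubConnAvailable = errors.New("no SubConn is available")

// subConn is a hypothetical stand-in for a ready connection.
type subConn struct{ addr string }

// rrPicker hands out ready connections in round-robin order.
type rrPicker struct {
	mu       sync.Mutex
	subConns []*subConn
	next     int
}

// pick never blocks: it returns the next connection, or an error telling
// the caller to wait for a new picker.
func (p *rrPicker) pick() (*subConn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.subConns) == 0 {
		return nil, errNoSubConnAvailable
	}
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	return sc, nil
}

func newPicker(addrs ...string) *rrPicker {
	p := &rrPicker{}
	for _, a := range addrs {
		p.subConns = append(p.subConns, &subConn{addr: a})
	}
	return p
}

// pickAddrs calls pick n times and collects the chosen addresses.
func pickAddrs(p *rrPicker, n int) []string {
	var out []string
	for i := 0; i < n; i++ {
		sc, err := p.pick()
		if err != nil {
			break
		}
		out = append(out, sc.addr)
	}
	return out
}

func main() {
	p := newPicker("10.0.0.1:50051", "10.0.0.2:50051")
	fmt.Println(pickAddrs(p, 3)) // prints [10.0.0.1:50051 10.0.0.2:50051 10.0.0.1:50051]
}
```

A picker is meant to be a cheap, immutable snapshot: when the set of ready connections changes, the balancer builds a new picker rather than mutating the old one.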

It also defines the Balancer interface. Users can plug in a custom balancer by implementing this interface, including UpdateClientConnState and UpdateSubConnState:

type Balancer interface {
  // UpdateClientConnState is called by gRPC when the state of the ClientConn
  // changes.  If the error returned is ErrBadResolverState, the ClientConn
  // will begin calling ResolveNow on the active name resolver with
  // exponential backoff until a subsequent call to UpdateClientConnState
  // returns a nil error.  Any other errors are currently ignored.
  UpdateClientConnState(ClientConnState) error
  // ResolverError is called by gRPC when the name resolver reports an error.
  ResolverError(error)
  // UpdateSubConnState is called by gRPC when the state of a SubConn
  // changes.
  UpdateSubConnState(SubConn, SubConnState)
  // Close closes the balancer. The balancer is not required to call
  // ClientConn.RemoveSubConn for its existing SubConns.
  Close()
}

The official package ships many balancer implementations, for example ringhash:

google.golang.org/grpc@v1.46.0/xds/internal/balancer/ringhash/ringhash.go

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
  b := &ringhashBalancer{
    cc:       cc,
    subConns: make(map[resolver.Address]*subConn),
    scStates: make(map[balancer.SubConn]*subConn),
    csEvltr:  &connectivityStateEvaluator{},
  }
  // ...
}

2. The call path when the client issues a request

The entry point is in google.golang.org/grpc@v1.46.0/call.go

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
  // ...
  return invoke(ctx, method, args, reply, cc, opts...)
}

It creates a clientStream and then sends and receives messages on it:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
  // ... (then cs.SendMsg(req) and cs.RecvMsg(reply))
}

google.golang.org/grpc@v1.46.0/stream.go

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  // ...
  var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
    return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
  }
  // ...
}

func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
  // ...
  if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
    cs.finish(err)
    return nil, err
  }
  // ...
}

func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
  // ...
  t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
  // ...
}

google.golang.org/grpc@v1.46.0/clientconn.go, which picks a connection from the balancer:

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
    Ctx:            ctx,
    FullMethodName: method,
  })
  // ...
}

google.golang.org/grpc@v1.46.0/picker_wrapper.go, where the wrapper calls the Pick method of the (possibly user-defined) balancer's picker to obtain a connection:

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  // ... (abridged; all of this runs inside a retry loop)
  pickResult, err := p.Pick(info)
  acw, ok := pickResult.SubConn.(*acBalancerWrapper)
  if t := acw.getAddrConn().getReadyTransport(); t != nil {
    // ... (return the ready transport t)
  }
}
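
The Picker contract says that a pick returning ErrNoSubConnAvailable makes gRPC block until the balancer provides a new picker. One way to sketch that retry loop with the standard library is shown below. The names (pickerWrapper fields, updatePicker, pickAfterUpdate), the string address standing in for a transport, and the timings are assumptions of this example, not the library's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errNoSubConnAvailable = errors.New("no SubConn is available")

// picker is a simplified Pick: it returns an address instead of a transport.
type picker func() (addr string, err error)

type pickerWrapper struct {
	mu         sync.Mutex
	picker     picker
	blockingCh chan struct{} // closed and replaced whenever the picker changes
}

func newPickerWrapper() *pickerWrapper {
	return &pickerWrapper{blockingCh: make(chan struct{})}
}

// updatePicker installs a new picker and wakes every blocked pick call.
func (pw *pickerWrapper) updatePicker(p picker) {
	pw.mu.Lock()
	pw.picker = p
	close(pw.blockingCh)
	pw.blockingCh = make(chan struct{})
	pw.mu.Unlock()
}

// pick retries until a picker yields a connection, the picker reports a
// different error, or ctx ends.
func (pw *pickerWrapper) pick(ctx context.Context) (string, error) {
	var seen chan struct{} // identifies the picker that last failed
	for {
		pw.mu.Lock()
		p, ch := pw.picker, pw.blockingCh
		pw.mu.Unlock()
		if p == nil || ch == seen {
			// No picker yet, or the same one that just failed: wait.
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case <-ch:
			}
			continue
		}
		seen = ch
		addr, err := p()
		if errors.Is(err, errNoSubConnAvailable) {
			continue // blocks above until the picker is replaced
		}
		return addr, err
	}
}

// pickAfterUpdate starts a pick before any usable picker exists and lets a
// background goroutine install one shortly afterwards.
func pickAfterUpdate() (string, error) {
	pw := newPickerWrapper()
	go func() {
		time.Sleep(5 * time.Millisecond)
		pw.updatePicker(func() (string, error) { return "", errNoSubConnAvailable })
		time.Sleep(5 * time.Millisecond)
		pw.updatePicker(func() (string, error) { return "10.0.0.1:50051", nil })
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return pw.pick(ctx)
}

func main() {
	fmt.Println(pickAfterUpdate())
}
```

Closing and replacing the channel on every update is a compact way to broadcast "the picker changed" to any number of waiting RPCs without tracking them individually.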

3. Closing the connection

The entry point is in google.golang.org/grpc@v1.46.0/clientconn.go

func (cc *ClientConn) Close() error {
  // ...
  cc.csMgr.updateState(connectivity.Shutdown)
  // ...
  for ac := range conns {
    ac.tearDown(ErrClientConnClosing)
  }
  // ...
}

It tears down every connection in the pool, one after another.

// tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
  // ...
  ac.updateConnectivityState(connectivity.Shutdown, nil)
  // ...
}

func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
  // ...
  ac.state = s
  // ...
  ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}

Closing, too, works by pushing a state update, which in turn drives the connection pool's state machine:

func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
  cc.balancerWrapper.updateSubConnState(sc, s, err)
}
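
The shutdown flow can be summarised in a small standalone model: take the connection map, mark the pool as closed, then tear down each connection and report its Shutdown state. The types and names below (clientConn, addrConn, newPool, the updates slice standing in for the balancer's update channel, string states) are assumptions of this sketch rather than the library's definitions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClientConnClosing = errors.New("client connection is closing")

// addrConn is a hypothetical stand-in for one pooled connection.
type addrConn struct {
	addr  string
	state string
}

type clientConn struct {
	mu      sync.Mutex
	conns   map[*addrConn]struct{} // nil once close has run
	updates []string               // stand-in for the balancer's update channel
}

func newPool(addrs ...string) *clientConn {
	cc := &clientConn{conns: make(map[*addrConn]struct{})}
	for _, a := range addrs {
		cc.conns[&addrConn{addr: a, state: "READY"}] = struct{}{}
	}
	return cc
}

// tearDown marks the connection as shut down and reports the new state.
func (ac *addrConn) tearDown(report func(string)) {
	ac.state = "SHUTDOWN"
	report(ac.addr + " -> SHUTDOWN")
}

// close detaches the map under the lock, then tears the connections down
// outside it. A second call finds a nil map and returns an error.
func (cc *clientConn) close() error {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return errClientConnClosing
	}
	conns := cc.conns
	cc.conns = nil
	cc.mu.Unlock()

	for ac := range conns {
		ac.tearDown(func(u string) { cc.updates = append(cc.updates, u) })
	}
	return nil
}

func main() {
	cc := newPool("10.0.0.1:50051", "10.0.0.2:50051")
	fmt.Println(cc.close(), len(cc.updates))
	fmt.Println(cc.close())
}
```

Detaching the map before iterating keeps the lock from being held during teardown and makes a repeated close harmless.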
