golang源码分析:grpc 链接池(5)自定义组件和框架交互流程

2023-03-01 16:19:43 浏览数 (2)

自定义grpc组件是如何与框架交互的呢?

1,resolver

首先我们看下resolvergolang源码分析:grpc 链接池(4)自定义resolver 、balancer和picker相关的最核心接口,在生成resolver前我们先定义对应的builder,它对应函数Build的参数是ccresolver.ClientConn,它调用服务发现组件获取服务对应地址后,就是通过cc的UpdateState方法,把地址存入连接池中,供后面的balancer来使用的。

代码语言:javascript复制
func (*mockResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
      r.start()
      r.cc.UpdateState(resolver.State{Addresses: addrs})

那Build方法是何时调用的呢,当然是Dial初始化连接池时调用的newCCResolverWrapper源码位于:google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go

代码语言:javascript复制
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
      rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
      if cc.dopts.block {
              cc.Connect()
      s := cc.GetState()
      if s == connectivity.Ready {
      cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
      rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
      cc.resolverWrapper = rWrapper
代码语言:javascript复制
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
    ccr := &ccResolverWrapper{
    cc:   cc,
    done: grpcsync.NewEvent(),
  }
  ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
代码语言:javascript复制
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
      if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {

对应的update方法会将解析结果发送到解析事件通知channel里来进行分发,在这里就和balancer关联上了,将resolve的结果放在参数里传递下去:

代码语言:javascript复制
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
      bw := cc.balancerWrapper
      uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

代码位于:google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go

代码语言:javascript复制
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
      ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
代码语言:javascript复制
type ccStateUpdate struct {
  ccs *balancer.ClientConnState
}

对应的有个后台协程一直在监听更新事件,做分发处理

代码语言:javascript复制
func (ccb *ccBalancerWrapper) watcher() {
      case u := <-ccb.updateCh.Get():
              case *ccStateUpdate:
        ccb.handleClientConnStateChange(update.ccs)
      case *scStateUpdate:
        ccb.handleSubConnStateChange(update)
      case *exitIdleUpdate:
        ccb.handleExitIdle()
      case *resolverErrorUpdate:
        ccb.handleResolverError(update.err)
      case *switchToUpdate:
        ccb.handleSwitchTo(update.name)
      case *subConnUpdate:
        ccb.handleRemoveSubConn(update.acbw)

类似的命令事件还用很多,比如我们发起连接的时候

代码语言:javascript复制
    func (cc *ClientConn) Connect() {
      cc.balancerWrapper.exitIdle()
代码语言:javascript复制
    func (ccb *ccBalancerWrapper) exitIdle() {
      ccb.updateCh.Put(&exitIdleUpdate{})
代码语言:javascript复制
func (ccb *ccBalancerWrapper) switchTo(name string) {
      ccb.updateCh.Put(&switchToUpdate{name: name})
func (ccb *ccBalancerWrapper) resolverError(err error) {
  ccb.updateCh.Put(&resolverErrorUpdate{err: err})
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
    ccb.updateCh.Put(&scStateUpdate{
    sc:    sc,
    state: s,
    err:   err,
  }

分发后由对应事件处理函数处理

代码语言:javascript复制
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
      ccs.ResolverState.Addresses = addrs
      ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
代码语言:javascript复制
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
      ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
代码语言:javascript复制
func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
      builder := balancer.Get(name)
      if err := ccb.balancer.SwitchTo(builder); err != nil {
代码语言:javascript复制
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
      ac, err := ccb.cc.newAddrConn(addrs, opts)
      acbw := &acBalancerWrapper{ac: ac}
      ac.acbw = acbw

2,balancer

在注册balancer的时候,其实是服用了base的建造者:

代码语言:javascript复制
balancer.Register( base.NewBalancerBuilder
代码语言:javascript复制
return base.NewBalancerBuilder(Name, picker.NewRandomPickerBuilder(), base.Config{HealthCheck: true})

google.golang.org/grpc@v1.50.1/balancer/base/base.go

代码语言:javascript复制
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {

注意到,参数pb PickerBuilder是picker的建造器,也是通过它关联的resolver和picker。google.golang.org/grpc@v1.50.1/balancer/base/balancer.go

代码语言:javascript复制
type baseBuilder struct {
  name          string
  pickerBuilder PickerBuilder
  config        Config
}

在Build方法里初始化baseBalancer

代码语言:javascript复制
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    bal := &baseBalancer{
    cc:            cc,
    pickerBuilder: bb.pickerBuilder,


    subConns: resolver.NewAddressMap(),
    scStates: make(map[balancer.SubConn]connectivity.State),
    csEvltr:  &balancer.ConnectivityStateEvaluator{},
    config:   bb.config,
    state:    connectivity.Connecting,
  }

当子连接状态变化的时候,如果状态是空闲,就会发起连接,是关闭的话,把子连接删除,也就是在这里做了连接状态更新事件的分发:

代码语言:javascript复制
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  case connectivity.Idle:
    sc.Connect()
  case connectivity.Shutdown:
    // When an address was removed by resolver, b called RemoveSubConn but
    // kept the sc's state in scStates. Remove state for this sc here.
    delete(b.scStates, sc)
  case connectivity.TransientFailure:
代码语言:javascript复制
 func (gsb *Balancer) ExitIdle() {
    for sc := range balToUpdate.subconns {
       sc.Connect()

连接创建在下面这个函数里:

代码语言:javascript复制
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
      for _, a := range s.ResolverState.Addresses {
        if _, ok := b.subConns.Get(a); !ok {
          sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
          sc.Connect()

切换balancer定义在ClientConn里,这个时候会调用build方法

代码语言:javascript复制
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
      cc.balancerWrapper.switchTo(newBalancerName)
代码语言:javascript复制
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
    bw := &balancerWrapper{
    gsb: gsb,

    lastState: balancer.State{
      ConnectivityState: connectivity.Connecting,
      Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
    },
    subconns: make(map[balancer.SubConn]bool),
  }
      newBalancer := builder.Build(bw, gsb.bOpts)
      bw.Balancer = newBalancer

3,picker

picker的核心方法是Pick

代码语言:javascript复制
func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
代码语言:javascript复制
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {

Pick方法是在Invoke的时候调用的,它的链路如下:

代码语言:javascript复制
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
      if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
代码语言:javascript复制
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
      t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
代码语言:javascript复制
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
    Ctx:            ctx,
    FullMethodName: method,
  })

最终返回一个可用连接:google.golang.org/grpc@v1.46.0/picker_wrapper.go

代码语言:javascript复制
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
      p := pw.picker
      pickResult, err := p.Pick(info)
      acw, ok := pickResult.SubConn.(*acBalancerWrapper)
      if t := acw.getAddrConn().getReadyTransport(); t != nil {
func (ac *addrConn) getReadyTransport() transport.ClientTransport {
      if ac.state == connectivity.Ready {
    return ac.transport

4,subConn

为了将实现和抽象分离,每个可以供我们自定义的编程接口都有对应的wraper,子连接也不例外,它会调用addConn的wraper

代码语言:javascript复制
 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
      return acbw.ac
代码语言:javascript复制
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
代码语言:javascript复制
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
      ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)

切换balancer会重新连接新balancer

代码语言:javascript复制
func (ac *addrConn) connect() error {
      ac.updateConnectivityState(connectivity.Connecting, nil)
      ac.resetTransport()
代码语言:javascript复制
func (ac *addrConn) resetTransport() {
      if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {

尝试连接所有endpoints

代码语言:javascript复制
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
      for _, addr := range addrs {
        err := ac.createTransport(addr, copts, connectDeadline)
代码语言:javascript复制
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
        newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)

其中NewClientTransport,是可以自定义的,所以也有对应wraper

google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go

代码语言:javascript复制
func (acbw *acBalancerWrapper) Connect() {
      go acbw.ac.connect()
代码语言:javascript复制
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
        newAC, err := cc.newAddrConn(addrs, opts)
        acbw.ac = newAC
          if acState != connectivity.Idle {
      go newAC.connect()
    }

连接关闭的时候会关闭所有子连接:

google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go

代码语言:javascript复制
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
    if state.ConnectivityState == connectivity.Shutdown {
    bw.gsb.mu.Lock()
    delete(bw.subconns, sc)
     balToUpdate.UpdateSubConnState(sc, state)
代码语言:javascript复制
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
      balToUpdate := gsb.latestBalancer()
      return balToUpdate.UpdateClientConnState(state)

0 人点赞