自定义grpc组件是如何与框架交互的呢?
1,resolver
首先我们看下resolvergolang源码分析:grpc 链接池(4)自定义resolver 、balancer和picker相关的最核心接口,在生成resolver前我们先定义对应的builder,它对应函数Build的参数是ccresolver.ClientConn,它调用服务发现组件获取服务对应地址后,就是通过cc的UpdateState方法,把地址存入连接池中,供后面的balancer来使用的。
代码语言:javascript复制func (*mockResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.start()
r.cc.UpdateState(resolver.State{Addresses: addrs})
那Build方法是何时调用的呢,当然是Dial初始化连接池时调用的newCCResolverWrapper源码位于:google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go
代码语言:javascript复制func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
if cc.dopts.block {
cc.Connect()
s := cc.GetState()
if s == connectivity.Ready {
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
cc.resolverWrapper = rWrapper
代码语言:javascript复制func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
ccr := &ccResolverWrapper{
cc: cc,
done: grpcsync.NewEvent(),
}
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
代码语言:javascript复制func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
对应的update方法会将解析结果发送到解析事件通知channel里来进行分发,在这里就和balancer关联上了,将resolve的结果放在参数里传递下去:
代码语言:javascript复制func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
bw := cc.balancerWrapper
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
代码位于:google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go
代码语言:javascript复制func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
代码语言:javascript复制type ccStateUpdate struct {
ccs *balancer.ClientConnState
}
对应的有个后台协程一直在监听更新事件,做分发处理
代码语言:javascript复制func (ccb *ccBalancerWrapper) watcher() {
case u := <-ccb.updateCh.Get():
case *ccStateUpdate:
ccb.handleClientConnStateChange(update.ccs)
case *scStateUpdate:
ccb.handleSubConnStateChange(update)
case *exitIdleUpdate:
ccb.handleExitIdle()
case *resolverErrorUpdate:
ccb.handleResolverError(update.err)
case *switchToUpdate:
ccb.handleSwitchTo(update.name)
case *subConnUpdate:
ccb.handleRemoveSubConn(update.acbw)
类似的命令事件还用很多,比如我们发起连接的时候
代码语言:javascript复制 func (cc *ClientConn) Connect() {
cc.balancerWrapper.exitIdle()
代码语言:javascript复制 func (ccb *ccBalancerWrapper) exitIdle() {
ccb.updateCh.Put(&exitIdleUpdate{})
代码语言:javascript复制func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.updateCh.Put(&switchToUpdate{name: name})
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.updateCh.Put(&resolverErrorUpdate{err: err})
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
ccb.updateCh.Put(&scStateUpdate{
sc: sc,
state: s,
err: err,
}
分发后由对应事件处理函数处理
代码语言:javascript复制func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
ccs.ResolverState.Addresses = addrs
ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
代码语言:javascript复制func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
代码语言:javascript复制func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
builder := balancer.Get(name)
if err := ccb.balancer.SwitchTo(builder); err != nil {
代码语言:javascript复制func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
ac, err := ccb.cc.newAddrConn(addrs, opts)
acbw := &acBalancerWrapper{ac: ac}
ac.acbw = acbw
2,balancer
在注册balancer的时候,其实是服用了base的建造者:
代码语言:javascript复制balancer.Register( base.NewBalancerBuilder
代码语言:javascript复制return base.NewBalancerBuilder(Name, picker.NewRandomPickerBuilder(), base.Config{HealthCheck: true})
google.golang.org/grpc@v1.50.1/balancer/base/base.go
代码语言:javascript复制func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
注意到,参数pb PickerBuilder是picker的建造器,也是通过它关联的resolver和picker。google.golang.org/grpc@v1.50.1/balancer/base/balancer.go
代码语言:javascript复制type baseBuilder struct {
name string
pickerBuilder PickerBuilder
config Config
}
在Build方法里初始化baseBalancer
代码语言:javascript复制func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
state: connectivity.Connecting,
}
当子连接状态变化的时候,如果状态是空闲,就会发起连接,是关闭的话,把子连接删除,也就是在这里做了连接状态更新事件的分发:
代码语言:javascript复制func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
代码语言:javascript复制 func (gsb *Balancer) ExitIdle() {
for sc := range balToUpdate.subconns {
sc.Connect()
连接创建在下面这个函数里:
代码语言:javascript复制func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
for _, a := range s.ResolverState.Addresses {
if _, ok := b.subConns.Get(a); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
sc.Connect()
切换balancer定义在ClientConn里,这个时候会调用build方法
代码语言:javascript复制func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
cc.balancerWrapper.switchTo(newBalancerName)
代码语言:javascript复制func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
bw := &balancerWrapper{
gsb: gsb,
lastState: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
subconns: make(map[balancer.SubConn]bool),
}
newBalancer := builder.Build(bw, gsb.bOpts)
bw.Balancer = newBalancer
3,picker
picker的核心方法是Pick
代码语言:javascript复制func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
代码语言:javascript复制func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
Pick方法是在Invoke的时候调用的,它的链路如下:
代码语言:javascript复制func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
代码语言:javascript复制func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
代码语言:javascript复制func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
最终返回一个可用连接:google.golang.org/grpc@v1.46.0/picker_wrapper.go
代码语言:javascript复制func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
p := pw.picker
pickResult, err := p.Pick(info)
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if t := acw.getAddrConn().getReadyTransport(); t != nil {
func (ac *addrConn) getReadyTransport() transport.ClientTransport {
if ac.state == connectivity.Ready {
return ac.transport
4,subConn
为了将实现和抽象分离,每个可以供我们自定义的编程接口都有对应的wraper,子连接也不例外,它会调用addConn的wraper
代码语言:javascript复制 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
return acbw.ac
代码语言:javascript复制func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
代码语言:javascript复制func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
切换balancer会重新连接新balancer
代码语言:javascript复制func (ac *addrConn) connect() error {
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.resetTransport()
代码语言:javascript复制func (ac *addrConn) resetTransport() {
if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
尝试连接所有endpoints
代码语言:javascript复制func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
for _, addr := range addrs {
err := ac.createTransport(addr, copts, connectDeadline)
代码语言:javascript复制func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
其中NewClientTransport,是可以自定义的,所以也有对应wraper
google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go
代码语言:javascript复制func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
代码语言:javascript复制func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
newAC, err := cc.newAddrConn(addrs, opts)
acbw.ac = newAC
if acState != connectivity.Idle {
go newAC.connect()
}
连接关闭的时候会关闭所有子连接:
google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go
代码语言:javascript复制func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if state.ConnectivityState == connectivity.Shutdown {
bw.gsb.mu.Lock()
delete(bw.subconns, sc)
balToUpdate.UpdateSubConnState(sc, state)
代码语言:javascript复制func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
balToUpdate := gsb.latestBalancer()
return balToUpdate.UpdateClientConnState(state)