聊聊dubbo-go的failfastCluster

2020-08-07 09:50:09 浏览数 (1)

本文主要研究一下dubbo-go的failfastCluster

failfastCluster

dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster.go

代码语言:javascript复制
type failfastCluster struct{}
​
const failfast = "failfast"
​
func init() {
    extension.SetCluster(failfast, NewFailFastCluster)
}
​
// NewFailFastCluster ...
func NewFailFastCluster() cluster.Cluster {
    return &failfastCluster{}
}
​
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
    return newFailFastClusterInvoker(directory)
}
  • failfastCluster的Join方法执行newFailFastClusterInvoker(directory)

newFailFastClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster_invoker.go

代码语言:javascript复制
type failfastClusterInvoker struct {
    baseClusterInvoker
}
​
func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
    return &failfastClusterInvoker{
        baseClusterInvoker: newBaseClusterInvoker(directory),
    }
}
  • newFailFastClusterInvoker方法创建了failfastClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster_invoker.go

代码语言:javascript复制
func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
    invokers := invoker.directory.List(invocation)
    err := invoker.checkInvokers(invokers, invocation)
    if err != nil {
        return &protocol.RPCResult{Err: err}
    }
​
    loadbalance := getLoadBalance(invokers[0], invocation)
​
    err = invoker.checkWhetherDestroyed()
    if err != nil {
        return &protocol.RPCResult{Err: err}
    }
​
    ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)
    return ivk.Invoke(ctx, invocation)
}
  • Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过getLoadBalance(invokers[0], invocation)获取loadbalance,再通过invoker.doSelect(loadbalance, invocation, invokers, nil)选择ivk,最后执行ivk.Invoke(ctx, invocation)

getLoadBalance

dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go

代码语言:javascript复制
func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
    url := invoker.GetUrl()
​
    methodName := invocation.MethodName()
    //Get the service loadbalance config
    lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
​
    //Get the service method loadbalance config if have
    if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 {
        lb = v
    }
    return extension.GetLoadbalance(lb)
}
  • getLoadBalance方法通过url获取constant.LOADBALANCE_KEY,然后通过extension.GetLoadbalance(lb)获取cluster.LoadBalance

doSelect

dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go

代码语言:javascript复制
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
​
    var selectedInvoker protocol.Invoker
    url := invokers[0].GetUrl()
    sticky := url.GetParamBool(constant.STICKY_KEY, false)
    //Get the service method sticky config if have
    sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)
​
    if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
        invoker.stickyInvoker = nil
    }
​
    if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
        if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {
            return invoker.stickyInvoker
        }
    }
​
    selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
​
    if sticky {
        invoker.stickyInvoker = selectedInvoker
    }
    return selectedInvoker
​
}
  • doSelect方法先判断是否sticky,是的话且invoker.stickyInvoker不为nil且available,则返回stickyInvoker,否则通过invoker.doSelectInvoker(lb, invocation, invokers, invoked)获取selectedInvoker

doSelectInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go

代码语言:javascript复制
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
    if len(invokers) == 1 {
        return invokers[0]
    }
​
    selectedInvoker := lb.Select(invokers, invocation)
​
    //judge to if the selectedInvoker is invoked
​
    if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
        // do reselect
        var reslectInvokers []protocol.Invoker
​
        for _, invoker := range invokers {
            if !invoker.IsAvailable() {
                continue
            }
​
            if !isInvoked(invoker, invoked) {
                reslectInvokers = append(reslectInvokers, invoker)
            }
        }
​
        if len(reslectInvokers) > 0 {
            selectedInvoker = lb.Select(reslectInvokers, invocation)
        } else {
            return nil
        }
    }
    return selectedInvoker
}
  • doSelectInvoker方法通过lb.Select(invokers, invocation)选择selectedInvoker,如果selectedInvoker已经invoked或者非available,则遍历invokers重新构造reslectInvokers,再通过lb.Select(reslectInvokers, invocation)选择selectedInvoker

小结

failfastCluster的Join方法执行newFailFastClusterInvoker(directory);failfastClusterInvoker的Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过getLoadBalance(invokers[0], invocation)获取loadbalance,再通过invoker.doSelect(loadbalance, invocation, invokers, nil)选择ivk,最后执行ivk.Invoke(ctx, invocation)

doc

  • failfast_cluster

0 人点赞