序
本文主要研究一下dubbo-go的failfastCluster
failfastCluster
dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster.go
代码语言:javascript复制type failfastCluster struct{}
const failfast = "failfast"
func init() {
extension.SetCluster(failfast, NewFailFastCluster)
}
// NewFailFastCluster ...
func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{}
}
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
}
- failfastCluster的Join方法执行newFailFastClusterInvoker(directory)
newFailFastClusterInvoker
dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster_invoker.go
代码语言:javascript复制type failfastClusterInvoker struct {
baseClusterInvoker
}
func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &failfastClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
- newFailFastClusterInvoker方法创建了failfastClusterInvoker
Invoke
dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster_invoker.go
代码语言:javascript复制func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
loadbalance := getLoadBalance(invokers[0], invocation)
err = invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)
return ivk.Invoke(ctx, invocation)
}
- Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过getLoadBalance(invokers[0], invocation)获取loadbalance,再通过invoker.doSelect(loadbalance, invocation, invokers, nil)选择ivk,最后执行ivk.Invoke(ctx, invocation)
getLoadBalance
dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go
代码语言:javascript复制func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
url := invoker.GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 {
lb = v
}
return extension.GetLoadbalance(lb)
}
- getLoadBalance方法通过url获取constant.LOADBALANCE_KEY,然后通过extension.GetLoadbalance(lb)获取cluster.LoadBalance
doSelect
dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go
代码语言:javascript复制func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have
sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)
if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
invoker.stickyInvoker = nil
}
if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {
return invoker.stickyInvoker
}
}
selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
}
return selectedInvoker
}
- doSelect方法先判断是否sticky,是的话且invoker.stickyInvoker不为nil且available,则返回stickyInvoker,否则通过invoker.doSelectInvoker(lb, invocation, invokers, invoked)获取selectedInvoker
doSelectInvoker
dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go
代码语言:javascript复制func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 1 {
return invokers[0]
}
selectedInvoker := lb.Select(invokers, invocation)
//judge to if the selectedInvoker is invoked
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
// do reselect
var reslectInvokers []protocol.Invoker
for _, invoker := range invokers {
if !invoker.IsAvailable() {
continue
}
if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}
if len(reslectInvokers) > 0 {
selectedInvoker = lb.Select(reslectInvokers, invocation)
} else {
return nil
}
}
return selectedInvoker
}
- doSelectInvoker方法通过lb.Select(invokers, invocation)选择selectedInvoker,如果selectedInvoker已经invoked或者非available,则遍历invokers重新构造reslectInvokers,再通过lb.Select(reslectInvokers, invocation)选择selectedInvoker
小结
failfastCluster的Join方法执行newFailFastClusterInvoker(directory);failfastClusterInvoker的Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过getLoadBalance(invokers[0], invocation)获取loadbalance,再通过invoker.doSelect(loadbalance, invocation, invokers, nil)选择ivk,最后执行ivk.Invoke(ctx, invocation)
doc
- failfast_cluster