多个被调用者在调用时虚拟化为一个被调用者,这就是Dubbo的集群化,本文将从一次碰壁经历出发,讲一讲Dubbo是如何进行集群化的。
(不要吐槽虚拟化,按照我们上课讲的广义定义,抽象就是以不同接口隐藏细节[e.g. 设备->文件],虚拟化就是以相同接口隐藏细节[e.g. 物理设备->虚拟设备])
场景
我需要为所有的Invoker维护他们的metadata,因此在LoadBalance中添加了代码来维护,然而,奇怪的事情发现了,当Invoker数目为1时,LoadBalance居然不发生了。
这可不行,虽然现在不需要这些数据,万一动态增加了Invoker呢?那就存在问题了。我想要让一个被调用者也能实现集群化,行不行?毕竟,集群和单独的Invoker是有本质区别的,而大小为100的集群和大小为1的集群却只有量变。
我希望大小为1的集群依旧能够执行负载均衡的代码,于是想把边界检测给删了。
服务发现源码分析
读Dubbo框架源码,Dubbo在单invoker情况下进行了优化。
代码语言:javascript复制public abstract class AbstractLoadBalance implements LoadBalance {
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
} else {
return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
}
}
我们在自定义LoadBalance实现中进行重载,结果还是不行
代码语言:javascript复制 @Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
} else {
return this.doSelect(invokers, url, invocation);
}
}
这里就要讲Dubbo服务发现的原理了,Directory对应的是zookeeper中注册的服务列表,Router是按照条件筛选服务,LoadBalance是按照算法选择服务。
这里的Cluster就是把一组服务伪装成一个服务,也就是按照我们上面定义的Directory、Router、LoadBalance等规则选出Invoker,作为集群被调用的Invoker。
集群架构
我们来看一看Cluster是怎样通过这些规则伪装成一个Invoker的。(省略无关代码)
Directory:根据URL从Router Chain筛选Invoker
代码语言:javascript复制public abstract class AbstractDirectory<T> implements Directory<T> {
protected RouterChain<T> routerChain;
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));
this.routerChain.setInvokers(newInvokers);
this.invokers = this.multiGroup ? this.toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
}
}
RouterChain: 每个路由依次进行筛选,取交集
代码语言:javascript复制 public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = this.invokers;
Router router;
for(Iterator var4 = this.routers.iterator(); var4.hasNext(); finalInvokers = router.route(finalInvokers, url, invocation)) {
router = (Router)var4.next();
}
return finalInvokers;
}
Router:根据配置项进行筛选,如果match则加入结果中
代码语言:javascript复制 public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
Iterator var5 = invokers.iterator();
while(var5.hasNext()) {
Invoker<T> invoker = (Invoker)var5.next();
if (this.matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
Cluster : 根据Directory创建集群Invoker,通过directory/router获取invokers
代码语言:javascript复制public class FailoverCluster extends AbstractCluster {
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker(directory);
}
ClusterInvoker:doInvoke->AbstractInvoker的select
代码语言:javascript复制public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
}
}
AbstractInvoker:select->doselect->loadBalance的select
代码语言:javascript复制public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
this.stickyInvoker = invoker;
}
}
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
} else if (invokers.size() == 1) {
return (Invoker)invokers.get(0);
} else {
Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
可以看出,关键在于AbstractInvoker的doSelect这一步,将单个服务的集群直接跳过了。我们需要重写Abstract Invoker,Dubbo是开源的,如果直接fork源码那这一步轻而易举,问题是maven项目里这些都是只读的。
然而,艰难地把这里也给重写之后,现实依然残酷,我只能继续向前追溯。
ReferenceConfig: CreateProxy->Cluster的doJoin
经过了化身恶魔之后,我继续看哪里还有边界检测,答案是在引用设置里面。一开始的时候URL只有一条就没有集群。
代码语言:javascript复制public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
}
else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
}
WDNMD!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
我选择死亡。我不走这条路了。我宣布,需要维护数据的集群节点不得小于2。
没法改maven项目的源码好气啊,这就是框架的痛么。
总结
Dubbo实现集群化分为这么几个步骤
准备阶段 (doJoin)
- 监听注册中心的URL并创建Invoker,如果URL数目大于1则将所有Invoker组成集群
- 集群根据URL目录,将invokers根据路由规则进行依次筛选,直到选出符合规则的Invoker
- 返回虚拟化的Invoker,和单Invoker同接口,封装集群容错细节。
调用阶段(doInvoke)
- 满足路由规则的Invoker根据负载均衡选择出对应的invoker
- 选择的invoker按照集群容错策略进行可多次执行的调用,收集到结果再返回
和单独Invoker的区别在于集群容错的时候,可以分别选择不同的Invoker,实际调用多次。甚至可以进行广播,收集所有invoker的结果并返回Qurom。