一、服务端provider发布流程回顾
根据dubbo启动日志,provider的发布动作为以下几个步骤:
(1)暴露本地服务
Export dubbo service com.ywl.dubbo.TestApi to local registry, dubbo version: 2.0.0, current host: 127.0.0.1。
(2)暴露远程服务
Export dubbo service com.ywl.dubbo.TestApi to url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi...后面省略。
(3)启动netty
Start NettyClient yuwenlei.local/192.168.24.69 connect to the server /192.168.1.100:20041, dubbo version: 2.0.0, current host: 192.168.24.69。
(4)打开zk
Opening socket connection to server dailyzk.webuy.ai/192.168.49.11:2181。
(5)注册provider服务到zk
Register dubbo service com.ywl.dubbo.TestApi url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi? ...中间省略。
to registry registry://dailyzk.webuy.ai:7005/org.apache.dubbo.registry.RegistryService? ...后面省略。
(6)监听zk(订阅与通知)
Subscribe: provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。
Notify urls for subscribe url provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。
· 服务发布的目的
解析dubbo-provider.xml中的接口。将服务提供者向注册中心注册服务,以便服务消费者从注册中心查询并调用服务。
代码语言:javascript复制<dubbo:service interface="com.ywl.dubbo.TestApi" ref="testApi" retries="0"
cluster="failfast" timeout="3000"/>
上篇文章已经提到zookeeper是如何被初始化与连接的,这一篇主要分析下dubbo服务是如何在zookeeper上进行节点创建与他们的订阅关系。
二、dubbo节点如何创建在zookeeper
创建dubbo节点是建立在远程服务暴露的源码基础上:
代码语言:javascript复制public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//暴露远程服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//初始化注册信息、连接zk
代码语言:javascript复制 final Registry registry = getRegistry(originInvoker);
//注册服务 - 即在注册中心创建dubbo服务节点
register(registryUrl, registeredProviderUrl); //...}
代码语言:javascript复制public void register(URL registryUrl, URL registedProviderUrl) { //获取注册信息
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
先获取注册信息,由于在之前的代码中已经对注册信息初始化过,因此直接会获取缓存中的注册信息,进行dubbo节点创建。
创建dubbo节点核心代码:
代码语言:javascript复制public void register(URL url) { //往注册队列中添加需要注册的服务
super.register(url);
//删除注册异常和未注册队列中的服务 failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
doRegister(url);
} catch (Exception e) {
//出现异常,则添加到注册异常队列中
failedRegistered.add(url);
}
}//path为服务端api路径与参数信息
//ehemeral为是否持久化 - 默认为true
代码语言:javascript复制public void create(String path, boolean ephemeral) {
//...
if (ephemeral) { //创建临时节点
createEphemeral(path) } else { //创建持久节点 createPersistent(path);
}
}
上述的path即服务端api路径与注册信息:
以上为dubbo的服务端节点创建过程,将注册服务放入到注册队列,最后通过注册对象创建节点,创建临时节点,注册完毕。
· createEphemeral与createPersistent
createEphemeral表示临时节点,他与客户端会话绑定,一但服务端服务被关闭或会话失效,那么这个客户端所创建的临时节点都会被删除。
createPersistent表示持久化节点,表示服务节点一但被创建,除非触发主动删除,否则一直存储在ZK中。
因此服务端服务如果被关闭,那么所创建的节点没有必要继续存在zk中,否则客户端还会不断来进行订阅,因此在dubbo服务节点的创建中,采用了临时节点的创建来处理。
但是,对于dubbo服务中的providers/configuration/routes等信息会被作为持久化节点来创建。具体节点信息如下图所示。
三、dubbo服务的订阅
dubbo服务的订阅是建立在远程服务暴露的源码基础上:
代码语言:javascript复制public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//暴露远程服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//初始化注册信息、连接zk
代码语言:javascript复制 final Registry registry = getRegistry(originInvoker);
//注册服务 - 即在注册中心创建dubbo服务节点
register(registryUrl, registeredProviderUrl); //... //订阅
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);}
订阅相关代码:
代码语言:javascript复制public void subscribe(URL url, NotifyListener listener) { //将订阅服务对象 加入到订阅队列中
super.subscribe(url, listener);
//移除订阅失败队列 removeFailedSubscribed(url, listener);
try { //...
//向服务器发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) { //...
//出现异常则添加到订阅失败队列中
addFailedSubscribed(url, listener);
}
}
代码语言:javascript复制protected void doSubscribe(final URL url, final NotifyListener listener) {
//...
} else { //子结点数据集合
代码语言:javascript复制 List<URL> urls = new ArrayList<URL>();
//将url拆分对应的监听集合,如router/configuration/provider
for (String path : toCategoriesPath(url)) { //监听集合中获取
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
//获取不到,则重新new一个,放到监听集合中 if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
} //从集合中,获取监听订阅对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) { //获取不到 创建一个监听
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
//订阅配置信息 执行notify方法,主要用于收到订阅后的处理 notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
} //创建一个订阅的节点 - 为持久化节点
zkClient.create(path, false);
//加入到监听队列中 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
} //全量数据dubbo服务订阅处理
notify(url, listener, urls);
代码语言:javascript复制 }
//...
}
加入到监听队列中的方法-主要用于收到订阅后的处理,如:删除节点、修改节点、添加子节点。
以上为dubbo服务的订阅,总结主要分为三个步骤:
(1)创建持久化dubbo配置节点,即/dubbo/com.ywl.dubbo.TestApi/configurators或routes或providers。
(2)加入订阅/dubbo/com.ywl.dubbo.TestApi/configurators或routes或providers。
(3)收到订阅后的处理。
四、收到订阅后的处理-notify
代码语言:javascript复制protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//... //已经通知过的列表
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) { //对订阅的结点-configurationproviders等进行遍历
String category = entry.getKey() String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//把服务端的注册url信息更新到本地dubbo缓存中
saveProperties(url);
listener.notify(categoryList);
}
}
listener.notify的核心代码:
代码语言:javascript复制public synchronized void notify(List<URL> urls) { //分别存放三种类型的集合
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " category " in notified url: " url " from registry " getUrl().getAddress() " to consumer " NetUtils.getLocalHost());
}
}
//更新服务端configuration配置信息
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
//更新缓存的路由配置信息
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) {
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
//重建invoker实例
refreshInvoker(invokerUrls);
}
notify方法主要是对订阅的服务端的configuration、routes配置信息进行更新,最后重新生成服务提供api的invoker实例,执行完毕。