dubbo学习(六)服务发布-dubbo服务在zk的创建、订阅

2020-10-28 17:13:58 浏览数 (1)

一、服务端provider发布流程回顾

根据dubbo启动日志,provider的发布动作为以下几个步骤:

(1)暴露本地服务

Export dubbo service com.ywl.dubbo.TestApi to local registry, dubbo version: 2.0.0, current host: 127.0.0.1。

(2)暴露远程服务

Export dubbo service com.ywl.dubbo.TestApi to url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi...后面省略。

(3)启动netty

Start NettyClient yuwenlei.local/192.168.24.69 connect to the server /192.168.1.100:20041, dubbo version: 2.0.0, current host: 192.168.24.69。

(4)打开zk

Opening socket connection to server dailyzk.webuy.ai/192.168.49.11:2181。

(5)注册provider服务到zk

Register dubbo service com.ywl.dubbo.TestApi url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi? ...中间省略。

to registry registry://dailyzk.webuy.ai:7005/org.apache.dubbo.registry.RegistryService? ...后面省略。

(6)监听zk(订阅与通知)

Subscribe: provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

Notify urls for subscribe url provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

· 服务发布的目的

解析dubbo-provider.xml中的接口。将服务提供者向注册中心注册服务,以便服务消费者从注册中心查询并调用服务。

代码语言:javascript复制
<dubbo:service interface="com.ywl.dubbo.TestApi" ref="testApi" retries="0"
               cluster="failfast" timeout="3000"/>

上篇文章已经提到zookeeper是如何被初始化与连接的,这一篇主要分析下dubbo服务是如何在zookeeper上进行节点创建与他们的订阅关系。

二、dubbo节点如何创建在zookeeper

创建dubbo节点是建立在远程服务暴露的源码基础上:

代码语言:javascript复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //暴露远程服务
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    //初始化注册信息、连接zk
代码语言:javascript复制
    final Registry registry = getRegistry(originInvoker);
    //注册服务 - 即在注册中心创建dubbo服务节点
    register(registryUrl, registeredProviderUrl);    //...}
代码语言:javascript复制
public void register(URL registryUrl, URL registedProviderUrl) {    //获取注册信息
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

先获取注册信息,由于在之前的代码中已经对注册信息初始化过,因此直接会获取缓存中的注册信息,进行dubbo节点创建。

创建dubbo节点核心代码:

代码语言:javascript复制
public void register(URL url) {    //往注册队列中添加需要注册的服务
    super.register(url);
    //删除注册异常和未注册队列中的服务    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        doRegister(url);
    } catch (Exception e) {
        //出现异常,则添加到注册异常队列中
        failedRegistered.add(url);
    }
}//path为服务端api路径与参数信息
//ehemeral为是否持久化 - 默认为true
代码语言:javascript复制
public void create(String path, boolean ephemeral) {
    //...
    if (ephemeral) {        //创建临时节点
        createEphemeral(path)    } else {        //创建持久节点        createPersistent(path);
    }
}

上述的path即服务端api路径与注册信息:

以上为dubbo的服务端节点创建过程,将注册服务放入到注册队列,最后通过注册对象创建节点,创建临时节点,注册完毕。

· createEphemeral与createPersistent

createEphemeral表示临时节点,他与客户端会话绑定,一但服务端服务被关闭或会话失效,那么这个客户端所创建的临时节点都会被删除。

createPersistent表示持久化节点,表示服务节点一但被创建,除非触发主动删除,否则一直存储在ZK中。

因此服务端服务如果被关闭,那么所创建的节点没有必要继续存在zk中,否则客户端还会不断来进行订阅,因此在dubbo服务节点的创建中,采用了临时节点的创建来处理。

但是,对于dubbo服务中的providers/configuration/routes等信息会被作为持久化节点来创建。具体节点信息如下图所示。

三、dubbo服务的订阅

dubbo服务的订阅是建立在远程服务暴露的源码基础上:

代码语言:javascript复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //暴露远程服务
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    //初始化注册信息、连接zk
代码语言:javascript复制
    final Registry registry = getRegistry(originInvoker);
    //注册服务 - 即在注册中心创建dubbo服务节点
    register(registryUrl, registeredProviderUrl);    //...    //订阅
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    return new DestroyableExporter<T>(exporter, originInvoker,                    overrideSubscribeUrl, registeredProviderUrl);}

订阅相关代码:

代码语言:javascript复制
public void subscribe(URL url, NotifyListener listener) {    //将订阅服务对象 加入到订阅队列中
    super.subscribe(url, listener);
    //移除订阅失败队列    removeFailedSubscribed(url, listener);
    try {        //...
        //向服务器发送订阅请求
        doSubscribe(url, listener);
    } catch (Exception e) {        //...
        //出现异常则添加到订阅失败队列中
        addFailedSubscribed(url, listener);
    }
}
代码语言:javascript复制
protected void doSubscribe(final URL url, final NotifyListener listener) {
        //...
} else {            //子结点数据集合
代码语言:javascript复制
            List<URL> urls = new ArrayList<URL>();
            //将url拆分对应的监听集合,如router/configuration/provider
            for (String path : toCategoriesPath(url)) {                //监听集合中获取
                ConcurrentMap<NotifyListener, ChildListener> listeners                            = zkListeners.get(url);
                //获取不到,则重新new一个,放到监听集合中                if (listeners == null) {
                zkListeners.putIfAbsent(url,                    new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }                //从集合中,获取监听订阅对象
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {                    //获取不到 创建一个监听
                listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                        //订阅配置信息 执行notify方法,主要用于收到订阅后的处理                          notify(url, listener,                            toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }                //创建一个订阅的节点 - 为持久化节点
                zkClient.create(path, false);
                //加入到监听队列中                List<String> children = zkClient.addChildListener(path,                                                 zkListener);                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }            //全量数据dubbo服务订阅处理
            notify(url, listener, urls);
代码语言:javascript复制
        }
   //...
}

加入到监听队列中的方法-主要用于收到订阅后的处理,如:删除节点、修改节点、添加子节点。

以上为dubbo服务的订阅,总结主要分为三个步骤:

(1)创建持久化dubbo配置节点,即/dubbo/com.ywl.dubbo.TestApi/configurators或routes或providers。

(2)加入订阅/dubbo/com.ywl.dubbo.TestApi/configurators或routes或providers。

(3)收到订阅后的处理。

四、收到订阅后的处理-notify

代码语言:javascript复制
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    //...    //已经通知过的列表
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {        //对订阅的结点-configurationproviders等进行遍历
        String category = entry.getKey()        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //把服务端的注册url信息更新到本地dubbo缓存中
        saveProperties(url);
        listener.notify(categoryList);
    }
}

listener.notify的核心代码:

代码语言:javascript复制
public synchronized void notify(List<URL> urls) {    //分别存放三种类型的集合
List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category "   category   " in notified url: "   url   " from registry "   getUrl().getAddress()   " to consumer "   NetUtils.getLocalHost());
        }
    }
    //更新服务端configuration配置信息
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    //更新缓存的路由配置信息
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { 
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    //重建invoker实例
    refreshInvoker(invokerUrls);
}

notify方法主要是对订阅的服务端的configuration、routes配置信息进行更新,最后重新生成服务提供api的invoker实例,执行完毕。

0 人点赞