dubbo源码学习之服务发布调用链 消费者消费调用链(三)我太难了。。

2022-10-25 16:35:29 浏览数 (1)

源码阅读之dubbo服务发布过程分析

dubbo 是基于 spring 配置来实现服务的发布的,那么一定是基于 spring的扩展来写了一套自己的标签,那么 spring 是如何解析这些配置呢?总的来说,就是可以通过 spring 的扩展机制来扩展自己的标签。大家在dubbo 配置文件中看到的dubbo:service ,就是属于自定义扩展标签

dubbo 配置文件中看到的dubbo:service ,就是属于自定义扩展标签要实现自定义扩展,有三个步骤(在 spring 中定义了两个接口,用来实现扩展)

  1. NamespaceHandler: 注册一堆 BeanDefinitionParser,利用他们来进 行解析
  2. BeanDefinitionParser:用于解析每个 element 的内容
  3. Spring 默认会加载 jar 包下的 META-INF/spring.handlers 文件寻找对 应的 NamespaceHandler。 以下是 Dubbo-config 模块下的 dubbo-config-spring

Dubbo 的接入实现 Dubbo 中 spring 扩展就是使用 spring 的自定义类型,所以同样也有NamespaceHandler、BeanDefinitionParser。而 NamespaceHandler 是DubboNamespaceHandler

代码语言:javascript复制
public class DubboNamespaceHandler extends NamespaceHandlerSupport {

	static {
		Version.checkDuplicate(DubboNamespaceHandler.class);
	}

	public void init() {
	    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我们向看dubbo:service的配置,就直接看 DubboBeanDefinitionParser中 这个里面主要做了一件事,把不同的配置分别转化成spring容器中的bean对象 application 对应 ApplicationConfig registry 对应 RegistryConfig monitor 对应 MonitorConfig provider 对应 ProviderConfig consumer 对应 ConsumerConfig … 为了在 spring 启动的时候,也相应的启动 provider 发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个bean ServiceBean、ReferenceBean。 分别继承了 ServiceConfig 和 ReferenceConfig 同 时 还 分 别 实 现 了 InitializingBean 、 DisposableBean,ApplicationContextAware, ApplicationListener, BeanNameAware InitializingBean 接口为 bean 提供了初始化方法的方式,它只包括afterPropertiesSet 方法,凡是继承该接口的类,在初始化 bean 的时候会执行该方法。 DisposableBean bean 被销毁的时候,spring 容器会自动执行 destory 方法,比如释放资源 ApplicationContextAware 实现了这个接口的 bean,当 spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来 ApplicationListener ApplicationEvent 事件监听,spring 容器启动后会发一个事件通知 BeanNameAware 获得自身初始化时,本身的 bean 的 id 属性 那么基本的实现思路可以整理出来了

  1. 利用 spring 的解析收集 xml 中的配置信息,然后把这些配置信息存储 到 serviceConfig 中
  2. 调用 ServiceConfig 的 export 方法来进行服务的发布和注册

serviceBean 是服务发布的切入点,通过 afterPropertiesSet 方法,调用export()方法进行发布。 export 为父类 ServiceConfig 中的方法,所以跳转到 SeviceConfig 类中的export 方法

代码语言:javascript复制
public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && ! export.booleanValue()) {
            return;
        }
        if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            doExport();
        }
    }

我们发现,delay 的作用就是延迟暴露,而延迟的方式也很直截了当,Thread.sleep(delay)

  1. export 是 synchronized 修饰的方法。也就是说暴露的过程是原子操作,正常情况下不会出现锁竞争的问题,毕竟初始化过程大多数情况下都是单一线程操作,这里联想到了 spring 的初始化流程,也进行了加锁操作,这里也给我们平时设计一个不错的启示:初始化流程的性能调优优先级应该放的比较低,但是安全的优先级应该放的比较高!
  2. 继续看 doExport()方法。同样是一堆初始化代码

export 的过程 继续看 doExport(),最终会调用到 doExportUrls()中:

代码语言:javascript复制
	@SuppressWarnings({ "unchecked", "rawtypes" })
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);//是不是获得注册中心的配置
        for (ProtocolConfig protocolConfig : protocols) { //是不是支持多协议发布
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

这 个 protocols 长 这 个 样 子 <dubbo:protocol name=“dubbo” port=“20888” id=“dubbo” /> protocols 也是根据配置装配出来的。接下来让我们进入 doExportUrlsFor1Protocol 方法看看 dubbo 具体是怎么样将服务暴露出去的

代码语言:javascript复制
if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    //拿到所有url 循环
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service "   interfaceClass.getName()   " url "   url   " to registry "   registryURL);
                        }
                        //通过proxyFactory来获取Invoker对象
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        //注册服务
                        Exporter<?> exporter = protocol.export(invoker);
                        //将exporter添加到list中
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }

这个URL很眼熟,没错在注册中心看到的services的providers信息就是这个在上面这段代码中可以看到 Dubbo 的比较核心的抽象:Invoker, Invoker是一个代理类,从 ProxyFactory 中生成。 这个地方可以做一个小结

  1. Invoker - 执行具体的远程调用(这块后续单独讲)
  2. Protocol – 服务地址的发布和订阅
  3. Exporter – 暴露服务或取消暴露

protocol.export(invoker) protocol 这个地方,其实并不是直接调用 DubboProtocol 协议的 export,protocol 这个属性是在哪里实例化的?以及实例化的代码 是什么?

代码语言:javascript复制
//Protocol$Adaptive.java
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

实际上这个 Protocol 得到的应该是一个 Protocol

Adaptive。一个自适应的适配器。这个时候,通过 protocol.export(invoker),实际上调用的应该是Protocol

Adaptive 这个动态类的 export 方法。我们看看这段代码

代码语言:javascript复制
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("   url.toString()   ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 这 段 代 码 中 ,ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 当 extName 为 registry 的时候,我们不需要再次去阅读这块代码了,直接可以在扩展点中找到相应的实现扩展点[/dubbo-registry-api/src/main/resources/META- INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol] 配置如下

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol 所以,我们可以定位到 RegistryProtocolRegistryProtocol 好这个类中的export 方法

代码语言:javascript复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
            	try {
            		exporter.unexport();
            	} catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
                try {
                	registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
                try {
                	overrideListeners.remove(overrideSubscribeUrl);
                	registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
            }
        };
    }

doLocalExport 本地先启动监听服务

代码语言:javascript复制
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return (ExporterChangeableWrapper<T>) exporter;
}

通过 Wrapper对 Protocol 进 行 装 饰 , 装 饰 器 分 别 为 : ProtocolFilterWrapper/ProtocolListenerWrapper;

ProtocolFilterWrapper 这个类非常重要,dubbo 机制里面日志记录、超时等等功能都是在这一部分实现的 这个类有 3 个特点, 第一它有一个参数为 Protocol protocol 的构造函数; 第二,它实现了 Protocol 接口; 第三,它使用责任链模式,对 export 和 refer 函数进行了封装;部分代码 如下

代码语言:javascript复制
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

DubboProtocol类的export

代码语言:javascript复制
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    
    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer ["  url.getParameter(Constants.INTERFACE_KEY)  
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    //暴露服务
    openServer(url);
    
    return exporter;
}

dubboProtocol 的 export 方法:openServer(url)

代码语言:javascript复制
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
       ExchangeServer server = serverMap.get(key);
       if (server == null) {
           //没有的话就创建服务
          serverMap.put(key, createServer(url));
       } else {
          //server支持reset,配合override功能使用
          server.reset(url);
       }
    }
}

createServer

代码语言:javascript复制
private ExchangeServer createServer(URL url) {
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: "   str   ", url: "   url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            //绑定服务
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: "   url   ") "   e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: "   str);
            }
        }
        return server;
    }

Exchangers. bind()

代码语言:javascript复制
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
}

GETEXCHANGER 通过 ExtensionLoader 获得指定的扩展点,type 默认为 header

代码语言:javascript复制
public static Exchanger getExchanger(URL url) {
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

HeaderExchanger.bind

代码语言:javascript复制
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

Transporters.bind

代码语言:javascript复制
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
代码语言:javascript复制
public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        //心跳
        startHeatbeatTimer();
    }

private void startHeatbeatTimer() {
    //关闭心跳定时
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        //每隔heartbeat时间执行一次
        heatbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                    public Collection<Channel> getChannels() {
                        return Collections.unmodifiableCollection(
                                HeaderExchangeServer.this.getChannels() );
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat,TimeUnit.MILLISECONDS);
    }
}

private void stopHeartbeatTimer() {
    try {
        ScheduledFuture<?> timer = heatbeatTimer;
        if (timer != null && ! timer.isCancelled()) {
            timer.cancel(true);
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    } finally {
        heatbeatTimer =null;
    }
}

心跳线程 HeartBeatTask 在超时时间之内,发送数据 在超时时间在外,是客户端的话,重连;是服务端,那么关闭 服务发布总结 直接从官方网站上扒了一个图过来,,好这个图显示的很清楚了。

服务注册的过程

前面,我们已经知道,基于 spring 这个解析入口,到发布服务的过程,接着基于 DubboProtocol 去发布,最终调用 Netty 的 api 创建了一个NettyServer。

RegistryProtocol

代码语言:javascript复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider -----------------------这里
    final Registry registry = getRegistry(originInvoker);
    //得到需要注册到zk上的协议地址 也就是dubbo://
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //注册 这里=========================================
    registry.register(registedProviderUrl);
    // 订阅override数据
    // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保证每次export都返回一个新的exporter实例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
           try {
              exporter.unexport();
           } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
            try {
               registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
            try {
               overrideListeners.remove(overrideSubscribeUrl);
               registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
        }
    };
}

final Registry registry = getRegistry(originInvoker);

getRegistry

代码语言:javascript复制
/**
 * 根据invoker的地址获取registry实例
 * @param originInvoker
 * @return
 */
private Registry getRegistry(final Invoker<?> originInvoker){
    获得registry://192.168.11.156:2181 的协议地址
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        //得到 zookeeper 的协议地址
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        //registryUrl 就会变成了 zookeeper://192.168.11.156
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryFactory.getRegistry(registryUrl);
}

registryFactory.getRegistry 这段代码很明显了,通过前面这段代码的分析,其实就是把 registry 的协议头改成服务提供者配置的协议地址,也就是我们配置的 <dubbo:registry address=”zookeeper://192.168.11.156:2181”/>然后 registryFactory.getRegistry 的目的,就是通过协议地址匹配到对应的注册中心。那 registryFactory 是一个什么样的对象呢?,我们找一下这个代码的定义

代码语言:javascript复制
private RegistryFactory registryFactory;

public void setRegistryFactory(RegistryFactory registryFactory) {
    this.registryFactory = registryFactory;
}

RegistryFactory是一个扩展点

代码语言:javascript复制
@SPI("dubbo")
public interface RegistryFactory {

    /**
     * 连接注册中心.
     * 
     * 连接注册中心需处理契约:<br>
     * 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。<br>
     * 2. 支持URL上的username:password权限认证。<br>
     * 3. 支持backup=10.20.153.10备选注册中心集群地址。<br>
     * 4. 支持file=registry.cache本地磁盘文件缓存。<br>
     * 5. 支持timeout=1000请求超时设置。<br>
     * 6. 支持session=60000会话超时或过期设置。<br>
     * 
     * @param url 注册中心地址,不允许为空
     * @return 注册中心引用,总不返回空
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

RegistryFactory$Adaptive 我们拿到这个动态生成的自适应扩展点,看看这段代码里面的实现

  1. 从 url 中拿到协议头信息,这个时候的协议头是 zookeeper://
  2. 通 过ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(“zookeeper”)去获得一个指定的扩展点,而这个扩展点的配置在dubbo-registry-zookeeper/resources/META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory。得 到一个 ZookeeperRegistryFactory

ZookeeperRegistryFactory

代码语言:javascript复制
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
   
   private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
      this.zookeeperTransporter = zookeeperTransporter;
   }

   public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

这个方法中并没有 getRegistry 方法,而是在父类 AbstractRegistryFactory

  1. 从缓存 REGISTRIES 中,根据 key 获得对应的 Registry
  2. 如果不存在,则创建 Registry

AbstractRegistryFactory

代码语言:javascript复制
public Registry getRegistry(URL url) {
   url = url.setPath(RegistryService.class.getName())
         .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
         .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
   String key = url.toServiceString();
    // 锁定注册中心获取过程,保证注册中心单一实例
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //这里======================
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry "   url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 释放锁
        LOCK.unlock();
    }
}

createRegistry

创建一个注册中心,这个是一个抽象方法,具体的实现在对应的子类实例中实现的,在 ZookeeperRegistryFactory 中

代码语言:javascript复制
public Registry createRegistry(URL url) {
       return new ZookeeperRegistry(url, zookeeperTransporter);
   }

getRegistry 得出了一个结论,根据当前注册中心的配置信息,获得一个匹配的注册中心,也就是 ZookeeperRegistry

registry.register(registedProviderUrl);

调用 registry.register 去讲 dubbo://的协议地址注册到zookeeper 上 这个方法会调用 FailbackRegistry 类中的 register. 为什么呢?因为ZookeeperRegistry 这个类中并没有 register 这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的 register 方法。所以我们可以直接定位大 FailbackRegistry 这个类中的 register 方法中 FailbackRegistry.register

  1. FailbackRegistry,从名字上来看,是一个失败重试机制
  2. 调用父类的 register 方法,讲当前 url 添加到缓存集合中
  3. 调用 doRegister 方法,这个方法很明显,是一个抽象方法,会由 ZookeeperRegistry 子类实现。

FailbackRegistry.register

代码语言:javascript复制
@Override
public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服务器端发送注册请求 这里==================
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 如果开启了启动时检测,则直接抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if(skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register "   url   " to registry "   getUrl().getAddress()   ", cause: "   t.getMessage(), t);
        } else {
            logger.error("Failed to register "   url   ", waiting for retry, cause: "   t.getMessage(), t);
        }

        // 将失败的注册请求记录到失败列表,定时重试
        failedRegistered.add(url);
    }
}

ZookeeperRegistry.doRegister

代码语言:javascript复制
protected void doRegister(URL url) {
    try {
        //创建dubbo url
       zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register "   url   " to zookeeper "   getUrl()   ", cause: "   e.getMessage(), e);
    }
}

终于找到你了,调用 zkclient.create 在 zookeeper 中创建一个节点。

RegistryProtocol.export 这个方法中后续的代码就不用再分析了。就是去对服务提供端去注册一个 zookeeper 监听,当监听发生变化的时候,服务端做相应的处理。

启动一个nettyserver

获得注册中心 在zookeeper上注册协议地址

源码分析之消费端初始化流程

代码语言:javascript复制
ReferenceBean(afterPropertiesSet) ->getObject() ->get()->init()->createProxy  最终会获得一个代理对象。

ReferenceConfig的createProxy

代码语言:javascript复制
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
private T createProxy(Map<String, String> map) {
   URL tmpUrl = new URL("temp", "localhost", 0, map);
   final boolean isJvmRefer;
       if (isInjvm() == null) {
           if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
               isJvmRefer = false;
           } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
               //默认情况下如果本地有服务暴露,则引用本地服务.
               isJvmRefer = true;
           } else {
               isJvmRefer = false;
           }
       } else {
           isJvmRefer = isInjvm().booleanValue();
       }
   
   if (isJvmRefer) {
      URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
      invoker = refprotocol.refer(interfaceClass, url);
           if (logger.isInfoEnabled()) {
               logger.info("Using injvm service "   interfaceClass.getName());
           }
   } else {
           if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
               String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
               if (us != null && us.length > 0) {
                   for (String u : us) {
                       URL url = URL.valueOf(u);
                       if (url.getPath() == null || url.getPath().length() == 0) {
                           url = url.setPath(interfaceName);
                       }
                       if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                           urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                       } else {
                           urls.add(ClusterUtils.mergeUrl(url, map));
                       }
                   }
               }
           } else { // 通过注册中心配置拼装URL
               List<URL> us = loadRegistries(false);
               if (us != null && us.size() > 0) {
                   for (URL u : us) {
                       URL monitorUrl = loadMonitor(u);
                       if (monitorUrl != null) {
                           map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                       }
                       urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                   }
               }
               if (urls == null || urls.size() == 0) {
                   throw new IllegalStateException("No such any registry to reference "   interfaceName    " on the consumer "   NetUtils.getLocalHost()   " use dubbo version "   Version.getVersion()   ", please config <dubbo:registry address="..." /> to your spring config.");
               }
           }

           if (urls.size() == 1) {
               //获得invoke代理对象 ===============这里
               invoker = refprotocol.refer(interfaceClass, urls.get(0));
           } else {
               List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
               URL registryURL = null;
               for (URL url : urls) {
                   invokers.add(refprotocol.refer(interfaceClass, url));
                   if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                       registryURL = url; // 用了最后一个registry url
                   }
               }
               if (registryURL != null) { // 有 注册中心协议的URL
                   // 对有注册中心的Cluster 只用 AvailableCluster
                   URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                   invoker = cluster.join(new StaticDirectory(u, invokers));
               }  else { // 不是 注册中心的URL
                   invoker = cluster.join(new StaticDirectory(invokers));
               }
           }
       }

       Boolean c = check;
       if (c == null && consumer != null) {
           c = consumer.isCheck();
       }
       if (c == null) {
           c = true; // default true
       }
       if (c && ! invoker.isAvailable()) {
           throw new IllegalStateException("Failed to check the status of the service "   interfaceName   ". No provider available for the service "   (group == null ? "" : group   "/")   interfaceName   (version == null ? "" : ":"   version)   " from the url "   invoker.getUrl()   " to the consumer "   NetUtils.getLocalHost()   " use dubbo version "   Version.getVersion());
       }
       if (logger.isInfoEnabled()) {
           logger.info("Refer dubbo service "   interfaceClass.getName()   " from url "   invoker.getUrl());
       }
       // 创建服务代理
       return (T) proxyFactory.getProxy(invoker);
   }

refprotocol.refer(interfaceClass, urls.get(0));

refprotocol这个对象,定义的代码如下,是一个自适应扩展点,得到的是Protocol$Adaptive

代码语言:javascript复制
 public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("   url.toString()   ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

这段代码中,根据当前的协议url,得到一个指定的扩展点,传递进来的参数中,协议地址为registry://,所以,我们可以直接定位到RegistryProtocol.refer代码

代码语言:javascript复制
	@SuppressWarnings("unchecked")
	public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //url registry://192.168.2.11
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //获得注册中心 zookeeperRegister
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
        	return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        //cluster=Cluster$Adaptive
        return doRefer(cluster, registry, type, url);
    }

doRefer方法中有一个参数是cluster,我们找到它的定义代码如下,。又是一个自动注入的扩展点。

代码语言:javascript复制
从下面的代码可以看出,这个不仅仅是一个扩展点,而且方法层面上,还有一个@Adaptive,表示会动态生成一个自适应适配器Cluster$Adaptive
代码语言:javascript复制
通过debug的方式,,获取到Cluster$Adaptive这个适配器,代码如下。我们知道cluster这个对象的实例以后,继续看doRefer方法;
注意:这里的Cluster$Adaptive也并不单纯,大家还记得在讲扩展点的时候有一个扩展点装饰器吗?如果这个扩展点存在一个构造函数,并且构造函数就是扩展接口本身,那么这个扩展点就会这个wrapper装饰,而Cluster被装饰的是:MockClusterWrapper
代码语言:javascript复制
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url("   url.toString()   ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}
代码语言:javascript复制
这段代码中,有一个RegistryDirectory,可能看不懂,我们暂时先忽略,等会单独讲.(基于注册中心动态发现服务提供者)
1.    将consumer://协议地址注册到注册中心
2.   订阅zookeeper地址的变化
3.   调用cluster.join()方法
代码语言:javascript复制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
              ","   Constants.CONFIGURATORS_CATEGORY 
              ","   Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}

cluster.join

由前面的Cluster$Adaptive这个类中的join方法的分析,得知cluster.join会调用MockClusterWrapper.join方法, 然后再调用FailoverCluster.join方法。

MockClusterWrapper.join

这个意思很明显了。也就是我们上节课讲过的mock容错机制,如果出现异常情况,会调用MockClusterInvoker,否则,调用FailoverClusterInvoker.

代码语言:javascript复制
public class MockClusterWrapper implements Cluster {

   private Cluster cluster;

   public MockClusterWrapper(Cluster cluster) {
      this.cluster = cluster;
   }

   public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
      return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
   }

}

小结

refprotocol.ref,这个方法,会返回一个MockClusterInvoker(FailoverClusterInvoker)。这里面一定还有疑问,我们先把主线走完,再回过头看看什么是cluster、什么是directory

proxyFactory.getProxy(invoker);

再回到ReferenceConfig这个类,在createProxy方法的最后一行,调用proxyFactory.getProxy(invoker). 把前面生成的invoker对象作为参数,再通过proxyFactory工厂去获得一个代理对象。接下来我们分析下这段代码做了什么。

其实前面在分析服务发布的时候,基本分析过了,所以再看这段代码,应该会很熟悉

ProxyFactory, 会生成一个动态的自适应适配器。ProxyFactory$Adaptive,然后调用这个适配器中的getProxy方法,代码如下

代码语言:javascript复制
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("   url.toString()   ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }

很显然,又是通过javassist实现的一个动态代理,我们来看看JavassistProxyFactory.getProxy

代码语言:javascript复制
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

在Proxy.getProxy这个类的如下代码中添加断点,在debug下可以看到动态字节码如下

代码语言:javascript复制
public java.lang.String sayHello(java.lang.String arg0){
  Object[] args = new Object[1]; 
  args[0] = ($w)$1; 
  Object ret = handler.invoke(this, methods[0], args); 
return (java.lang.String)ret;
}

handler在JavassistProxyFactory.getProxy中。传递的new InvokerInvocationHandler(invoker)

消费端调用过程流程图

消费端的调用过程

客户端什么时候建立和服务端的连接

前面我们通过代码分析到了,消费端的初始化过程,但是似乎没有看到客户端和服务端建立NIO连接。实际上,建立连接的过程在消费端初始化的时候就建立好的,只是前面我们没有分析,代码在RegistryProtocol.doRefer方法内的directory.subscribe方法中。

代码语言:javascript复制
public void subscribe(URL url) {
    setConsumerUrl(url);
    //registry=ZookeeperRegistry consumer://  this:RegistryDirectory
    registry.subscribe(url, this);
}

FailbackRegistry. subscribe

代码语言:javascript复制
	@Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 向服务器端发送订阅请求
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe "   url   ", Using cached list: "   urls   " from cache file: "   getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home")   "/dubbo-registry-"   url.getHost()   ".cache")   ", cause: "   t.getMessage(), t);
            } else {
                // 如果开启了启动时检测,则直接抛出异常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe "   url   ", cause: "   t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe "   url   ", waiting for retry, cause: "   t.getMessage(), t);
                }
            }

            // 将失败的订阅请求记录到失败列表,定时重试
            addFailedSubscribed(url, listener);
        }
    }

zookeeperRegistry. doSubscribe

调用zookeeperRegistry执行真正的订阅操作,这段代码太长,我就不贴出来了,这里面主要做两个操作

  1. 对providers/routers/configurator三个节点进行创建和监听
  2. 调用notify(url,listener,urls) 将已经可用的列表进行通知
代码语言:javascript复制
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
            child = URL.decode(child);
                            if (! anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, 
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && services.size() > 0) {
                for (String service : services) {
      service = URL.decode(service);
      anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, 
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            List<URL> urls = new ArrayList<URL>();
            //循环地址
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                           ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                //watch
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                   urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            //这里=============
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe "   url   " to zookeeper "   getUrl()   ", cause: "   e.getMessage(), e);
    }
}

AbstractRegistry.notify

代码语言:javascript复制
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.size() == 0) 
            && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url "   url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url "   url   ", urls: "   urls);
    }
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
           String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
           List<URL> categoryList = result.get(category);
           if (categoryList == null) {
              categoryList = new ArrayList<URL>();
              result.put(category, categoryList);
           }
           categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    //对provider、configuration、routers路径下进行notify 拿到地址缓存起来
    //第一次调用 后续的watch机制通知
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}

RegistryDirectory:

  1. 整合多个Invoker
  2. 监听注册中心的变化 刷新本地的List

0 人点赞