Dubbo系列笔记之服务引用过程,不服不行

2020-09-24 16:08:56 浏览数 (1)

写在前面:2020年面试必备的Java后端进阶面试题总结了一份复习指南在Github上,内容详细,图文并茂,有需要学习的朋友可以Star一下! GitHub地址:https://github.com/abel-max/Java-Study-Note/tree/master

一、引言

服务引用有 直连注册中心 两种方式,一般来说直连方式不推荐用于生产,仅提供测试或预发布的调试使用。所以本篇重点分析通过注册中心引用服务的过程。

二、服务引用的起点

Dubbo 服务引用的起点有两个,一般来说我们都是以 ReferenceBean 对应的服务注入形式使用,例如常用的注解形式 @DubboReference ( @Reference 注解在新版 Dubbo 中已废弃);另一种是 Spring 容器调用 ReferenceBean 的 #afterPropertiesSet 方法时引用服务,这种方式需要配置 <dubbo:reference> 的 init 属性开启,Dubbo 默认使用第一种方式。

我们从 ReferenceBean 入手:

Dubbo系列笔记之服务引用过程,不服不行

ReferenceBean 实现了 FactoryBean 和 InitializingBean :

  • InitializingBean 看一下 #afterPropertiesSet 方法:
代码语言:javascript复制
@Override
 @SuppressWarnings({"unchecked"})
 public void afterPropertiesSet() throws Exception {     // 初始化 Dubbo config bean
     prepareDubboConfigBeans();
     // lazy init by default.
     if (init == null) {
         init = false;
     }
     // eager init if necessary.
     if (shouldInit()) {
         getObject();
     }
  }
  • FactoryBean Spring 通过调用 #getBean 方法可以返回 bean 的实例,而实现了 FactoryBean 接口,会调用 #getObject 方法返回 bean 的实例。
代码语言:javascript复制
@Override
    public Object getObject() {
    // 调用 ReferenceConfig 的 get 方法获取 bean 实例
        return get();
    }

比较上面两个方法,我们可以知道 Spring 在实例化 Dubbo 的 ReferenceBean 时会调用 ReferenceConfig 的 #get 方法获取 bean 实例,执行 Dubbo 服务引用的过程。

三、配置检查处理

继续跟随 ReferenceConfig 的 #get 方法:

代码语言:javascript复制
public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig("   url   ") has already destroyed!");
        }        // 若服务引用代理为空,执行 init 方法
        if (ref == null) {
            // 处理配置,调用 createProxy 生成代理类
            init();
        }
        return ref;
    }
  • ReferenceConfig 的 init( ) 方法
代码语言:javascript复制
public synchronized void init() {
      // 标识是否已经初始化,避免重复初始化
      if (initialized) {
          return;
      }      // 获取 DubboBootstrap 引导类实例      if (bootstrap == null) {
          bootstrap = DubboBootstrap.getInstance();          bootstrap.init();      }      // 检查接口、consumer 等配置是否合法,并对相应的配置赋值      checkAndUpdateSubConfigs();      // 本地存根检查
      checkStubAndLocal(interfaceClass);      ConfigValidationUtils.checkMock(interfaceClass, this);      Map<String, String> map = new HashMap<String, String>();
      map.put(SIDE_KEY, CONSUMER_SIDE);      // 加入运行时参数,Dubbo 版本号、时间戳、进程号等
      ReferenceConfigBase.appendRuntimeParameters(map);
      // 是否为泛化接口
      if (!ProtocolUtils.isGeneric(generic)) {
          String revision = Version.getVersion(interfaceClass, version);          if (revision != null && revision.length() > 0) {
              map.put(REVISION_KEY, revision);          }          // 获取接口的方法列表,加入 map
          String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();          if (methods.length == 0) {
              logger.warn("No method found in service interface "   interfaceClass.getName());
              map.put(METHODS_KEY, ANY_VALUE);          } else {
              map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));          }      }      map.put(INTERFACE_KEY, interfaceName);      // 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中
      AbstractConfig.appendParameters(map, getMetrics());
      AbstractConfig.appendParameters(map, getApplication());
      AbstractConfig.appendParameters(map, getModule());
      AbstractConfig.appendParameters(map, consumer);
      AbstractConfig.appendParameters(map, this);
      Map<String, AsyncMethodInfo> attributes = null;      if (CollectionUtils.isNotEmpty(getMethods())) {
          attributes = new HashMap<>();          for (MethodConfig methodConfig : getMethods()) {
              AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
              String retryKey = methodConfig.getName()   ".retry";
              if (map.containsKey(retryKey)) {
                  String retryValue = map.remove(retryKey);                  if ("false".equals(retryValue)) {
                      map.put(methodConfig.getName()   ".retries", "0");
                  }              }              AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);              if (asyncMethodInfo != null) {
                  attributes.put(methodConfig.getName(), asyncMethodInfo);              }          }      }      // 从系统变量中获取服务消费者 ip      String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);      if (StringUtils.isEmpty(hostToRegistry)) {
          hostToRegistry = NetUtils.getLocalHost();      } else if (isInvalidLocalHost(hostToRegistry)) {
          throw new IllegalArgumentException("Specified invalid registry ip from property:"   DUBBO_IP_TO_REGISTRY   ", value:"   hostToRegistry);
      }      map.put(REGISTER_IP_KEY, hostToRegistry);      // 存储配置数据
      serviceMetadata.getAttachments().putAll(map);
      // 创建代理
      ref = createProxy(map);
      serviceMetadata.setTarget(ref);
      serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
      ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());      consumerModel.setProxyObject(ref);
      consumerModel.init(attributes);      initialized = true;      // 发布 ReferenceConfigInitializedEvent 事件
      dispatch(new ReferenceConfigInitializedEvent(this, invoker));  }

代码较长,主要是各种配置的检查和初始化,并收集这些信息加入 map 存储,以及创建代理。

四、引用服务

接着上面我们继续看 #createProxy 方法,其不仅执行创建代理的逻辑,同时还会调用其他方法创建、合并 Invoker 实例。

代码语言:javascript复制
private T createProxy(Map<String, String> map) {
    // 判断是否本地暴露,包含指定服务 url 直连的情况判断、或根据参数配置是否进行本地暴露,如协议、scope、injvm 等
    if (shouldJvmRefer(map)) { // 本地引用
        // 创建 URL,协议为 njvm
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 调用 refer 方法创建 InjvmInvoker 实例
        invoker = REF_PROTOCOL.refer(interfaceClass, url);        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service "   interfaceClass.getName());
        }        // 远程引用
    } else {
        urls.clear();        // 若 url 不为空
        if (url != null && url.length() > 0) {
            // 配置多个 url 时,用分号分隔
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);                    if (StringUtils.isEmpty(url.getPath())) {
                        // 设置 url 路径为接口全限定名
                        url = url.setPath(interfaceName);                    }                    // 协议为 registry 时,指定注册中心
                    if (UrlUtils.isRegistry(url)) {
                        // 将 map 转换为查询字符串,赋值给 refer
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));                    } else {
                        // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                        // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                        // 最后将合并后的配置设置为 url 查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));                    }                }            }        } else { // 从注册中心的配置中组装 URL
            // 协议不是 injvm
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));                        }                        // 添加 refer 参数到 url,并加入 urls 集合
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));                    }                }                // 没有配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference "   interfaceName   " on the consumer "   NetUtils.getLocalHost()   " use dubbo version "   Version.getVersion()   ", please config <dubbo:registry address="..." /> to your spring config.");
                }            }        }        // 只有一个注册中心或者服务提供者
        if (urls.size() == 1) {
            // 构建 invoker 实例
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else { // 多个注册中心或多个服务提供者,或者两者混合
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));                if (UrlUtils.isRegistry(url)) {
                    // 最后一个注册中心 URL
                    registryURL = url;                }            }            if (registryURL != null) {
                // 如果注册中心链接不为空,则将使用 ZoneAwareCluster
                String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);                // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
                invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
            } else { // not a registry url, must be direct invoke.
                String cluster = CollectionUtils.isNotEmpty(invokers)                        ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                        : Cluster.DEFAULT;                invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
            }        }    }    if (shouldCheck() && !invoker.isAvailable()) {
        invoker.destroy();        throw new IllegalStateException("Failed to check the status of the service "
                  interfaceName                  ". No provider available for the service "
                  (group == null ? "" : group   "/")
                  interfaceName                  (version == null ? "" : ":"   version)
                  " from the url "
                  invoker.getUrl()                  " to the consumer "
                  NetUtils.getLocalHost()   " use dubbo version "   Version.getVersion());
    }    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service "   interfaceClass.getName()   " from url "   invoker.getUrl());
    }    // create service proxy
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

在 #createProxy 方法中,首先检查配置是否是本地暴露,如果是,则根据自适应扩展机制获取 InjvmProtocol,并调用 #refer 方法生成 InjvmInvoker 实例,完成服务引用。相反,则读取直连 url 配置,或读取注册中心 url ,并将其存储到 urls 集合中,根据 urls 的大小进行不同的处理。如果 urls 大小为 1,则直接根据自适应扩展调用调用 #refer 方法生成 invoker 。如果 urls 大于 1,则分别根据 url 生成 invoker,然后再通过 Cluster 合并多个 invoker ,最后调用 ProxyFactory 生成代理类。

五、创建 Invoker

讲到服务暴露时,我们同样分析了 Invoker 的创建过程。Invoker 作为 Dubbo 的通用模型,代表着一个可执行体。在服务提供者来看,Invoker 用于调用真实的服务实现类;而在服务消费者来看,Invoker 用于执行远程调用,在上面创建代理的方法中,我们注意到创建 Invoker 的一个关键方法 Protocol#refer(Class<T> type, URL url) 。

Protocol 的实现有很多,我们还是以常见的 DubboProtocol 和 RegistryProtocol 来分析 refer 方法如何构建 Invoker 。

1. DubboProtocol

DubboProtocol 继承了 AbstractProtocol 抽象类,从其 refer 方法入手:

代码语言:javascript复制
@Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

其调用了模板方法 #protocolBindingRefer(type, url) 。

回到 DubboProtocol#protocolBindingRefer(type, url) 方法:

代码语言:javascript复制
@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

上面代码非常简单,关注其中调用了 #getClients(url) 方法用于获取客户端实例,实例类型为 ExchangeClient。

ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。

对于 DubboProtocol 的引用逻辑我们先大概了解这么多,关于集群、通信后面会详细说。下面再看 RegistryProtocol 的 refer 方法:

代码语言:javascript复制
@Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {        // 通过参数获取 Registry 的协议,并将其设置为协议头
        url = getRegistryUrl(url);
        // 获取注册中心实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        // url 查询字符串转换为 map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        // 获取 group 配置
        String group = qs.get(GROUP_KEY);
        // 多个 group
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
            }
        }
        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

继续看 #doRefer 方法:

代码语言:javascript复制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 实例        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心和协议        directory.setRegistry(registry);        directory.setProtocol(protocol);        // all attributes of REFER_KEY        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        // 创建服务消费者 URL        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);        if (directory.isShouldRegister()) {            directory.setRegisteredConsumerUrl(subscribeUrl);            registry.register(directory.getRegisteredConsumerUrl());        }        directory.buildRouterChain(subscribeUrl);        directory.subscribe(toSubscribeUrl(subscribeUrl));        // 将多个服务提供者合并        Invoker<T> invoker = cluster.join(directory);
        // 注册中心此时没有其他服务提供者        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {            return invoker;        }        // 多个服务提供者时,通过 Wrapper 包裹,并通知 RegistryProtocol 的监听器        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {            listener.onRefer(this, registryInvokerWrapper);        }        return registryInvokerWrapper;    }

如此,生成 Invoker 创建完毕,再根据服务接口生成代理对象,便可执行远程调用,生成代理的部分逻辑和上篇服务暴露的入口一致,即 ProxyFactory 的 getProxy 方法,感兴趣的小伙伴可自行查看。

来源:https://www.tuicool.com/articles/nYZJZzJ

0 人点赞