dubbo学习(七)服务引用-consumer

2020-11-06 08:12:53 浏览数 (1)

一、服务引用

在消费端dubbo的使用中我们会把需要引用的provider服务配置在dubbo-consumer.xml中来进行引用,这一篇主要来分析dubbo的provider服务在消费端的服务引用。

在消费端的应用中的服务引用配置:

代码语言:javascript复制
<dubbo:reference id="testApi" interface="com.ywl.dubbo.TestApi"
                 check="false" timeout="3000"/>

· 服务引用配置的解析

和dubbo-provider服务api配置类似,也会依赖dubbo.schema的自定义标签,这里使用了reference元素,那么该服务引用配置的解析就由以下代码来完成:

registerBeanDefinitionParser("reference",

new DubboBeanDefinitionParser(ReferenceBean.class, false));

代码语言:javascript复制

· ReferenceBean

public class ReferenceBean<T> extends ReferenceConfig<T>

implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

该服务引用实现了FactoryBean接口,那么意味着该服务引用可以通过getObject方法来获取该bean的实例。

二、服务引用原理-refer

1、通过getObject获取服务实例

代码语言:javascript复制
public Object getObject() throws Exception {
    return get();
}

2、开始创建服务代理类-createProxy

代码语言:javascript复制
private T createProxy(Map<String, String> map) {
    //...
    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service "   interfaceClass.getName());
        }
    } else {
        //...
        if (urls.size() == 1) {            //获取服务的invoker对象
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        }        //...
    }
    //将invoker对象 生成为动态代理类
    return (T) proxyFactory.getProxy(invoker);
}

3、客户端引用远程服务-refer

代码语言:javascript复制
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);      //建立zookeeper连接    Registry registry = registryFactory.getRegistry(url);
    //如果是RegistryService类型,则直接返回RegistryService的invoker对象
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    //...
    return doRefer(cluster, registry, type, url);
}
代码语言:javascript复制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {        //向注册中心注册消费者信息-持久化节点
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }    //订阅服务提供方
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                      ","   Constants.CONFIGURATORS_CATEGORY
                      ","   Constants.ROUTERS_CATEGORY));
    //路由选择    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);    return invoker;
}

如上所示,客户端引用远程服务主要干了这几件事:

(1)注册消费者信息-持久化结点,具体的结点为/dubbo/com.ywl.dubbo.TestApi/consumers。

(2)订阅服务提供方结点,providers,routes,configurations。

(3)路由选择

· 订阅服务提供方

收到订阅后的处理相关的核心代码:

代码语言:javascript复制
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    //...
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {        //category 为 providers,configurators,routersString category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //将消费者注册的url信息缓存到本地文件中        saveProperties(url);
        listener.notify(categoryList);
    }
}
代码语言:javascript复制
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
代码语言:javascript复制
    //...
代码语言:javascript复制
    //更新configurations信息
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    //更新routes信息
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
代码语言:javascript复制
    //更新invoker信息
代码语言:javascript复制
    refreshInvoker(invokerUrls);
}

refreshInvoker方法的主要目的是将invokerURL列表转换为invoker列表。

dubbo的refreshInvoker方法中的注释中标注了以下的转换规则:

(1)如果url已经被转换为invoker,则不再重新引用,则直接从缓存中获取,如果url中任意一个参数变更也会重新引用。

(2)如果传入的invoker列表不为空,则表示最新的invoker列表。

(3)如果传入的invoker列表为空,则表示只是下发的ovverride规则或route规则,需要重新交叉对比,决定是否需要重新引用。

· 路由选择-cluster.join

代码语言:javascript复制
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}

join方法主要是初始化对应的路由信息,因为在配置文件中未配置所以进入了默认的路由规则实现 - FailoverCluster,在后续dubbo调用中的负载均衡策略需要,dubbo的路由策略会单独写一篇文章来分析。

dubbo路由规则实现:

(1)Failback Cluster

表示失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

服务提供方配置:<dubbo:service cluster="failback" />

服务消费方配置:<dubbo:reference cluster="failback" />

(2)Failover Cluster

表示失败自动切换,当出现失败会重试其他服务器。可人为设置重试次数,通常可以用于读操作,但是如果发生重试会延长接口的执行时间。如果retries设置为0时和failfast规则一样。

服务提供方配置:<dubbo:service cluster="failover" retries="2" />

服务消费方配置:<dubbo:reference cluster="failover" retries="2" />

或者不配置,则默认为该实现。

(3)Failfast Cluster

表示快速失败,只会发起一次调用,失败则立即报错。通常用于非幂等性的写操作,比如新增记录。

服务提供方配置:<dubbo:service cluster="failfast" />

服务消费方配置:<dubbo:reference cluster="failfast" />

(4)Failsafe Cluster

表示失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

服务提供方配置:<dubbo:service cluster="failsafe" />

服务消费方配置:<dubbo:reference cluster="failsafe" />

(5)forking Cluster

并行调用服务器,只要一个成功即返回。由于这样需要浪费更多服务资源,所以通常用于实时性较高的读操作。

服务提供方配置:<dubbo:service cluster="forking" />

服务消费方配置:<dubbo:reference cluster="forking" />

(6)mergeable Cluster

聚合集群,将集群中的调用结果聚合起来返回聚合后的结果。通常用于接口一样,但有多种实现,用group区分的接口,比如菜单服务。

服务消费方配置:<dubbo:reference group="*" merger="true" /> 或

<dubbo:reference group="aaa,bbb" merger="true" />等

(7)Broad Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

服务提供方配置:<dubbo:service cluster="broadcast" />

服务消费方配置:<dubbo:reference cluster="broadcast" />

以上的路由策略,在一般情况下采用failover(读操作)或failfast(写操作)规则就可以满足需求。

4、生成代理对象 - ProxyFactory.getProxy(invoker)

return (T) Proxy.newProxyInstance(

Thread.currentThread().getContextClassLoader(), interfaces,

new InvokerInvocationHandler(invoker));

代码语言:javascript复制

最终采用了JDK动态代理生成代理对象。

三、总结

0 人点赞