dubbo 源码 v2.7 分析:核心机制(二)

2021-03-03 09:35:07 浏览数 (1)

系列文章

dubbo 源码 v2.7 分析:结构、container 入口及线程模型

dubbo 源码 v2.7 分析:SPI 机制

dubbo 源码 v2.7 分析:核心机制(一)

关注公众号:程序员架构进阶,每天实时获取更新,上百份面试资料和其他福利等你拿~

一 概述

上一篇重点讲了dubbo中的几种设计模式,和对应的源码。本篇会继续介绍Bean加载、Extension、代理几种机制在dubbo中的应用。

二 Bean加载

dubbo中,bean的加载是基于Spring的可扩展Schema机制。

2.1 Spring的可扩展Schema

在大多数场景,如果我们需要为系统提供可配置化支持,简单的做法是直接基于 Spring的标准 Bean 来配置;但如果配置较为复杂或者需要更多丰富控制,这种简单粗暴的方法会显得非常笨拙。一般的做法会用原生态的方式去解析定义好的 xml 文件,然后转化为配置对象,这种方式当然可以解决所有问题,但实现起来比较繁琐,特别是是在配置非常复杂的时候,解析工作是一个不得不考虑的负担。Spring 提供了可扩展 Schema,配置步骤如下:

2.1.1 设计配置属性和JavaBean

2.1.2 编写XSD文件

Spring用xsd文件校验xml文件格式。校验方法:Spring默认在启动时是要加载XSD文件来验证xml文件的,所以如果有的时候断网了,或者一些开源软件切换域名,那么就很容易碰到应用启动不了。为了防止这种情况,Spring提供了一种机制,默认从本地加载XSD文件。

2.1.3 编写NamespaceHandler和BeanDefinitionParser

完成解析工作,会用到NamespaceHandler 和 BeanDefinitionParser。NamespaceHandler 会根据 schema 和节点名找到某个 BeanDefinitionParser,然后由BeanDefinitionParser 完成具体的解析工作。因此需要分别完成 NamespaceHandler 和BeanDefinitionParser 的实现类,Spring 提供了默认实现类 NamespaceHandlerSupport 和AbstractSingleBeanDefinitionParser,简单的方式就是去继承这两个类。

2.1.4 编写spring.handlers和spring.schemas

放在META-INF文件夹,串联所有部件,让应用感知到。这两个文件需要开发者编写并放入 META-INF 文件夹中,这两个文件的地址必须是 META-INF/spring.handlers 和 META-INF/spring.schemas,spring 会默认去载入这两个文件。

2.1.5 在Bean文件中应用

与配置一个普通的Spring Bean类似,只是需要注意使用的是自定义的Schema。配置好后,再通过Spring容器的getBean(beanName)方法来使用自定义的bean。

2.2 dubbo中的实现方式

在dubbo的META-INF下,可以看到包含了很多文件,其中spring.handlers和spring.schemas就是上面提到的两个串联配置文件:

spring.handles:

代码语言:javascript复制
http://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

spring.schemas:

代码语言:javascript复制
http://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd

三 Extension机制

即扩展点机制。

3.1 扩展点配置

3.1.1 根据关键字读取配置文件,获得具体的实现类

dubbo-demo/dubbo-demo-xml是dubbo源码的xml示例,其中dubbo-provider.xml的配置:

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-provider" metadata-type="remote">
        <dubbo:parameter key="mapping-type" value="metadata"/>
    </dubbo:application>

    <dubbo:config-center address="zookeeper://127.0.0.1:2181"/>
    <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
    <dubbo:registry id="registry1" address="zookeeper://127.0.0.1:2181"/>

    <dubbo:protocol name="dubbo" port="-1"/>

    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
    <bean id="greetingService" class="org.apache.dubbo.demo.provider.GreetingServiceImpl"/>

    <dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/>
    <dubbo:service version="1.0.0" group="greeting" timeout="5000" interface="org.apache.dubbo.demo.GreetingService"
                   ref="greetingService"/>

</beans>

其中,<dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/> 这段配置就会根据registry去获取指定的Service。

3.1.2 注解@SPI和@Adaptive

@SPI注解:

在Protocol接口中,定义默认协议为dubbo,就是通过@SPI("dubbo")来实现的:

代码语言:javascript复制
@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

@Adaptive注解:

这个注解打在接口方法上,通过ExtensionLoader.getAdaptiveExtension()方法获取设配类,会先通过前面的过程生成Java的源代码,再通过编译器编译成class加载。但Compiler的实现策略选择,也是通过ExtensionLoader.getAdaptiveExtension(),如果也通过编译器编译成class会导致死循环?

再分析一下ExtensionLoader.getAdaptiveExtension(),对于实现类上标记了注解@Adaptive的dubbo spi扩展机制,它获取设配类不是生成设配类的Java源代码,而是在读取扩展文件的时候,遇到实现类打了@Adaptive就把这个类作为设配类缓存在ExtensionLoader中,调用时直接返回。

3.1.3 filter和listener

在生成具体的实现类对象时,不是直接读取类文件,而是在读取类文件的基础上,通过filter和listener去封装类对象。

四 代理

4.1 代理生成方式

大家熟知的应该有JDK、cglib这两种方式,在Spring框架中做了这两种动态代理的支持。不过除此之外,Java中海油Javassist库动态代理、Javassist库动态字节码代理。代理之间的区别可参见Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)。

4.2 dubbo中的代理

在dubbo的rpc->proxy包下,我们可以看到javassist、jdk、wrapper这三个包。

其中几个重点类,JdkProxyFactory,JavassistProxyFactory这两个代理工厂类是继承AbstractProxyFactory抽象类,而AbstractProxyFactory这个抽象类又实现了ProxyFactory接口,StubProxyFactoryWrapper则是对ProxyFactory的直接实现。类之间关系如下:

4.3 Invoke调用

先上一张图:

ReferenceConfig类比较重要的是init()方法,在这里做了大量工作。我们关注的是ref = createProxy(map);再继续向下跟进,这个方法会创建一个service proxy并返回:

代码语言:javascript复制
return (T) PROXY_FACTORY.getProxy(invoker);

这里的PROXY_FACTORY也是通过扩展点机制获取的:

代码语言:javascript复制
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

关于ProxyFactory的配置,会涉及到META-INF/dubbo.internal/org.apache.dubbo.rpc.ProxyFactory这个文件,内容为:

代码语言:javascript复制
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory

这里指定了我们前面提到的三种ProxyFactory实现类。PROXY_FACTORY默认走的是JdkProxyFactory,所以PROXY_FACTORY.getProxy(invoker);实际使用的方法就是下面的这段(AbstractProxyFactory):

代码语言:javascript复制
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

在InvokerInvocationHandler类,阐述了代理的具体工作方式。贴出源码:

代码语言:javascript复制

/**
 * InvokerHandler
 */
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }

        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

这里最重要的是最后一句,调用了Invoker接口(实现类)的invoke方法。根据关系Invoker=>AbstractInvoker=>DubboInvoker,上面的invoker.invoke()使用的是AbstractInvoker的invoke:

代码语言:javascript复制
@Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service "   this   " on consumer "   NetUtils.getLocalHost()   " is destroyed, "
                      ", dubbo version is "   Version.getVersion()   ", this invoker should not be used any longer");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            invocation.addAttachments(contextAttachments);
        }

        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
    }

DubboInvoker的doInvoke方法:

代码语言:javascript复制
 @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
        }
    }

在这里,invoke函数最终会转为网络调用。

RpcInvocation的构造函数中,包含了客户端传递给invoker的信息:

代码语言:javascript复制
public RpcInvocation(Invocation invocation, Invoker<?> invoker) {
        this(invocation.getMethodName(), invocation.getParameterTypes(),
                invocation.getArguments(), new HashMap<String, String>(invocation.getAttachments()),
                invocation.getInvoker());
        if (invoker != null) {
            URL url = invoker.getUrl();
            setAttachment(PATH_KEY, url.getPath());
            if (url.hasParameter(INTERFACE_KEY)) {
                setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            }
            if (url.hasParameter(GROUP_KEY)) {
                setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
            }
            if (url.hasParameter(VERSION_KEY)) {
                setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY, "0.0.0"));
            }
            if (url.hasParameter(TIMEOUT_KEY)) {
                setAttachment(TIMEOUT_KEY, url.getParameter(TIMEOUT_KEY));
            }
            if (url.hasParameter(TOKEN_KEY)) {
                setAttachment(TOKEN_KEY, url.getParameter(TOKEN_KEY));
            }
            if (url.hasParameter(APPLICATION_KEY)) {
                setAttachment(APPLICATION_KEY, url.getParameter(APPLICATION_KEY));
            }
        }
    }

至此,我们理清了invoker.invode的调用过程。

0 人点赞