系列文章
dubbo 源码 v2.7 分析:结构、container 入口及线程模型
dubbo 源码 v2.7 分析:SPI 机制
dubbo 源码 v2.7 分析:核心机制(一)
关注公众号:程序员架构进阶,每天实时获取更新,上百份面试资料和其他福利等你拿~
一 概述
上一篇重点讲了dubbo中的几种设计模式,和对应的源码。本篇会继续介绍Bean加载、Extension、代理几种机制在dubbo中的应用。
二 Bean加载
dubbo中,bean的加载是基于Spring的可扩展Schema机制。
2.1 Spring的可扩展Schema
在大多数场景,如果我们需要为系统提供可配置化支持,简单的做法是直接基于 Spring的标准 Bean 来配置;但如果配置较为复杂或者需要更多丰富控制,这种简单粗暴的方法会显得非常笨拙。一般的做法会用原生态的方式去解析定义好的 xml 文件,然后转化为配置对象,这种方式当然可以解决所有问题,但实现起来比较繁琐,特别是是在配置非常复杂的时候,解析工作是一个不得不考虑的负担。Spring 提供了可扩展 Schema,配置步骤如下:
2.1.1 设计配置属性和JavaBean
2.1.2 编写XSD文件
Spring用xsd文件校验xml文件格式。校验方法:Spring默认在启动时是要加载XSD文件来验证xml文件的,所以如果有的时候断网了,或者一些开源软件切换域名,那么就很容易碰到应用启动不了。为了防止这种情况,Spring提供了一种机制,默认从本地加载XSD文件。
2.1.3 编写NamespaceHandler和BeanDefinitionParser
完成解析工作,会用到NamespaceHandler 和 BeanDefinitionParser。NamespaceHandler 会根据 schema 和节点名找到某个 BeanDefinitionParser,然后由BeanDefinitionParser 完成具体的解析工作。因此需要分别完成 NamespaceHandler 和BeanDefinitionParser 的实现类,Spring 提供了默认实现类 NamespaceHandlerSupport 和AbstractSingleBeanDefinitionParser,简单的方式就是去继承这两个类。
2.1.4 编写spring.handlers和spring.schemas
放在META-INF文件夹,串联所有部件,让应用感知到。这两个文件需要开发者编写并放入 META-INF 文件夹中,这两个文件的地址必须是 META-INF/spring.handlers 和 META-INF/spring.schemas,spring 会默认去载入这两个文件。
2.1.5 在Bean文件中应用
与配置一个普通的Spring Bean类似,只是需要注意使用的是自定义的Schema。配置好后,再通过Spring容器的getBean(beanName)方法来使用自定义的bean。
2.2 dubbo中的实现方式
在dubbo的META-INF下,可以看到包含了很多文件,其中spring.handlers和spring.schemas就是上面提到的两个串联配置文件:
spring.handles:
代码语言:javascript复制http://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
spring.schemas:
代码语言:javascript复制http://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd
三 Extension机制
即扩展点机制。
3.1 扩展点配置
3.1.1 根据关键字读取配置文件,获得具体的实现类
dubbo-demo/dubbo-demo-xml是dubbo源码的xml示例,其中dubbo-provider.xml的配置:
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider" metadata-type="remote">
<dubbo:parameter key="mapping-type" value="metadata"/>
</dubbo:application>
<dubbo:config-center address="zookeeper://127.0.0.1:2181"/>
<dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
<dubbo:registry id="registry1" address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="-1"/>
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<bean id="greetingService" class="org.apache.dubbo.demo.provider.GreetingServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/>
<dubbo:service version="1.0.0" group="greeting" timeout="5000" interface="org.apache.dubbo.demo.GreetingService"
ref="greetingService"/>
</beans>
其中,<dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/> 这段配置就会根据registry去获取指定的Service。
3.1.2 注解@SPI和@Adaptive
@SPI注解:
在Protocol接口中,定义默认协议为dubbo,就是通过@SPI("dubbo")来实现的:
代码语言:javascript复制@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
@Adaptive注解:
这个注解打在接口方法上,通过ExtensionLoader.getAdaptiveExtension()方法获取设配类,会先通过前面的过程生成Java的源代码,再通过编译器编译成class加载。但Compiler的实现策略选择,也是通过ExtensionLoader.getAdaptiveExtension(),如果也通过编译器编译成class会导致死循环?
再分析一下ExtensionLoader.getAdaptiveExtension(),对于实现类上标记了注解@Adaptive的dubbo spi扩展机制,它获取设配类不是生成设配类的Java源代码,而是在读取扩展文件的时候,遇到实现类打了@Adaptive就把这个类作为设配类缓存在ExtensionLoader中,调用时直接返回。
3.1.3 filter和listener
在生成具体的实现类对象时,不是直接读取类文件,而是在读取类文件的基础上,通过filter和listener去封装类对象。
四 代理
4.1 代理生成方式
大家熟知的应该有JDK、cglib这两种方式,在Spring框架中做了这两种动态代理的支持。不过除此之外,Java中海油Javassist库动态代理、Javassist库动态字节码代理。代理之间的区别可参见Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)。
4.2 dubbo中的代理
在dubbo的rpc->proxy包下,我们可以看到javassist、jdk、wrapper这三个包。
其中几个重点类,JdkProxyFactory,JavassistProxyFactory这两个代理工厂类是继承AbstractProxyFactory抽象类,而AbstractProxyFactory这个抽象类又实现了ProxyFactory接口,StubProxyFactoryWrapper则是对ProxyFactory的直接实现。类之间关系如下:
4.3 Invoke调用
先上一张图:
ReferenceConfig类比较重要的是init()方法,在这里做了大量工作。我们关注的是ref = createProxy(map);再继续向下跟进,这个方法会创建一个service proxy并返回:
代码语言:javascript复制return (T) PROXY_FACTORY.getProxy(invoker);
这里的PROXY_FACTORY也是通过扩展点机制获取的:
代码语言:javascript复制private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
关于ProxyFactory的配置,会涉及到META-INF/dubbo.internal/org.apache.dubbo.rpc.ProxyFactory这个文件,内容为:
代码语言:javascript复制stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
这里指定了我们前面提到的三种ProxyFactory实现类。PROXY_FACTORY默认走的是JdkProxyFactory,所以PROXY_FACTORY.getProxy(invoker);实际使用的方法就是下面的这段(AbstractProxyFactory):
代码语言:javascript复制public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
在InvokerInvocationHandler类,阐述了代理的具体工作方式。贴出源码:
代码语言:javascript复制
/**
* InvokerHandler
*/
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
这里最重要的是最后一句,调用了Invoker接口(实现类)的invoke方法。根据关系Invoker=>AbstractInvoker=>DubboInvoker,上面的invoker.invoke()使用的是AbstractInvoker的invoke:
代码语言:javascript复制@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (destroyed.get()) {
logger.warn("Invoker for service " this " on consumer " NetUtils.getLocalHost() " is destroyed, "
", dubbo version is " Version.getVersion() ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
invocation.addAttachments(contextAttachments);
}
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
}
DubboInvoker的doInvoke方法:
代码语言:javascript复制 @Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
responseFuture.whenComplete((obj, t) -> {
if (t != null) {
asyncRpcResult.completeExceptionally(t);
} else {
asyncRpcResult.complete((AppResponse) obj);
}
});
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
}
}
在这里,invoke函数最终会转为网络调用。
RpcInvocation的构造函数中,包含了客户端传递给invoker的信息:
代码语言:javascript复制public RpcInvocation(Invocation invocation, Invoker<?> invoker) {
this(invocation.getMethodName(), invocation.getParameterTypes(),
invocation.getArguments(), new HashMap<String, String>(invocation.getAttachments()),
invocation.getInvoker());
if (invoker != null) {
URL url = invoker.getUrl();
setAttachment(PATH_KEY, url.getPath());
if (url.hasParameter(INTERFACE_KEY)) {
setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
}
if (url.hasParameter(GROUP_KEY)) {
setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
}
if (url.hasParameter(VERSION_KEY)) {
setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY, "0.0.0"));
}
if (url.hasParameter(TIMEOUT_KEY)) {
setAttachment(TIMEOUT_KEY, url.getParameter(TIMEOUT_KEY));
}
if (url.hasParameter(TOKEN_KEY)) {
setAttachment(TOKEN_KEY, url.getParameter(TOKEN_KEY));
}
if (url.hasParameter(APPLICATION_KEY)) {
setAttachment(APPLICATION_KEY, url.getParameter(APPLICATION_KEY));
}
}
}
至此,我们理清了invoker.invode的调用过程。