Dubbo SPI和JAVA SPI 的使用和对比
在Dubbo中,SPI是一个非常核心的机制,贯穿在几乎所有的流程中。搞懂这块内容,是接下来了解Dubbo更多源码的关键因素。
Dubbo是基于Java原生SPI机制思想的一个改进,所以,先从JAVA SPI机制开始了解什么是SPI以后再去学习Dubbo的SPI,就比较容易了
关于JAVA 的SPI机制
SPI全称(service provider interface),是JDK内置的一种服务提供发现机制,目前市面上有很多框架都是用它来做服务的扩展发现,大家耳熟能详的如JDBC、日志框架都有用到;
简单来说,它是一种动态替换发现的机制。举个简单的例子,如果我们定义了一个规范,需要第三方厂商去实现,那么对于我们应用方来说,只需要集成对应厂商的插件,既可以完成对应规范的实现机制。 形成一种插拔式的扩展手段。
实现一个SPI机制
实现的代码的流程图如下
SPI规范总结
实现SPI,就需要按照SPI本身定义的规范来进行配置,SPI规范如下
- 需要在classpath下创建一个目录,该目录命名必须是:META-INF/services
- 在该目录下创建一个properties文件,该文件需要满足以下几个条件
a) 文件名必须是扩展的接口的全路径名称
b) 文件内部描述的是该扩展接口的所有实现类
c) 文件的编码格式是UTF-8
- 通过java.util.ServiceLoader的加载机制来发现
SPI的实际应用
SPI在很多地方有应用,大家可以看看最常用的java.sql.Driver驱动。JDK官方提供了java.sql.Driver这个驱动扩展点,但是你们并没有看到JDK中有对应的Driver实现。 那在哪里实现呢?
以连接Mysql为例,我们需要添加mysql-connector-java依赖。然后,你们可以在这个jar包中找到SPI的配置信息。如下图,所以java.sql.Driver由各个数据库厂商自行实现。这就是SPI的实际应用。当然除了这个意外,大家在spring的包中也可以看到相应的痕迹
SPI的缺点
- JDK标准的SPI会一次性加载实例化扩展点的所有实现,什么意思呢?就是如果你在META-INF/service下的文件里面加了N个实现类,那么JDK启动的时候都会一次性全部加载。那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源
- 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因
Dubbo优化后的SPI实现
基于Dubbo提供的SPI规范实现自己的扩展
在了解Dubbo的SPI机制之前,先通过一段代码初步了解Dubbo的实现方式,这样,我们就能够形成一个对比,得到这两种实现方式的差异
Dubbo的SPI机制规范
大部分的思想都是和SPI是一样,只是下面两个地方有差异。
- 需要在resource目录下配置META-INF/dubbo或者META-INF/dubbo/internal或者META-INF/services,并基于SPI接口去创建一个文件
- 文件名称和接口名称保持一致,文件内容和SPI有差异,内容是KEY对应Value
Dubbo SPI机制源码阅读
ps: 源码阅读,带着疑问去了解【为什么传入一个myProtocol就能获得自定义的DefineProtocol对象】、 getAdaptiveExtension是一个什么东西?
- Protocol protocol = ExtensionLoader. getExtensionLoader(Protocol.class). getExtension(“myProtocol”);
- Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();
源码阅读入口
接下来的源码分析,是基于下面这段代码作为入口,至于为什么不用上面提到的第一段代码作为入口。
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();
把上面这段代码分成两段,一段是getExtensionLoader、 另一段是getAdaptiveExtension。 初步猜想一下;
第一段是通过一个Class参数去获得一个ExtensionLoader对象,有点类似一个工厂模式。
第二段getAdaptiveExtension,去获得一个自适应的扩展点
Extension源码的结构
了解源码结构,建立一个全局认识。结构图如下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MrfZG2S8-1594568800692)(C:/Users/Administrator/AppData/Local/Temp/msohtmlclip1/01/clip_image002.jpg)]
初步了解这些代码在扩展点中的痕迹。
Protocol源码
一下是Protocol的源码,在这个源码中可以看到有两个注解,一个是在类级别上的@SPI(“dubbo”). 另一个是@Adaptive
@SPI 表示当前这个接口是一个扩展点,可以实现自己的扩展实现,默认的扩展点是DubboProtocol。
@Adaptive 表示一个自适应扩展点,在方法级别上,会动态生成一个适配器类
com.alibaba.dubbo.rpc.Protocol
代码语言:javascript复制/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
/**
* Protocol. (API/SPI, Singleton, ThreadSafe)
*
* @author william.liangf
*/
@SPI("dubbo")
public interface Protocol {
/**
* 获取缺省端口,当用户没有配置端口时使用。
*
* @return 缺省端口
*/
int getDefaultPort();
/**
* 暴露远程服务:<br>
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用远程服务:<br>
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 释放协议:<br>
* 1. 取消该协议所有已经暴露和引用的服务。<br>
* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
void destroy();
}
getExtensionLoader
该方法需要一个Class类型的参数,该参数表示希望加载的扩展点类型,该参数必须是接口,且该接口必须被@SPI注解注释,否则拒绝处理。检查通过之后首先会检查ExtensionLoader缓存中是否已经存在该扩展对应的ExtensionLoader,如果有则直接返回,否则创建一个新的ExtensionLoader负责加载该扩展实现,同时将其缓存起来。可以看到对于每一个扩展,dubbo中只会有一个对应的ExtensionLoader实例
代码语言:javascript复制@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if(!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" type ") is not interface!");
}
if(!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" type
") is not extension, because WITHOUT @" SPI.class.getSimpleName() " Annotation!");
}
//从静态缓存获取实例
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
getAdaptiveExtension
通过getExtensionLoader获得了对应的ExtensionLoader实例以后,再调用getAdaptiveExtension()方法来获得一个自适应扩展点。接下来我们来看看代码的实现
ps:简单对自适应扩展点做一个解释,大家一定了解过适配器设计模式,而这个自适应扩展点实际上就是一个适配器。
这个方法里面主要做几个事情:
- 从cacheAdaptiveInstance 这个内存缓存中获得一个对象实例
- 如果实例为空,说明是第一次加载,则通过双重检查锁的方式去创建一个适配器扩展点
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
//双重检查锁
if (instance == null) {
if(createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
//放入缓存中
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " t.toString(), t);
}
}
}
}
else {
throw new IllegalStateException("fail to create adaptive instance: " createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
createAdaptiveExtension
这段代码里面有两个结构,一个是injectExtension. 另一个是getAdaptiveExtensionClass()
我们需要先去了解getAdaptiveExtensionClass这个方法做了什么?很显然,从后面的.newInstance来看,应该是获得一个类并且进行实例)
代码语言:javascript复制@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " type ", cause: " e.getMessage(), e);
}
}
getAdaptiveExtensionClass
从类名来看,是获得一个适配器扩展点的类。
在这段代码中,做了两个事情
- getExtensionClasses() 加载所有路径下的扩展点
- createAdaptiveExtensionClass() 动态创建一个扩展点
cachedAdaptiveClass这里有个判断,用来判断当前Protocol这个扩展点是否存在一个自定义的适配器,如果有,则直接返回自定义适配器,否则,就动态创建,这个值是在getExtensionClasses中赋值的,这块代码我们稍后再看
代码语言:javascript复制 private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
createAdaptiveExtensionClass
动态生成适配器代码,以及动态编译
- createAdaptiveExtensionClassCode, 动态创建一个字节码文件。返回code这个字符串
- 通过compiler.compile进行编译(默认情况下使用的是javassist)
- 通过ClassLoader加载到jvm中
//创建一个适配器扩展点(创建一个动态的字节码文件)
private Class<?> createAdaptiveExtensionClass() {
//生成字节码代码
String code = createAdaptiveExtensionClassCode();
//获得类加载器
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
//动态编译字节码
return compiler.compile(code, classLoader);
}
code的字节码内容
code 内容是一个Protocol$Adaptive类 的主要功能
- 从url或扩展接口获取扩展接口实现类的名称;
- 根据名称,获取实现类ExtensionLoader.getExtensionLoader(扩展接口类).getExtension(扩展接口实现类名称),然后调用实现类的方法。
需要明白一点dubbo的内部传参基本上都是基于Url来实现的,也就是说Dubbo是基于URL驱动的技术
所以,适配器类的目的是在运行期获取扩展的真正实现来调用,解耦接口和实现,这样的话要不我们自己实现适配器类,要不dubbo帮我们生成,而这些都是通过Adpative来实现。
到目前为止,我们的AdaptiveExtension的主线走完了,可以简单整理一下他们的调用关系如下
我们再回过去梳理下代码,实际上在调用createAdaptiveExtensionClass之前,还做了一个操作。是执行getExtensionClasses方法,我们来看看这个方法做了什么事情
getExtensionClasses
getExtensionClasses这个方法,就是加载扩展点实现类了。这段代码本来应该先看的,但是担心先看这段代码会容易导致大家不好理解。我就把顺序交换了下
这段代码主要做如下几个事情
- 从cachedClasses中获得一个结果,这个结果实际上就是所有的扩展点类,key对应name,value对应class
- 通过双重检查锁进行判断
- 调用loadExtensionClasses,去加载所有扩展点的实现
//加载扩展点的实现类
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
//双重检查锁
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
loadExtensionClasses
从不同目录去加载扩展点的实现,在最开始的时候讲到过的。META-INF/dubbo ;META-INF/internal ; META-INF/services
主要逻辑
- 获得当前扩展点的注解,也就是Protocol.class这个类的注解,@SPI
- 判断这个注解不为空,则再次获得@SPI中的value值
- 如果value有值,也就是@SPI(“dubbo”),则讲这个dubbo的值赋给cachedDefaultName。这就是为什么我们能够通过ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension() ,能够获得DubboProtocol这个扩展点的原因
- 最后,通过loadFile去加载指定路径下的所有扩展点。也就是META-INF/dubbo;META-INF/internal;META-INF/services
// 此方法已经getExtensionClasses方法同步过。
private Map<String, Class<?>> loadExtensionClasses() {
//得到SPI的注解
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if(defaultAnnotation != null) {
//如果不为空
String value = defaultAnnotation.value();
if(value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if(names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " type.getName()
": " Arrays.toString(names));
}
if(names.length == 1) cachedDefaultName = names[0];
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
loadFile
解析指定路径下的文件,获取对应的扩展点,通过反射的方式进行实例化以后,put到extensionClasses这个Map集合中
代码语言:javascript复制private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line = null;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i 1).trim();
}
if (line.length() > 0) {
Class<?> clazz = Class.forName(line, true, classLoader);
//加载对应的实现类 并且判断实现类必须是当前的加载的扩展点的实现
if (! type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: "
type ", class line: " clazz.getName() "), class "
clazz.getName() "is not subtype of interface.");
}
//判断是否有自定义适配类 如果有 则在前面获取适配器类的时候 返回当前的自定义适配类 不需要再动态创建
if (clazz.isAnnotationPresent(Adaptive.class)) {
if(cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (! cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
cachedAdaptiveClass.getClass().getName()
", " clazz.getClass().getName());
}
} else {
try {
//如果没有Adaptive注解 则判断当前类是否带有参数是type类型的构造函数 如果有 则认为是
//wrapper类 这个wrapper实际上就是对扩展类进行装饰
//可以在dubbo-rpc-api/internal下找到protocol文件 发现protocol配置了三个装饰
//分别是filter listener mock 所以protocol这个实例
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
//包装类
wrappers.add(clazz);
} catch (NoSuchMethodException e) {
clazz.getConstructor();
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " clazz.getName() " in the config " url);
}
}
}
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (! cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " type.getName() " name " n " on " c.getName() " and " clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " type ", class line: " line ") in " url ", cause: " t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: "
type ", class file: " url ") in " url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: "
type ", description file: " fileName ").", t);
}
}
阶段性小结
截止到目前,我们已经把基于Protocol的自适应扩展点看完了。也明白最终这句话应该返回的对象是什么了.
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();
也就是,这段代码中,最终的protocol应该等于= Protocol$Adaptive
代码语言:javascript复制import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" url.toString() ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" url.toString() ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
injectExtension
还记得这段代码吗?上面分析的代码,只是知道,最终getAdaptiveExtensionClass.new Instance获得一个自适应扩展点。而还有一段代码
injectExtension没有讲。简单来说,这个方法的作用,是为这个自适应扩展点进行依赖注入。类似于spring里面的依赖注入功能。
为适配器类的setter方法插入其他扩展点或实现。
代码语言:javascript复制private T createAdaptiveExtension() {
try {
//可以实现扩展点的注入
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " type ", cause: " e.getMessage(), e);
}
}
这里可以看到,扩展点自动注入的一句就是根据 setter 方法对应的参数类型和 property 名称从 ExtensionFactory 中查询,如果有返回扩展点实例,那么就进行注入操作。到这里 getAdaptiveExtension 方法就分析完毕了。
@Adaptive( 如果这个注解在方法层面上,会动态生成一个自适应的适配器 如果是在类级别上,表示直接加载自定义的自适应适配器) Extension.getExtensionLoader().getExtension(“”); //加载一个指定名称的扩展点