Java必备之从JDK到Dubbo的SPI深度剖析

2019-10-14 19:06:19 浏览数 (1)

1.Java SPI

SPI简单实现

SPI(service provider interface):服务提供接口

定义接口

代码语言:javascript复制
public interface SayHelloService {

    /**
     * 打招呼
     *
     * @param name
     * @return
     */
    String sayHello(String name);
}

定义多个实现

中文实现

代码语言:javascript复制
public class ChineseSayHelloServiceImpl implements SayHelloService {

    @Override
    public String sayHello(String name) {
        return "你好,"   name;
    }
}

英文实现

代码语言:javascript复制
public class EnglishSayHelloServiceImpl implements SayHelloService{

    @Override
    public String sayHello(String name) {
        return "Hi,"   name;
    }
}

创建资源文件

在resources目录下面创建META-INF/services/接口全限定名

本示例路径META-INF/services/com.springboot.dubbo.spi.SayHelloService

然后在文件中写入所有的实现

代码语言:javascript复制
com.springboot.dubbo.spi.ChineseSayHelloServiceImpl
com.springboot.dubbo.spi.EnglishSayHelloServiceImpl

测试

代码语言:javascript复制
@Test
public void spiTest() {
    ServiceLoader<SayHelloService> services = ServiceLoader.load(SayHelloService.class);
    for (SayHelloService sayHelloService : services) {
        System.out.println(sayHelloService.sayHello("java spi"));
    }
}

输出结果

你好,java spi Hi,java spi

SPI源码分析

ServiceLoader#load,获取上下文类加载器

代码语言:javascript复制
public static <S> ServiceLoader<S> load(Class<S> var0) {
    ClassLoader var1 = Thread.currentThread().getContextClassLoader();
    return load(var0, var1);
}

初始化ServiceLoader方法

代码语言:javascript复制
public static <S> ServiceLoader<S> load(Class<S> var0, ClassLoader var1) {
    return new ServiceLoader(var0, var1);
}
代码语言:javascript复制
private ServiceLoader(Class<S> var1, ClassLoader var2) {
    this.service = (Class)Objects.requireNonNull(var1, "Service interface cannot be null");
    this.loader = var2 == null ? ClassLoader.getSystemClassLoader() : var2;
    this.reload();
}

初始化LazyIterator

代码语言:javascript复制
public void reload() {
    this.providers.clear();
    this.lookupIterator = new ServiceLoader.LazyIterator(this.service, this.loader);
}

循环遍历首先调用LazyIterator#hasNext

代码语言:javascript复制
public boolean hasNext() {
    if (this.nextName != null) {
        return true;
    } else {
        if (this.configs == null) {
            try {
                // 找到配置所在的目录
                String var1 = "META-INF/services/"   this.service.getName();
                if (this.loader == null) {
                    this.configs = ClassLoader.getSystemResources(var1);
                } else {
                    // 使用类加载器加载资源文件
                    this.configs = this.loader.getResources(var1);
                }
            } catch (IOException var2) {
                ServiceLoader.fail(this.service, "Error locating configuration files", var2);
            }
        }

        while(this.pending == null || !this.pending.hasNext()) {
            if (!this.configs.hasMoreElements()) {
                return false;
            }
            // 得到所有的实现类的全限定名
            this.pending = ServiceLoader.this.parse(this.service, (URL)this.configs.nextElement());
        }

        // 获取实现类的全限定名
        this.nextName = (String)this.pending.next();
        return true;
    }
}

循环遍历调用LazyIterator#next

代码语言:javascript复制
public S next() {
    if (!this.hasNext()) {
        throw new NoSuchElementException();
    } else {
        // 获取实现类的全限定名
        String var1 = this.nextName;
        this.nextName = null;
        Class var2 = null;

        try {
            // 反射获取实现类全限定名对应的Class对象
            var2 = Class.forName(var1, false, this.loader);
        } catch (ClassNotFoundException var5) {
            ServiceLoader.fail(this.service, "Provider "   var1   " not found");
        }

        if (!this.service.isAssignableFrom(var2)) {
            ServiceLoader.fail(this.service, "Provider "   var1   " not a subtype");
        }

        try {
            // 实例化实现类对象
            Object var3 = this.service.cast(var2.newInstance());
            ServiceLoader.this.providers.put(var1, var3);
            return var3;
        } catch (Throwable var4) {
            ServiceLoader.fail(this.service, "Provider "   var1   " could not be instantiated: "   var4, var4);
            throw new Error();
        }
    }
}

小结:

从上面的源码分析可以得知,首先会解析【/META-INF/services/接口全限定名】 这个路径下面的文件,然后得到里面的实现类的全限定名称,然后通过反射获取Class对象,最终通过调用newInstance()方法实例化实现类对象,最终调用实现类的目标方法

Java SPI的缺点

会一次加载所有的实现类,然后从中选取我们需要的实现。而不是我们配置需要哪个实现从而加载对应的实现

Dubbo的SPI

Dubbo为什么要实现自己的SPI

通过之前对Java SPI的介绍,我们了解到Java本身自带的SPI只能加载所有的实现,不能根据需要来进行选择相对应的实现,使用过Dubbo都知道我们可以自定义一些功能,比如协议选择,可以是rest,可以是dubbo;负载功能,可以使随机负载,轮询负载等。类似这样的无法通过Java SPI来完成,所以只能自己实现

Dubbo实现SPI思路

  • 定义扩展点,也就是接口,必须使用@SPI注解标注
  • 定义扩展点实现类
  • 每个扩展点对应一个扩展加载器
  • 读取配置文件获取扩展点适配实例,并缓存,以此来提供IOC功能
  • 读取配置文件的同时会缓存实现类标识与实现类class对象的对应关系

Dubbo SPI配置文件

类似Java SPI,在resources目录下面创建目录,在目录文件中声明如下

代码语言:javascript复制
random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

可以看到增进的功能就是,为每个实现都指定了一个标识,也就是“=”前面的内容,这样可以根据RPC请求中的参数,来匹配最终使用哪种扩展

Dubbo SPI的优势

  • 性能好,使用了缓存,扩展点的加载只会加载一次,以后直接从缓存中获取
  • 便于扩展,如果需要新增一种实现,只需要实现扩展点,然后在配置文件中指定就可以了
  • 提供了IOC功能,每个扩展点对应一个扩展加载器,当其它类中需要注入的时候,可以通过扩展器来获取

Dubbo SPI 源码分析

接下来我们对Dubbo SPI的源码进行分析。

打开ServiceConfig

代码语言:javascript复制
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

此行代码在很多地方都会进行调用,里面使用到了Dubbo的SPI,借用此代码来进行分析

ExtensionLoader#getExtensionLoader,获取扩展点对应的扩展加载器

代码语言:javascript复制
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    // 扩展点必须是一个接口
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type ("   type   ") is not an interface!");
    }
    // 扩展点必须使用@SPI注解
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type ("   type  
                ") is not an extension, because it is NOT annotated with @"   SPI.class.getSimpleName()   "!");
    }
    // 从缓存中获取扩展点对应的扩展点加载器
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        // 如果扩展点对应的扩展加载器不存在,则创建一个新的扩展加载器,然后放入缓存
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

分析创建扩展加载器new ExtensionLoader

代码语言:javascript复制
private ExtensionLoader(Class<?> type) {
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

此处又调用了ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension())这样一行代码,我们接着分析,通过上面的分析,我们得知ExtensionLoader#getExtensionLoader就是根据传入的扩展点来获取对应的扩展加载器,如果缓存中有,就直接返回,如果没有,则新建

此处开始分析的扩展点由原本的Protocol变成了ExtensionFactory,也就是当前扩展点加载器对象的type=ExtensionLoader,objectFactory=null

接下来分析getAdaptiveExtension

代码语言:javascript复制
public T getAdaptiveExtension() {
    // 获取缓存的适配实例
    Object instance = cachedAdaptiveInstance.get();
    // 采用双重锁➕volatile的方式来创建适配实例
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: "   t.toString(), t);
                    }
                }
            }
        } else {
            throw new IllegalStateException("Failed to create adaptive instance: "   createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

创建适配扩展createAdaptiveExtension

代码语言:javascript复制
private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension "   type   ", cause: "   e.getMessage(), e);
    }
}

获取适配扩展class对象,getAdaptiveExtensionClass

代码语言:javascript复制
private Class<?> getAdaptiveExtensionClass() {
    // 获取扩展class
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

获取扩展class,getExtensionClasses

代码语言:javascript复制
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                // 加载扩展class
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

从配置文件中加载扩展class

代码语言:javascript复制
private Map<String, Class<?>> loadExtensionClasses() {
    cacheDefaultExtensionName();

    /**
     * 从resources/META-INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionFactory
     * resources/META-INF/dubbo/org.apache.dubbo.common.extension.ExtensionFactory
     * resources/META-INF/services/org.apache.dubbo.common.extension.ExtensionFactory
     * 这3个目录中加载扩展点的实现
     */
    Map<String, Class<?>> extensionClasses = new HashMap<>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

加载目录

代码语言:javascript复制
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
    String fileName = dir   type;
    try {
        Enumeration<java.net.URL> urls;
        // 获取类加载器
        ClassLoader classLoader = findClassLoader();
        if (classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }
        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL resourceURL = urls.nextElement();
                // 加载资源
                loadResource(extensionClasses, classLoader, resourceURL);
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: "  
                type   ", description file: "   fileName   ").", t);
    }
}

加载资源

代码语言:javascript复制
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
    try {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
            String line;
            /**
             * 一行一行读取配置文件的内容,配置文件中的内容如下
             * spring=org.apache.dubbo.config.spring.extension.SpringExtensionFactory
             */
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) {
                    line = line.substring(0, ci);
                }
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
                        int i = line.indexOf('=');
                        if (i > 0) {
                            name = line.substring(0, i).trim();
                            line = line.substring(i   1).trim();
                        }
                        if (line.length() > 0) {
                            loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: "   type   ", class line: "   line   ") in "   resourceURL   ", cause: "   t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: "  
                type   ", class file: "   resourceURL   ") in "   resourceURL, t);
    }
}

加载class

代码语言:javascript复制
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: "  
                type   ", class line: "   clazz.getName()   "), class "
                  clazz.getName()   " is not subtype of interface.");
    }
    // 判断class是否含有@Adaptive注解,此类就是我们需要的扩展适配类
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        cacheAdaptiveClass(clazz);
    }
    // 是否为包裹类,这部分内容会在后续的服务发布的时候讲解到,暂不分析
    else if (isWrapperClass(clazz)) {
        cacheWrapperClass(clazz);
    } else {
        clazz.getConstructor();
        if (StringUtils.isEmpty(name)) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class "   clazz.getName()   " in the config "   resourceURL);
            }
        }

        String[] names = NAME_SEPARATOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
            // 如果当前class上有@Activate注解,在存入缓存
            cacheActivateClass(clazz, names[0]);
            for (String n : names) {
                // 将class和名称存入cachedNames,即key为SpringExtensionFactory class对象,value为spring
                cacheName(clazz, n);
                // 将名称和class存入cachedClasses,即key为spring,value为SpringExtensionFactory class对象
                saveInExtensionClass(extensionClasses, clazz, name);
            }
        }
    }
}

到此加载配置文件功能到此结束,主要做了如下几件事情

  • 从指定目录下,当前class全路径文件中读取扩展点实现类信息
  • 如果实现类中含有@Adaptive注解的类,将次类设置为cachedAdaptiveClass
  • 给cachedNames赋值,里面存储的是实现类的class对象和实现类标识名称
  • 给cachedClasses赋值,里面存储的是实现类标识名称和实现类class对象

接下来回到createAdaptiveExtension

代码语言:javascript复制
private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension "   type   ", cause: "   e.getMessage(), e);
    }
}

此处拿ExtensionFactory这个扩展点来举例,通过此前的分析会加载配置文件中的实现类,该扩展点有一个实现类

代码语言:javascript复制
adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory

该类上有一个注解,所以ExtensionFactory的适配类就是AdaptiveExtensionFactory

代码语言:javascript复制
getAdaptiveExtensionClass().newInstance()

那么这行代码返回的就是AdaptiveExtensionFactory对象,然后进入injectExtension

代码语言:javascript复制
private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                if (isSetter(method)) {
                    /**
                     * Check {@link DisableInject} to see if we need auto injection for this property
                     */
                    if (method.getAnnotation(DisableInject.class) != null) {
                        continue;
                    }
                    Class<?> pt = method.getParameterTypes()[0];
                    if (ReflectUtils.isPrimitives(pt)) {
                        continue;
                    }
                    try {
                        String property = getSetterProperty(method);
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("Failed to inject via method "   method.getName()
                                  " of interface "   type.getName()   ": "   e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

由于objectFactory为null,此处直接返回AdaptiveExtensionFactory对象

回到最初的构造

代码语言:javascript复制
private ExtensionLoader(Class<?> type) {
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

此处可以得知Protocol扩展点对应扩展器的objectFactory为AdaptiveExtensionFactory对象,那么接下来就该分析第一行代码的第二部分了,也就是getAdaptiveExtension部分

代码语言:javascript复制
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

再来看看getAdaptiveExtension实现

代码语言:javascript复制
private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

通过如上的分析,扩展点的实现类中存在被@Adaptive标注的类,那么cachedAdaptiveClass才不为null,通过查看Protocol的所有实现类,不存在适配类,所以只能执行createAdaptiveExtensionClass来创建适配class对象

代码语言:javascript复制
private Class<?> createAdaptiveExtensionClass() {
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    ClassLoader classLoader = findClassLoader();
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

如上代码又看到了熟悉的部分,通过之前的分析,很容易知道ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension()这样代码返回的是AdaptiveCompiler对象

AdaptiveCompiler#compile

代码语言:javascript复制
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
    Compiler compiler;
    // 获取对应的加载器
    ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
    String name = DEFAULT_COMPILER; // copy reference
    if (name != null && name.length() > 0) {
        compiler = loader.getExtension(name);
    } else {
        // 加载默认的实现,也就是Compile扩展点@SPI注解上给定的值@SPI("javassist")
        compiler = loader.getDefaultExtension();
    }
    return compiler.compile(code, classLoader);
}

最终调用了JavassistCompiler#doCompile,编译结果如下

代码语言:javascript复制
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy()  {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort()  {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("   url.toString()   ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("   url.toString()   ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

0 人点赞