目录
- 加载流程归纳
- Plugin类图
- 加载流程源码分析
- 函数式接口PluginsProvider
- Lambda表达式展开
- 循环处理plugin
- 遍历/plugin所有子目录进行处理
- 构造PluginClassLoader
- 单个plugin处理
- 注册PluginClassLoader到HandleResolver
- 设置当前线程ClassLoader
- 获取Plugin实现类
- Duplicate PluginClassLoader
- 构造InternalConnectorFactory
- 小结
最近在研究Trino的相关代码,发现用到了大量的函数式编程和lambda表达式等java8的新特性。刚开始接触门槛比较高,代码阅读也比较费劲,因此希望借这个系列,对自己的代码学习做一些记录,能够帮助到一些刚入门的同学。本文将会跟着代码,一步一步分析Trino的plugin加载到底做了哪些事情,当前代码分析都是基于trino的maste分支(最新的commit记录为:6a48c4352dfc6835997c43d7d5f7a599c0a712a5)。
加载流程归纳
由于后面涉及到了比较多的代码分析,防止阅读起来比较混乱,这里先用一个流程图对整个plugin的加载做一个整体的归纳总结,能够有一个大致的印象,后面在阅读代码的时候也可以对照着来看。需要注意的是,由于代码里面很多都是通过lambda表达式作为参数,进行循环嵌套处理,所以整个流程与后续的章节略有不同。
Plugin类图
在正式分析代码流程之前,先简单看下Plugin相关的类图,Trino支持的每一个plugin,在代码里面都会有一个对应的Plugin的实现类,如下所示:
可以看到,主要分成JdbcPlugin和其他的Plugin,例如KuduPlugin、BigQueryPlugin等。而JdbcPlugin又包含了MysqlPlugin、OraclePlugin、ClickHousePlugin等。通过Plugin的方法,可以获取对应的ConnectorFactory。ConnectorFactory也分为两种,主要是JdbcConnectorFactory和其他的ConnectorFactory实现类。需要注意的是,例如Mysql、Oracle等这些jdbc类型的plugin,都是直接使用了JdbcConnectorFactory。也就是说,通过MysqlPlugin、ClickhousePlugin等获取到的都是JdbcConnectorFactory,相关代码如下所示:
代码语言:javascript复制//JdbcPlugin.java,MysqlPlugin、OraclePlugin等都是直接继承的这个方法,没有单独实现
public Iterable<ConnectorFactory> getConnectorFactories() {
return ImmutableList.of(new JdbcConnectorFactory(name,
combine(new CredentialProviderModule(),
new ExtraCredentialsBasedIdentityCacheMappingModule(),module)));
}
关于Plugin和ConnectorFactory,我们会在下面的代码分析中反复提到。
加载流程源码分析
Trino服务在启动的时候,会从指定的路径来加载所有的plugin,默认是/plugin目录下。在该目录下,每个plugin都是以单独的子目录形式存在,该plugin所需要的jar,都放在这个子目录中。以Clickhouse为例,如下所示:
Trino会遍历/plugin目录下的所有子目录,然后依次加载,相关的函数调用如下所示:
代码语言:javascript复制doStart(Server.java):126
-loadPlugins(PluginManager.java):135
--loadPlugin(PluginManager.java):155
---loadPlugin(PluginManager.java):169
----installPlugin(PluginManager.java):175
-----installPluginInternal(PluginManager.java):198
在Trino中,大量用到了函数式编程和lambda表达式。在循环遍历/plugin目录来加载所有plugin的时候,主要是通过如下代码来操作的:
代码语言:javascript复制//PluginManager.loadPlugins():135
pluginsProvider.loadPlugins(this::loadPlugin, PluginManager::createClassLoader);
函数式接口PluginsProvider
其中,pluginsProvider是一个interface,如下所示:
代码语言:javascript复制//PluginManager.java
public interface PluginsProvider {
void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader);
interface Loader {
void load(String description, Supplier<PluginClassLoader> getClassLoader);
}
interface ClassLoaderFactory {
PluginClassLoader create(String pluginName, List<URL> urls);
}
}
可以看到PluginsProvider内部有一个loadPlugins,属于abstract method,因此是一个函数式接口。同理,嵌套的Loader和ClassLoaderFactory也属于函数式接口。
Lambda表达式展开
上面loadPlugins的那一行代码,其实是进行了lambda表达的简写,我们展开来看,对于初学者更加友好:
代码语言:javascript复制//演示代码,实际源码中不存在
PluginsProvider.Loader loader =
(plugin, createClassLoader) -> loadPlugin(plugin, createClassLoader);
PluginsProvider.ClassLoaderFactory factory =
(pluginName, urls) -> createClassLoader(pluginName, urls);
pluginsProvider.loadPlugins(loader, factory);
实际源码中就是将两个lambda表达式,简写为了“类名::实例方法名”,然后作为参数,传给loadPlugins方法。
循环处理plugin
PluginsProvider本身是一个接口,它有一个实现类,相应的loadPlugins方法实现,如下所示:
代码语言:javascript复制//ServerPluginsProvider.java
public void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader) {
executeUntilFailure(
executor,
listFiles(installedPluginsDir).stream()
.filter(File::isDirectory)
.map(file -> (Callable<?>) () -> {
loader.load(file.getAbsolutePath(), () ->
createClassLoader.create(file.getName(), buildClassPath(file)));
return null;
})
.collect(toImmutableList()));
}
可以看到使用了很长的lambda表达式,这里我们也简单拆解来看。
遍历/plugin所有子目录进行处理
首先会遍历/plugin下所有的子目录,然后转换成File,并使用isDirectory进行过滤,对应如下所示的代码:
代码语言:javascript复制listFiles(installedPluginsDir).stream().filter(File::isDirectory)
private static List<File> listFiles(File path) {
//省略无关代码
return stream(directoryStream).map(Path::toFile).sorted().collect(toImmutableList());
}
然后再对获取到的所有子目录进行逐个处理。整个处理过程又可以分为两个部分,我们来分别看一下。
构造PluginClassLoader
首先看最里面的lambda表达式:
代码语言:javascript复制() -> createClassLoader.create(file.getName(), buildClassPath(file))
private static List<URL> buildClassPath(File path) {
return listFiles(path).stream().map(ServerPluginsProvider::fileToUrl).collect(toImmutableList());
}
private static URL fileToUrl(File file) {
//省略无关代码
return file.toURI().toURL();
}
首先要明白,输入参数file代表的是一个子目录,对应着一个plugin,这个子目录下会包含这个plugin所依赖的jar。这里就是将这些jar转换成一个URL类型的集合,作为输入参数。然后跟plugin的名称一起作为参数,传给ClassLoaderFactory的create方法,实际绑定的是PluginManager.createClassLoader方法,我们通过上面的lambda展开代码可以看出,最终会返回一个PluginClassLoader,如下所示:
代码语言:javascript复制public static PluginClassLoader createClassLoader(String pluginName, List<URL> urls) {
ClassLoader parent = PluginManager.class.getClassLoader();
return new PluginClassLoader(pluginName, urls, parent, SPI_PACKAGES);
}
这样,对于每一个trino支持的plugin,都会创建一个对应的PluginClassLoader,并且这个PluginClassLoader会包含该plugin依赖的所有jar。后面就是真正进行plugin的一系列加载操作。我们继续来看一下。
单个plugin处理
构造完成PluginClassLoader之后,将该plugin的absolute path和PluginClassLoader本身作为参数传给Loader的load方法:
代码语言:javascript复制//这里为了展示,将原先的代码进行了展开,不是原代码的实际内容
//由于这里是lambda表达式,所以不能直接用PluginClassLoader作为返回值,需要使用Supplier接口将其包装起来
Supplier<PluginClassLoader> supplier =
() -> createClassLoader.create(file.getName(),buildClassPath(file));
xxx.map(file -> (Callable<?>) () -> {
loader.load(file.getAbsolutePath(), supplier);
return null;
})
这里实际绑定的是PluginManager.loadPlugin方法,我们同样可以通过从最上面的展开代码看出。函数主体如下所示:
代码语言:javascript复制private void loadPlugin(String plugin, Supplier<PluginClassLoader> createClassLoader) {
//省略无关代码
PluginClassLoader pluginClassLoader = createClassLoader.get();
handleResolver.registerClassLoader(pluginClassLoader);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
loadPlugin(pluginClassLoader);
}
}
这个loadPlugin方法主要可以分为三个部分,我们逐个来看下。
注册PluginClassLoader到HandleResolver
首先是将当前的PluginClassLoader注册到HandleResolver中,这个类型也很简单,就是保存所有创建了的ClassLoader,如下所示:
代码语言:javascript复制//HandleResolver.java
private final Map<String, ClassLoader> classLoaders = new ConcurrentHashMap<>();
public void registerClassLoader(PluginClassLoader classLoader) {
ClassLoader existingClassLoader = classLoaders.putIfAbsent(classLoader.getId(), classLoader);
checkState(xxx);
}
//PluginClassLoader.java
public String getId() {
return pluginName catalogName.map(name -> ":" name).orElse("");
}
将PluginClassLoader本身和id加到了map中,id使用了pluginName和catalogName进行拼接。pluginName就是每个plugin对应目录的名称,catalogName我们在后续的文章详细介绍,这里暂不展开。
设置当前线程ClassLoader
接着,会将当前的线程的ClassLoader设置为该PluginClassLoader,如下所示:
代码语言:javascript复制//ThreadContextClassLoader.java
private final ClassLoader originalThreadContextClassLoader;
public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {
this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(newThreadContextClassLoader);
}
这样是为了保证每个plugin依赖的类,都会用各自的PluginClassLoader去进行加载。这样就可以保证不同plugin之间,依赖的jar不会产生冲突。 然后继续调用重载的loadPlugin方法,这个方法目前只接收一个PluginClassLoader作为参数进行后续的加载操作。
获取Plugin实现类
我们接着往下看重载的loadPlugin的函数主体:
代码语言:javascript复制//PluginManager.java,省略了部分无关代码
private void loadPlugin(PluginClassLoader pluginClassLoader) {
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
for (Plugin plugin : plugins) {
installPlugin(plugin, pluginClassLoader::duplicate);
}
}
首先,通过ServiceLoader来获取当前PluginClassLoader中所有Plugin接口的实现类。以Hive为例,就是寻找plugin/hive下所有jar中,Plugin接口的实现类,这里对应的只有一个HivePlugin。所以,虽然这里使用了循环调用installPlugin,但通常每个plugin路径下,都是只有一个实现类。我们可以通过服务端日志进行验证:
代码语言:javascript复制2022-02-18T16:51:25.936 0800 INFO main io.trino.server.PluginManager -- Loading plugin /data/impala/presto/data/plugin/hive --
2022-02-18T16:51:25.964 0800 INFO main io.trino.server.PluginManager Installing io.trino.plugin.hive.HivePlugin
然后调用installPlugin来加载每一个Plugin的实现类。这里同样用到了lambda表达式,将pluginClassLoader::duplicate作为参数传入。
Duplicate PluginClassLoader
我们展开上述的duplicate表达式来看:
代码语言:javascript复制//这里也是演示代码,实际原代码中不存在
Function<CatalogName, ClassLoader> function = (CatalogName name) -> pluginClassLoader.duplicate(name);
installPlugin(plugin, function);
这里duplicate函数就是为了复制一个PluginClassLoader,但是与原先PluginClassLoader不同的时候,这里会传入新的catalogName和urls,如下所示:
代码语言:javascript复制//除了catalogName和urls,其他都是复用了当前的这个PluginClassLoader的成员
public PluginClassLoader duplicate(CatalogName catalogName) {
return new PluginClassLoader(
pluginName,
Optional.of(requireNonNull(catalogName, "catalogName is null")),
ImmutableList.copyOf(getURLs()),
spiClassLoader,
spiPackages,
spiResources);
}
关于这个duplicate函数,我们会在后续catalog的加载时用到,这里暂不展开说明。我们只需要知道,这里将duplicate函数作为Function的主体传给了installPlugin函数即可。
构造InternalConnectorFactory
接着将Plugin和Function作为参数,传入installPluginInternal函数。在installPluginInternal函数中,会进行各种注册,包括connector、functions、resource group等。这里主要关注下connector的注册,这个跟后续的catalog加载有关系,相关代码如下所示: 我们继续看后续的调用栈,如下所示:
代码语言:javascript复制//PluginManager.java
private void installPluginInternal(Plugin plugin, Function<CatalogName, ClassLoader>
duplicatePluginClassLoaderFactory) {
for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {
connectorManager.addConnectorFactory(connectorFactory, duplicatePluginClassLoaderFactory);
}
}
//ConnectorManager.java
ConcurrentMap<String, InternalConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogName,
ClassLoader> duplicatePluginClassLoaderFactory) {
InternalConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(
connectorFactory.getName(),
new InternalConnectorFactory(connectorFactory, duplicatePluginClassLoaderFactory));
}
上述代码进行了精简,主要就是通过addConnectorFactory方法,将plugin对应的ConnectorFactory和Function构造为一个InternalConnectorFactory,然后添加到到map中。ConnectorFactory的name是在构造具体的实现类时传入的,一般就是plugin的名称,例如hive、clickhouse等。关于这个InternalConnectorFactory,同样会在后面catalog的加载时用到,这里只是进行了一个注册操作。
小结
到这里,关于plugin的加载基本就已经完成了,主要的操作都是在PluginManager这个类中完成的。通过上述的源码解析可以看到,trino中应用了很多的函数式接口和lambda表达式简写,这种写法可以让代码在很大程度上精简,原来好几行代码才能实现的功能,现在通过一行lambda表达式就可以达到目的,非常方便。笔者之前对于函数式编程和lambda表达式用的也很少,希望以后也可以在工程实践中多多用到。