Impala与内嵌Jvm之间的交互

2022-05-20 08:48:30 浏览数 (1)

了解过Impala的同学都知道,Impala的节点分为BE和FE两个模块,分别是由C 和Java编写的。对于impalad而言,FE端主要是进行SQL的解析,具体的执行则是在BE端进行的;而对于catalogd而言,主要的元数据操作都是在FE端通过调用hms的API执行的,BE端主要是进行一些RPC通信。关于这两个模块之间是如何交互的,相关的资料比较少。因此,本文笔者就和大家一起学习下,Impala的BE和FE之间是如何通过JNI进行交互的。

通过Impala进程创建Jvm

如果想在C 调用Java的方法,需要先启动一个Jvm。我们这里以catalogd为例,相关的函数调用如下所示:

代码语言:javascript复制
CatalogdMain(catalogd-main.cc):55
-InitCommonRuntime(init.cc):385
--InitLibhdfs(jni-util.cc):213
---hdfsConnect

可以看到,Impala并不是直接调用JNI的JNI_CreateJavaVM方法来创建Jvm的。而是在InitLibhdfs方法中,通过调用hdfs的API hdfsConnect方法来实现Jvm的创建,我们查看hdfs源码的部分调用栈:

代码语言:javascript复制
//项目地址:https://github.com/apache/hadoop-hdfs
hdfsConnect(hdfs.c):171
-hdfsConnectAsUser(hdfs.c):199
--getJNIEnv(hdfsJniHelper.c):463
---JNI_CreateJavaVM

在hdfs的c 库中,通过调用JNI_CreateJavaVM方法,创建了一个新的Jvm。关于JNI_CreateJavaVM方法的官方解释如下所示:

代码语言:javascript复制
JNI_CreateJavaVM
jint JNI_CreateJavaVM(JavaVM **p_vm, void **p_env, void *vm_args);

Loads and initializes a Java VM. The current thread becomes the main thread. Sets the env argument to the JNI interface pointer of the main thread.

Creation of multiple VMs in a single process is not supported.

可以看到,对于一个进程只能创建一个Jvm。我们沿着上面的Impala调用栈继续往下看:

代码语言:javascript复制
InitCommonRuntime(init.cc):387
-Init(jni-util.cc):133
--GetJNIEnv(jni-util.h):267
---GetJNIEnvSlowPath(jni-util.cc):233
----FindJavaVMOrDie(jni-util.cc):223
-----JNI_GetCreatedJavaVMs

最后在Impala的FindJavaVMOrDie方法中,通过调用JNI的JNI_GetCreatedJavaVMs方法,可以获取本进程已经创建的Jvm,也就是刚刚通过hdfs的API创建的那个Jvm。当我们需要在BE端调用FE端的方法时,就可以通过这个Jvm获取相应的JNIEnv,然后调用相应的API,我们在下面章节会继续讲解具体的调用场景。

调整集群Jvm参数

当我们使用start-impala-cluster.py启动测试集群的时候,脚本就会自动设置环境变量JAVA_TOOL_OPTIONS,在创建Jvm的时候会调用这个环境变量。如下所示:

在build_java_tool_options方法中会构造JAVA_TOOL_OPTIONS变量对应的value,默认是会设置一个DEBUG的调试端口配置,然后返回并添加到环境变量中。同时,脚本还支持参数jvm_args,可以追加一些新的Jvm配置,如下所示:

代码语言:javascript复制
./bin/start-impala-cluster.py '--jvm_args=-Xms10g -Xmx10g'

这样我们就将Jvm的heap size设置成了10g。我们也可以在日志中,看到Jvm相关的配置打印,如下所示:

值得一提的是,除了上面提到的jvm_args可以修改Jvm的参数,还有另外一种方式也可以修改。我们在上一节中提到Jvm是通过hdfs的相关api来创建的。因此,我们还可以通过环境变量LIBHDFS_OPTS来调整Jvm的参数,例如:

代码语言:javascript复制
export LIBHDFS_OPTS="-Xms10g -Xmx10g"

经过测试发现,当使用start-impala-cluster.py启动测试集群的时候,jvm_args参数的优先级要高于LIBHDFS_OPTS环境变量。直接设置JAVA_TOOL_OPTIONS环境变量则不生效,会在start-impala-cluster.py脚本中被覆盖掉。

BE端调用FE端的方法

上面提到,在启动Impala进程的时候,会先创建一个内嵌的Jvm,接着就可以通过这个Jvm获取相应的JNIEnv对象,来加载FE端的相关方法。以catalogd为例,相关的函数调用如下所示:

代码语言:javascript复制
CatalogdMain(catalogd-main.cc):63
-Start(catalog-server.cc):276
--Catalog(catalog.cc):75
---LoadJniMethod(jni-util.cc)

通过LoadJniMethod方法就可以加载FE端的方法。对于catalogd而言,这些方法都位于JniCatalog.java类中,在Catalog的构造函数中进行绑定:

代码语言:javascript复制
//catalog.cc
JniMethodDescriptor methods[] = {
  {"<init>", "([B)V", &catalog_ctor_},
  {"updateCatalog", "([B)[B", &update_metastore_id_},
  {"execDdl", "([B)[B", &exec_ddl_id_},
  {"resetMetadata", "([B)[B", &reset_metadata_id_},
  //省略余下代码

方法加载完成之后,就可以在BE端通过JNI的相关接口进行调用。这里我们以常见的create table为例,这是一个DDL类型的SQL,对于DDL/DML,SQL首先会提交到coordinator节点,最终是由catalogd来执行的,我们将整个流程归纳如下:

主要分为四个步骤,结合上面的图来分别看下每一步的主要逻辑:

  1. 首先,查询会提交到coordinator节点的BE端,通过JNI调用FE端的createExecRequest方法进行SQL解析,由于是一个DDL,所以不需要像普通QUERY一样进行analysis;
  2. 返回到BE端之后,coordinator节点会通过RPC接口与catalogd进行通信,将ExecDdl作为参数传给了DoRpcWithRetry方法;
  3. Catalogd通过JNI调用其FE端的execDdl方法来执行该DDL;
  4. 最终在FE端调用hms的相关接口,来创建一个表,并且更新catalogd自身的内存信息;

到这里为止,一次create table流程就完成了。可以看到,在这个过程中,coordinator和catalogd都通过JNI调用实现了BE和FE之间的交互。

FE端调用BE端的方法

上面介绍了Impala如何在BE端调用FE的方法。其实,在Impala中也存在FE端调用BE方法的场景,这里简单来看一下。在之前的文章:LocalCatalog详解之Coordinator处理流程中,我们介绍了在LocalCatalog模式下,coordinator节点主要是通过CatalogdMetaProvider类来向catalogd获取所需的元数据,即“Fetch-on-demand”,这个过程就涉及到了FE端对BE方法的调用。在调用loadWithCaching方法时,会实现一个Callable对象,重载call方法。在这个call方法中就会涉及到JNI的调用(我们在那篇文章中省略了call方法主体),相关函数调用如下所示:

代码语言:javascript复制
loadTableList(CatalogdMetaProvider.java):667
-call(CatalogdMetaProvider.java):670
--sendRequest(CatalogdMetaProvider.java):400
---GetPartialCatalogObject(FeSupport.java):438/442
----NativeGetPartialCatalogObject(FeSupport.java):111
-----Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(fe-support.cc)

这里的call方法实际是在loadWithCaching函数中调用的,为了方便读者阅读源码,我们就按照它实现的位置进行了展示。当调用BE端的方法之后,相应的流程如下所示:

代码语言:javascript复制
Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(fe-support.cc):593
-GetPartialCatalogObject(catalog-op-executor.cc):370
--DoRpcWithRetry(client-cache.h)
---GetPartialCatalogObject(catalog-service-client-wrapper.h)

可以看到,与上面的create table类似,这里也是通过RPC,将GetPartialCatalogObject作为参数传给了DoRpcWithRetry,最终也是通过catalogd来获取元数据信息,然后返回给coordinator。关于catalogd的处理流程,不是本文关注的重点,这里不再展开说明,后续会在LocalCatalog系列的文章中再详细介绍。值得一提的是,这些在BE端会被FE调用的方法,在编译完成之后,最终都会位于be/build/latest/service/libfesupport.so这个动态库中。

总结

到这里,关于Impala的FE和BE的交互就介绍的差不多了。总结一下,本文首先介绍了Impala是如何在c 进程中来创建Jvm的,接着又介绍了如何调整集群的Jvm参数。最后通过两个场景讲解了FE和BE之间的JNI调用。总之,在当前在大数据系统很多都是Java实现的情况下,Impala这种结合C 和Java的玩法还是比较有意思的,大家可以了解了解。

0 人点赞