开源直播课丨大数据集成框架ChunJun类加载器隔离方案探索及实践

2022-10-09 16:00:47 浏览数 (1)

本期我们带大家回顾一下无倦同学的直播分享《ChunJun 类加载器隔离》,ChunJun 类加载器隔离的方案是我们近期探索的一个新方案,这个方案目前还不是非常成熟,希望能借由此次分享与大家一起探讨下这方案,如果大家有一些新的想法欢迎大家在 github 上给我提 issue 或者 pr。

一、Java 类加载器解决类冲突基本思想

在学习方案之前,首先为大家介绍一下 Java 类加载器解决类冲突的基本思想。

01 什么是 Classpath?

Classpath 是 JVM 用到的一个环境变量,它用来指示 JVM 如何搜索 Class。

因为 Java 是编译型语言,源码文件是.java,而编译后的.class 文件才是真正可以被 JVM 执行的字节码。因此,JVM 需要知道,如果要加载一个 com.dtstack.HelloWorld 的类,应该去哪搜索对应的 HelloWorld.class 文件。

所以,Classpath 就是一组目录的集合,它设置的搜索路径与操作系统相关,例如:

在 Windows 系统上,用;分隔,带空格的目录用 "" 括起来,可能长这样:

C:workproject1bin;C:shared;"D:My Documentsproject1bin"

在 MacOS & Linux 系统上,用:分隔,可能长这样:

/usr/shared:/usr/local/bin:/home/wujuan/bin

启动 JVM 时设置 Classpath 变量,实际上就是给 java 命令传入 - Classpath 或 - cp 参数.

java -Classpath .;/Users/lzq/Java/a;/Users/lzq/Java/b com.dtstack.HelloWorld

没有设置系统环境变量,也没有传入 - cp 参数,那么 JVM 默认的 Classpath 为,即当前目录:

java com.dtstack.HelloWorld

02 Jar 包中的类什么时候被加载?

● Jar 包

Jar 包就是 zip 包,只不过后缀名字不同。用于管理分散的 .class 类。

生成 jar 包可以用 zip 命令 zip -r ChunJun.zip ChunJun

java -cp ./ChunJun.zip com.dtstack.HelloWorld

● 加载

“加载”(Loading) 阶段是整个 “类加载”(Class Loading) 过程中的一个阶段,希望读者没有混淆这两个看起来很相似的名词。在加载阶段,Java 虚 拟机需要完成以下三件事情:

1. 通过一个类的全限定名来获取定义此类的二进制字节流;

2. 将这个字节流所代表的静态存储结构转化为方法区的运行时数据结构;

3. 在内存中生成一个代表这个类的 java.lang.Class 对象,作为方法区这个类的各种数据的访问入口。

● 解析

类或接口的解析

假设当前代码所处的类为 D,如果要把一个从未解析过的符号引用 N 解析为一个类或接口 C 的直接引用,那虚拟机完成整个解析的过程需要包括以下 3 个步骤:

1. 如果 C 不是一个数组类型,那虚拟机将会把代表 N 的全限定名传递给 D 的类加载器去加载这个类 C。

在加载过程中,由于元数据验证、字节码验证的需要,又可能触发其他相关类的加载动作,例如加载这个类的父类或实现的接口。一旦这个加载过程出现了任何异常,解析过程就将宣告失败。

2. 如果 C 是一个数组类型,并且数组的元素类型为对象,也就是 N 的描述符会是类

似 “[Ljava/lang/Integer 的形式,那将会按照第一点的规则加载数组元素类型。

如果 N 的描述符如前面所假设的形式,需要加载的元素类型就是 “java.lang.Integer",接着由虚拟机生成一个代表该数组维度和元素的数组对象。

3. 如果上面两步没有出现任何异常,那么 C 在虚拟机中实际上已经成为一个有效的类或接口了,但在解析完成前还要进行符号引用验证,确认 D 是否具备对 C 的访问权限。如果发现不具备访问权限,将抛出 java.lang,llegalAccessEror 异常。

03 哪些行为会触发类的加载?

关于在什么情况下需要开始类加载过程的第一个阶段 “加载”,《Java 虚拟机规范》中并没有进行 强制约束,这点可以交给虚拟机的具体实现来自由把握。但是对于初始化阶段,《Java 虚拟机规范》 则是严格规定了有且只有六种情况必须立即对类进行 “初始化”(而加载、验证、准备自然需要在此之 前开始):

● 场景一

遇到 new、getstatic、putstatic 或 invokestatic 这四条字节码指令时,如果类型没有进行过初始 化,则需要先触发其初始化阶段。能够生成这四条指令的典型 Java 代码场景有:

1. 使用 new 关键字实例化对象的时候。

2. 读取或设置一个类型的静态字段(被 final 修饰、已在编译期把结果放入常量池的静态字段除外) 的时候。

3. 调用一个类型的静态方法的时候。

● 场景二

使用 java.lang.reflect 包的方法对类型进行反射调用的时候,如果类型没有进行过初始化,则需 要先触发其初始化。

● 场景三

当初始化类的时候,如果发现其父类还没有进行过初始化,则需要先触发其父类的初始化。

● 场景四

当虚拟机启动时,用户需要指定一个要执行的主类(包含 main () 方法的那个类),虚拟机会先 初始化这个主类。

● 场景五

当使用 JDK 7 新加入的动态语言支持时,如果一个 java.lang.invoke.MethodHandle 实例最后的解析结果为 REF_getStatic、REF_putStatic、REF_invokeStatic、REF_newInvokeSpecial 四种类型的方法句柄,并且这个方法句柄对应的类没有进行过初始化,则需要先触发其初始化。

●场景六

当一个接口中定义了 JDK 8 新加入的默认方法(被 default 关键字修饰的接口方法)时,如果有这个接口的实现类发生了初始化,那该接口要在其之前被初始化。

对于以上这六种会触发类型进行初始化的场景,《Java 虚拟机规范》中使用了一个非常强烈的限定语 ——“有且只有”,这六种场景中的行为称为对一个类型进行主动引用。除此之外,所有引用类型的方 式都不会触发初始化,称为被动引用。

04 什么是双亲委派机制?

双亲委派机制,是按照加载器的层级关系,逐层进行委派,例如下图中的自定义类加载器想要加载类,它首先不会想要自己去加载,它会通过层级关系逐层进行委派,从自定义类加载器 -> App ClassLoader -> Ext ClassLoader -> BootStrap ClassLoader,如果在 BootStrap ClassLoader 中没有找到想要加载的类,又会逆循环加载。

05 如何打破双亲委派机制?

那么如何打破双亲委派机制呢?其实可以通过重写 loadclass 方法来实现,具体过程大家可通过视频了解,这里就不过多赘述。

二、Flink 类加载隔离的方案

接下来我们来介绍下 Flink 类加载隔离的方案,Flink 有两种类加载器 Parent-First 和 Child-First,他们的区别是:

1.Parent-First

类似 Java 中的双亲委派的类加载机制。Parent First ClassLoader 实际的逻辑就是一个 URL ClassLoader。

2.Child-First

先用 classloader.parent-first-patterns.default 和 classloader.parent-first-patterns.additional 拼接的 list 做匹配,如果类名前缀匹配了,先走双亲委派。否则就用 ChildFirstClassLoader 先加载。

Child-First 存在的问题

每次新 new 一个 ChildFirstClassLoader,如果运行时间久的话,类似 Session 这种 TaskManager 一直不关闭的情况。任务运行多次以后,会出现元数据空间爆掉,导致任务失败。

Child-First 加载原理

01 Flink 是如何避免类泄露的?

大家可以参考 Flink 中的 jira,这里面包含一些 bug 和处理方法:

https://issues.apache.org/jira/browse/FLINK-16245

https://issues.apache.org/jira/browse/FLINK-11205

Flink 如何避免类泄露,主要是通过以下两种方法:

  1. 增加一层委派类加载器,将真正的 UserClassloader 包裹起来。
  2. 增加一个回调钩子,当任务结束的时候可以提供给用户一个接口,去释放未释放的资源。

KinesisProducer 使用了这个钩子

final RuntimeContext ctx = getRuntimeContext();

ctx.registerUserCodeClassLoaderReleaseHookIfAbsent(

KINESIS_PRODUCER_RELEASE_HOOK_NAME,

()-> this.runClassLoaderReleaseHook

(ctx.getUserCodeClassLoader()));

02 Flink 卸载用户代码中动态加载的类

卸载用户代码中动态加载的类,所有涉及动态用户代码类加载(会话)的场景都依赖于再次卸载的类。

类卸载指垃圾回收器发现一个类的对象不再被引用,这时会对该类(相关代码、静态变量、元数据等)进行移除。

当 TaskManager 启动或重启任务时会加载指定任务的代码,除非这些类可以卸载,否则就有可能引起内存泄露,因为更新新版本的类可能会随着时间不断的被加载积累。这种现象经常会引起 OutOfMemoryError: Metaspace 这种典型异常。

类泄漏的常见原因和建议的修复方式:

● Lingering Threads

确保应用代码的函数 /sources/sink 关闭了所有线程。延迟关闭的线程不仅自身消耗资源,同时会因为占据对象引用,从而阻止垃圾回收和类的卸载。

● Interners

避免缓存超出 function/sources/sinks 生命周期的特殊结构中的对象。比如 Guava 的 Interner,或是 Avro 的序列化器中的类或对象。

● JDBC

JDBC 驱动会在用户类加载器之外泄漏引用。为了确保这些类只被加载一次,可以将驱动 JAR 包放在 Flink 的 lib/ 目录下,或者将驱动类通过 classloader-parent-first-patterns-additional 加到父级优先加载类的列表中。

释放用户代码类加载器的钩子(hook)可以帮助卸载动态加载的类,这种钩子在类加载器卸载前执行,通常情况下最好把关闭和卸载资源作为正常函数生命周期操作的一部分(比如典型的 close() 方法)。有些情况下(比如静态字段)最好确定类加载器不再需要后就立即卸载。

释放类加载器的钩子可以通过

RuntimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent() 方法进行注册。

03 Flink 卸载 Classloader 源码

BlobLibraryCacheManager$ResolvedClassLoader

private void runReleaseHooks() {

代码语言:javascript复制
Set<map.entry> hooks = releaseHooks.entrySet();

if (!hooks.isEmpty()) {

    for (Map.EntryhookEntry : hooks) {

        try {

            LOG.debug("Running class loader shutdown hook: {}.", hookEntry.getKey());

            hookEntry.getValue().run();

        } catch (Throwable t) {

            LOG.warn(

                    "Failed to run release hook '{}' for user code class loader.",

                    hookEntry.getValue(),

                    t);

        }

    }

    releaseHooks.clear();

}

}

三、ChunJun 如何实现类加载隔离

接下来为大家介绍下 ChunJun 如何实现类加载隔离。

01 Flink jar 的上传时机

首先我们需要上传 Jar 包,整体流程如下图所示:

● Yarn Perjob

提交任务的时候上传 jar 包,会放到

● Yarn Session

启动 Session 的时候,Yarn 的 App 上传 Jar 包机制,往 Session 提交任务的时候,Flink 的 Blob Server 负责收。

02 Yarn 的分布式缓存

03 Yarn 的分布式缓存

分布式缓存机制是由各个 NM 实现的,主要功能是将应用程序所需的文件资源缓存到本地,以便后续任务的使用。资源缓存是用时触发的,也就是第一个用到该资源的任务触发,后续任务无需再进行缓存,直接使用即可。

根据资源类型和资源可见性,NM 可将资源分成不同类型:

资源可见性分类

● Public

节点上所有的用户都可以共享该资源,只要有一个用户的应用程序将着这些资源缓存到本地,其他所有用户的所有应用程序都可以使用。

● Private

节点上同一用户的所有应用程序共享该资源,只要该用户其中一个应用程序将资源缓存到本地,该用户的所有应用程序都可以使用。

● Application

节点上同一应用程序的所有 Container 共享该资源

资源类型分类

● Archive

归档文件,支持.jar、.zip、.tar.gz、.tgz、.tar 的 5 种归档文件。

● File

普通文件,NM 只是将这类文件下载到本地目录,不做任何处理

● Pattern

以上两种文件的混合体

YARN 是通过比较 resource、type、timestamp 和 pattern 四个字段是否相同来判断两个资源请求是否相同的。如果一个已经被缓存到各个节点上的文件被用户修改了,则下次使用时会自动触发一次缓存更新,以重新从 HDFS 上下载文件。

分布式缓存完成的主要功能是文件下载,涉及大量的磁盘读写,因此整个过程采用了异步并发模型加快文件下载速度,以避免同步模型带来的性能开销。

04 Yarn 的分布式缓存

NodeManager 采用轮询的分配策略将这三类资源存放在 yarn.nodemanager.local-dirs 指定的目录列表中,在每个目录中,资源按照以下方式存放:

● Public 资源

存放在 ${yarn.nodemanager.local-dirs}/filecache/ 目录下,每个资源将单独存放在以一个随机整数命名的目录中,且目录的访问权限均为 0755。

● Private 资源

存放在 ${yarn.nodemanager.local-dirs}/usercache/${user}/filecache/ 目录下,(其中 ${user} 是应用程序提交者,默认情况下均为 NodeManager 启动者),每个资源将单独存放在以一个随机整数命名的目录中,且目录的访问权限均为 0710。

● Application 资源

存放在 ${yarn.nodemanager.local-dirs}/usercache/${user}/${appcache}/${appid}/filecache/ 目录下(其中 ${appid} 是应用程序 ID),每个资源将单独存放在以一个随机整数命名的目录中,且目录的访问权限均为 0710;

其中 Container 的工作目录位于 ${yarn.nodemanager.local-dirs}/usercache/${user}/${appcache}/${appid}/${containerid} 目录下,其主要保存 jar 包文件、字典文件对应的软链接。

05 Flink BlobServer

06 如何快速提交,减少上传 jar 包

Flink libs 下面 jar 包、Flink Plugins 下面的 jar 包、Flink 任务的 jar 包 (对于 ChunJun 来说就是所有 connector 和 core), Flink jar 用户自定义 jar 包。

● Perjob

如果可以提前上传到 HDFS:

  1. 提前把 Flink lib 、Flink plugins、ChunJun jar 上传到 HDFS 上面。
  2. 提交任务的时候通过 yarn.provided.lib.dirs 指定 HDFS 上面的路径即可。

如果不可以提前上传到 HDFS:

  1. 任务提交上传到 HDFS 固定位置,提交的时候检查 HDFS 上如果有对应的 jar (有缓存策略),就把本地路径替换成远程路径。
  2. 利用回调钩子,清楚异常任务结束的垃圾文件。

● Seeion

如果可以提前上传到 HDFS:

  1. 提前把 Flink lib 、Flink plugins、ChunJun jar 上传到 HDFS 上面。
  2. 启动 session 的时候通过 yarn.provided.lib.dirs 指定 HDFS 上面的路径即可。
  3. 提交任务的时候不需要上传 core 包。

如果不可以提前上传到 HDFS:

  1. Session 启动的时候就上传所有 jar 到 HDFS 上面。通过 yarnship 指定。
  2. Flink 任务提交到 Session 的时候,不需要提交任何 jar 包。

07 类加载隔离遇到的问题分析

● 思路分析

  1. 首先要把不同插件 (connector) 放到不同的 Classloader 里面。
  2. 然后使用 child-first 的加载策略。
  3. 确保不会发生 x not cast x 错误。
  4. 元数据空间不会内存泄露,导致任务报错。
  5. 要缓存 connector jar 包。

● 遇到的问题

  1. Flink 一个 job 可能有多个算子,一个 connector 就是一个算子。Flink 原生是为 job 级别新生成的 Classloader,无法把每个 connector 放在一个独立的 Classloader 里面。
  2. child-first 加载策略在 Session 模式下每次都新 new 一个 Classloader,导致元数据空间内存泄露。
  3. connecotor 之间用到公有的类会报错。
  4. 和问题 2 类似,主要是因为有些线程池,守护线程会拿着一些类对象,或者类 class 对象的引用。
  5. 如果用原生 -yarnship 去上传,会放到 App Classloader 里面。那么就会导致某些不期望用 App Classloader 加载的类被加载。

08 Flink JobGraph Classpath 的使用

/** Set of JAR files required to run this job. */

private final ListuserJars = new ArrayList();

/** Set of custom files required to run this job. */

private final MapuserArtifacts = new HashMap<>();

/** List of Classpaths required to run this job. */

private ListClasspaths = Collections.emptyList();

  1. 客户端处理,JobGraph 处理 userJars、userArtifacts、Classpaths 这三个属性。
  2. Classpath 只留下 connector 的层级目录。
  3. 启动 Session 的时候上传 jar,jar 缓存在 Yarn 的所有的 NodeManager 节点。
  4. jobmanager 和 taskmanager 构建 Classloader 的时候去修改 Classpath 的路径,替换成当前节点 NodeManager 的缓存路径。
  5. 根据不同 connecotr 去构建 Flink Job 的 Classloader。
  6. 把构建出来的 classlaoder 进行缓存,下次任务还有相同的 Classloader。避免内存泄露。
  7. 重写新的 ChildFirstCacheClassloader 里面的 loadclass 方法,根据不同的 connector url 去生成 单独的 Classloader。

四、遇到的问题和排查方案?

jar 包冲突常见的异常为找不到类(java.lang.ClassNotFoundException)、找不到具体方法(java.lang.NoSuchMethodError)、字段错误( java.lang.NoSuchFieldError)或者类错误(java.lang.LinkageError)。

● 常见的解决方法如下

1、首先做法是打出工程文件的依赖树,将根据 jar 包依赖情况判定是不是同一个 jar 包依赖了多个版本,如果确认问题所在,直接 exclusion 其中错误的 jar 包即可。

2、如果通过看依赖树不能确定具体冲突的 jar 包,可以使用添加 jvm 参数的方式启动程序,将类加载的具体 jar 信息打印出来;-verbose:class 。

3、经过上述步骤基本就可以解决 jar 包冲突问题,具体的问题要具体分析。

● 常用工具推荐

1.Maven-helper

主要排查类冲突的 IDEA 插件。

2.Jstack

死锁的一些问题可以通过这个工具查看 jstack 调用栈。

3.Arthas

排查一些性能问题和 Classloader 泄露问题。

4.VisualVM

排查一些对象内存泄露、dump 文件分析等。

0 人点赞