概览
在运行 Flink 应用程序时,JVM 会随着时间的推移加载各种类。 这些类可以根据它们的来源分为三组:
- Java Classpath:这是Java的通用类路径,它包括JDK库,以及Flink /lib文件夹中的所有代码(Apache Flink的类和一些依赖)。
- Flink 插件组件:插件代码在 Flink 的 /plugins 文件夹下的文件夹中。 Flink 的插件机制会在启动时动态加载一次。
- 动态用户代码:这些是动态提交的作业的 JAR 文件中包含的所有类(通过 REST、CLI、Web UI)。 它们按作业动态加载(和卸载)。
作为一般规则,无论何时您先启动 Flink 进程然后再提交作业,作业的类都会动态加载。 如果 Flink 进程与作业/应用程序一起启动,或者如果应用程序产生 Flink 组件(JobManager、TaskManager 等),那么所有作业的类都在 Java 类路径中。
插件组件中的代码由每个插件的专用类加载器动态加载一次。
以下是有关不同部署模式的更多详细信息:
Standalone Session
当作为独立会话启动 Flink 集群时,JobManagers 和 TaskManagers 使用 Java 类路径中的 Flink 框架类启动。 针对会话(通过 REST / CLI)提交的所有作业/应用程序中的类都是动态加载的。
Docker / Kubernetes Sessions
Docker / Kubernetes 设置首先启动一组 JobManagers / TaskManagers,然后通过 REST 或 CLI 提交作业/应用程序,其行为类似于独立会话:Flink 的代码位于 Java 类路径中,插件组件和作业代码在启动时动态加载。
YARN
YARN 类加载在单个作业部署和会话之间有所不同:
当直接向 YARN 提交 Flink 作业/应用程序时(通过 bin/flink run -m yarn-cluster …),将为该作业启动专用的 TaskManager 和 JobManager。 这些 JVM 在 Java 类路径中具有用户代码类。 这意味着在这种情况下,作业不涉及动态类加载。
当启动一个 YARN 会话时,JobManagers 和 TaskManagers 是用 classpath 中的 Flink 框架类启动的。 针对会话提交的所有作业的类都是动态加载的。
反向类加载和类加载器解析顺序
在涉及动态类加载的设置中(插件组件、会话设置中的 Flink 作业),通常有两个类加载器的层次结构:(1)Java 的应用程序类加载器,它包含类路径中的所有类,以及(2)动态插件/ 用户代码类加载器。 用于从插件或用户代码 jar 加载类。 动态 ClassLoader 将应用程序类加载器作为其父级。
默认情况下,Flink 反转类加载顺序,这意味着它首先查看动态类加载器,如果类不是动态加载代码的一部分,则仅查看父类(应用程序类加载器)。
反向类加载的好处是插件和作业可以使用与 Flink 核心本身不同的库版本,这在不同版本的库不兼容时非常有用。 该机制有助于避免常见的依赖冲突错误,如 IllegalAccessError 或 NoSuchMethodError。 代码的不同部分只是具有单独的类副本(Flink 的核心或其依赖项之一可以使用与用户代码或插件代码不同的副本)。 在大多数情况下,这运行良好,不需要用户进行额外配置。
但是,在某些情况下,反向类加载会导致问题(请参阅下文,“X cannot be cast to X”)。 对于用户代码类加载,您可以通过在 Flink 配置中通过 classloader.resolve-order 将 ClassLoader 解析顺序配置为 parent-first(从 Flink 的默认 child-first)来恢复到 Java 的默认模式。
请注意,某些类总是以父级优先的方式解析(首先通过父类加载器),因为它们在 Flink 的核心和插件/用户代码或面向插件/用户代码的 API 之间共享。 这些类的包是通过 classloader.parent-first-patterns-default 和 classloader.parent-first-patterns-additional 配置的。 要添加父级优先加载的新包,请设置 classloader.parent-first-patterns-additional 配置选项。
避免用户代码的动态类加载
所有组件(JobManger、TaskManager、Client、ApplicationMaster 等)在启动时记录它们的类路径设置。 它们可以作为日志开头的环境信息的一部分找到。
当运行 JobManager 和 TaskManagers 专用于一项特定作业的设置时,可以将用户代码 JAR 文件直接放入 /lib 文件夹中,以确保它们是类路径的一部分而不是动态加载。
通常将作业的 JAR 文件放入 /lib 目录中。 JAR 将成为类路径(AppClassLoader)和动态类加载器(FlinkUserCodeClassLoader)的一部分。 因为 AppClassLoader 是 FlinkUserCodeClassLoader 的父级(并且 Java 加载父级,默认情况下),这应该导致类只加载一次。
对于无法将作业的 JAR 文件放入 /lib 文件夹的设置(例如因为安装程序是由多个作业使用的会话),仍然可以将公共库放入 /lib 文件夹,并避免动态为那些类进行加载。
用户代码中的手动类加载
在某些情况下,转换函数、源或接收器需要手动加载类(通过反射动态加载)。 为此,它需要能够访问作业类的类加载器。
在这种情况下,函数(或源或接收器)可以成为 RichFunction(例如 RichMapFunction 或 RichWindowFunction)并通过 getRuntimeContext().getUserCodeClassLoader() 访问用户代码类加载器。
X cannot be cast to X exceptions
在使用动态类加载的设置中,您可能会看到 com.foo.X cannot be cast to com.foo.X 样式中的异常。 这意味着 com.foo.X 类的多个版本已被不同的类加载器加载,并且该类的类型试图相互分配。
一个常见的原因是库与 Flink 的反向类加载方法不兼容。 您可以关闭反向类加载来验证这一点(在 Flink 配置中设置 classloader.resolve-order: parent-first)或从反向类加载中排除库(在 Flink 配置中设置 classloader.parent-first-patterns-additional)。
另一个原因可能是缓存对象实例,如 Apache Avro 之类的某些库或通过注册(例如通过 Guava 的 Interners)生成的对象实例。 这里的解决方案是要么在没有任何动态类加载的情况下进行设置,要么确保相应的库完全是动态加载代码的一部分。 后者意味着该库不能被添加到 Flink 的 /lib 文件夹中,而必须是应用程序的 fat-jar/uber-jar 的一部分
卸载用户代码中动态加载的类
所有涉及动态用户代码类加载(会话)的场景都依赖于再次卸载类。 类卸载意味着垃圾收集器发现类中不存在任何对象,因此删除该类(代码、静态变量、元数据等)。
每当 TaskManager 启动(或重新启动)一个任务时,它将加载该特定任务的代码。 除非可以卸载类,否则这将成为内存泄漏,因为加载了新版本的类,并且加载的类总数会随着时间的推移而累积。 这通常通过 OutOfMemoryError: Metaspace 表现出来。
类泄漏的常见原因和建议的修复:
- 延迟线程:确保应用程序功能/源/接收器关闭所有线程。 延迟线程本身会消耗资源,并且通常还会持有对(用户代码)对象的引用,从而防止垃圾收集和类卸载。
- 内部的:避免在超过函数/源/接收器生命周期的特殊结构中缓存对象。 示例是 Guava 的 interners,或序列化程序中 Avro 的类/对象缓存。
- JDBC:JDBC 驱动程序在用户代码类加载器之外泄漏引用。 为了确保这些类只加载一次,您应该将驱动程序 jar 添加到 Flink 的 lib/ 文件夹中,或者通过 classloader.parent-first-patterns-additional 将驱动程序类添加到父级优先加载的类列表中。
卸载动态加载类的一个有用工具是用户代码类加载器释放钩子。 这些是在卸载类加载器之前执行的钩子。 通常建议关闭和卸载资源作为常规函数生命周期的一部分(通常是 close() 方法)。 但在某些情况下(例如对于静态字段),最好在不再需要类加载器时卸载。
类加载器释放钩子可以通过 RuntimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent() 方法注册。
使用 maven-shade-plugin 解决与 Flink 的依赖冲突
从应用程序开发人员的角度解决依赖冲突的一种方法是通过隐藏它们来避免暴露依赖关系。
Apache Maven 提供了 maven-shade-plugin,它允许在编译后更改类的包(因此您编写的代码不受阴影影响)。 例如,如果您的用户代码 jar 中有来自 aws sdk 的 com.amazonaws 包,则 shade 插件会将它们重新定位到 org.myorg.shaded.com.amazonaws 包中,以便您的代码调用您的 aws sdk 版本。
注意 Flink 的大部分依赖,比如 guava、netty、jackson 等,都被 Flink 的维护者屏蔽掉了,所以用户通常不用担心。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://cloud.tencent.com/developer/article/1936361