问题简介
公司线上一个Flink作业的State Size随时间逐渐增大,运行一段时间后出现报OutOfMemory异常。
作业代码是之前同事写的,所以,我了尽可能少分析少修改之前代码的原则,直接从问题现象本身挖掘尽可能多的有效信息。
问题现场
1. 问题表象分析
从Flink web ui上观察作业的checkpoint历史信息,每隔一段时间抽取出来作业所有算子的checkpoint metrics信息:
可以看到,除了第一的state比较稳定外,其他operator算子的state size始终是单调递增的,没有任何收敛的趋势。
分析程序,第一个算子是addSource(source),数据源是消息队列,所以只记录offset之类的消费信息,这种state需要的空间复杂度为常数,所以保持474字节不变。
2. 按图索骥,循序渐进
运用JDK工具分析是哪些JVM空间没有被释放。
(1) 由于这个Flink作业只用了一个TaskManager,所以,我们只需要观察这个TaskManager的JVM进程即可。从Flink ui上记录TaskManager所在的物理节点。
(2) 从yarn的All Applications ui上查看这个Flink作业的yarn作业ID,端口号默认是8088。
(3) 到TaskManager所在节点,用yarn作业ID获取TaskManager的进程号:
代码语言:javascript复制jps -lvm | grep application_161948172732_0530
jps:查看本地运行着的java程序,并显示他们的进程号。
代码语言:javascript复制-l:用于输出运行主类的全名,如果是 jar 包,则输出 jar 包的路径;
-m:用于输出虚拟机启动时传递给主类 main() 方法的参数;
-v:用于输出启动时的 JVM 参数;
-q:用于输出 LVMID(Local Virtual Machine Identifier,虚拟机唯一 ID);
(4) jmap -dump打印堆dump文件。
分析时可以用:jvisualvm、Eclipse memory analyzer(jmat)等进行分析:查看线程快照、数量最多的实例是哪些、类的成员变量的值等,
代码语言:javascript复制>jmap -dump:live,format=b,file=./heap.hprof
dump文件可能比较大,建议用gzip,它是个功能很强大的压缩命令,特别是我们可以设置 -1 ~ -9 来指定它的压缩级别,数据越大压缩比率越大,耗时也就越长,推荐使用 -6~7。
jmap:用于查询堆的快照信息。
代码语言:javascript复制>jmap -heap {pid}
运行时内存属性:内存的配置参数(Heap Configuration)及使用状态(Heap Usage)
>jmap -dump:live,format=b,file=/path/heap.hprof {pid}
查看线程快照、数量最多的实例是哪些、类的成员变量的值。
打印堆dump文件,分析时可以用:jvisualvm、Eclipse memory analyzer(jmat)等进行分析。
问题定位
用jvisualvm分析dump。
查看堆内存对象占用空间情况,找到top3的实例对象,发现是程序中的某类实例类,如下,是LoadAddrOutPut和LoadEmpNameOutPut。
再查看这些实体类对象的成员变量值的eventtime,分析都是哪些事件时间的实体类对象没有被释放。双击LoadAddrOutPut即可进入该类实例对象的查看界面:
发现存在很多天之前的事件,而程序中的window窗口大小为1天。
至此,初步结论是:window窗口中本应过期的数据没有释放。那么,再从程序中查看有valuestate的StateTtlConfig,但是却没有设置清除策略!
问题解决
Flink的过期数据的清理。
1. 默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:
代码语言:javascript复制import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。
2. 全量快照时进行清理
另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。该策略可以通过 StateTtlConfig 配置进行配置:
代码语言:javascript复制import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
注意: 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。
3. 增量数据清理
另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
代码语言:javascript复制import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally(10, true)
.build();
该策略有两个参数。第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
注意:
- 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
- 增量清理会增加数据处理的耗时。
- 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
- 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
- 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
4. 在 RocksDB 压缩时清理
如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
代码语言:javascript复制import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();
Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig. newBuilder(...). cleanupInRocksdbCompactFilter (long queryTimeAfterNumEntries) 方法指定处理状态的条数。时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j. logger. org. rocksdb. FlinkCompactionFilter = DEBUG