【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查

2022-03-31 11:00:37 浏览数 (1)

问题简介

公司线上一个Flink作业的State Size随时间逐渐增大,运行一段时间后出现报OutOfMemory异常。

作业代码是之前同事写的,所以,我了尽可能少分析少修改之前代码的原则,直接从问题现象本身挖掘尽可能多的有效信息。

问题现场

1. 问题表象分析

从Flink web ui上观察作业的checkpoint历史信息,每隔一段时间抽取出来作业所有算子的checkpoint metrics信息:

可以看到,除了第一的state比较稳定外,其他operator算子的state size始终是单调递增的,没有任何收敛的趋势。

分析程序,第一个算子是addSource(source),数据源是消息队列,所以只记录offset之类的消费信息,这种state需要的空间复杂度为常数,所以保持474字节不变。

2. 按图索骥,循序渐进

运用JDK工具分析是哪些JVM空间没有被释放。

(1) 由于这个Flink作业只用了一个TaskManager,所以,我们只需要观察这个TaskManager的JVM进程即可。从Flink ui上记录TaskManager所在的物理节点。

(2) 从yarn的All Applications ui上查看这个Flink作业的yarn作业ID,端口号默认是8088。

(3) 到TaskManager所在节点,用yarn作业ID获取TaskManager的进程号:

代码语言:javascript复制
jps -lvm | grep application_161948172732_0530

jps:查看本地运行着的java程序,并显示他们的进程号。

代码语言:javascript复制
-l:用于输出运行主类的全名,如果是 jar 包,则输出 jar 包的路径;
-m:用于输出虚拟机启动时传递给主类 main() 方法的参数;
-v:用于输出启动时的 JVM 参数;
-q:用于输出 LVMID(Local Virtual Machine Identifier,虚拟机唯一 ID);

(4) jmap -dump打印堆dump文件。

分析时可以用:jvisualvm、Eclipse memory analyzer(jmat)等进行分析:查看线程快照、数量最多的实例是哪些、类的成员变量的值等,

代码语言:javascript复制
>jmap -dump:live,format=b,file=./heap.hprof

dump文件可能比较大,建议用gzip,它是个功能很强大的压缩命令,特别是我们可以设置 -1 ~ -9 来指定它的压缩级别,数据越大压缩比率越大,耗时也就越长,推荐使用 -6~7。

jmap:用于查询堆的快照信息。

代码语言:javascript复制
>jmap -heap {pid}
运行时内存属性:内存的配置参数(Heap Configuration)及使用状态(Heap Usage)

>jmap -dump:live,format=b,file=/path/heap.hprof {pid}
查看线程快照、数量最多的实例是哪些、类的成员变量的值。
打印堆dump文件,分析时可以用:jvisualvm、Eclipse memory analyzer(jmat)等进行分析。

问题定位


用jvisualvm分析dump。

查看堆内存对象占用空间情况,找到top3的实例对象,发现是程序中的某类实例类,如下,是LoadAddrOutPut和LoadEmpNameOutPut。

再查看这些实体类对象的成员变量值的eventtime,分析都是哪些事件时间的实体类对象没有被释放。双击LoadAddrOutPut即可进入该类实例对象的查看界面:

发现存在很多天之前的事件,而程序中的window窗口大小为1天。

至此,初步结论是:window窗口中本应过期的数据没有释放。那么,再从程序中查看有valuestate的StateTtlConfig,但是却没有设置清除策略!

问题解决


Flink的过期数据的清理。

1. 默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:

代码语言:javascript复制
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

2. 全量快照时进行清理

另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。该策略可以通过 StateTtlConfig 配置进行配置:

代码语言:javascript复制
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

注意: 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。

3. 增量数据清理

另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

代码语言:javascript复制
import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();

该策略有两个参数。第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
4. 在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

代码语言:javascript复制
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig. newBuilder(...). cleanupInRocksdbCompactFilter (long queryTimeAfterNumEntries) 方法指定处理状态的条数。时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j. logger. org. rocksdb. FlinkCompactionFilter = DEBUG

0 人点赞