快收藏!优化 Apache Flink 应用程序的 7 个技巧!

2022-09-16 14:35:50 浏览数 (1)

在 Shopify 中,我们将Apache Flink作为标准的有状态流媒体引擎,为我们的BFCM Live Map等各种用例提供支持。我们的 Flink 应用程序部署在利用Google Kubernetes Engine的 Kubernetes 环境中。我们的集群采用配置使用高可用性模式,配置任务管理为故障点。我们还为我们使用状态保存器作为我们使用的检查点和点写入谷歌云存储(GCS)。

例如确保Flink应用程序的高性能和弹性是我们的维护任务之一。这也是我们最大的。保持大型有应用程序的弹性很困难。一些数据需要存储巨大的状态,, 13 TB 的销售数据,就像我们在我们的“永远的存储状态:为什么它对您的分析有什么好处”中演讲所分享的)我们在性能调优上付出了很多,上学到了很多教训。

下面将向您介绍 Apache Flink 应用程序的关键课程有哪些方面的介绍。

1. 找到适合的分析工具

手头拥有的分析工具是深入了解如何解决问题的关键。在部署我们的第一个应用程序时,我们发现使用工具集在调试 Flink 时使用正确:

  • Async-profiler:为 Java 虚拟机 (JVM) 用于错误制造任务的分析工具,跟踪事件,包括 CPU 周期、Java 堆分配以及未命中的目标和页面等性能。火焰图的检查支持的管理器将时间花在哪里特别有用。这个工具在帮助我们调试 Kryo 序列化可以产生性能差异方面特别有用。
  • 可视化:另一种工具分析。您可以查看JVM的工具用于运行到运行中工具。它有一个友好的用户界面,它不需要很多设置。
  • jemalloc jeprof:一个通用的 malloc 实现,从 1.12 版本开始被分配器开始使用分配器。jeprof 是与 jemalloc 一起工作的分析器。结合起来,您可以将任务管理和管理设置为自动分配器转储内存配置,然后可以使用内存配置进行分析发现这对于您长时间观察有帮助,可以帮助我们使用 Rock 检测数据库中趋势应用程序的内存泄漏。
  • Eclipse Memory Analyzer ( Eclipse Memory Analyzer : Eclipse MAT 一个 Java 堆分析,用于 JVM 堆转储 MAT 的容量、可用内存泄漏等。它可以用于读取 jemalloc 输出的堆转储,提供GCS文件接收器的内存不足问题时,该工具非常有用,我们将在下面进行。

JVM拥有丰富的分析生态系统,从jmap等基本组成命令到Java Flight Recorder等现代高级功能,这些工具值得研究。

2. 避免 Kryo 序列化

Flink 可能使用它们各自的数据结构提供了不同的序列化器。大多数时候,我们使用 Flink 支持他们开发的 Scala 类或 Avro性能非常好。。

当 Flink 无法使用组合案例类或 Aro 序列化器序列化记录时,它会自动化实现目标化。Kryo 序列化通常很慢,比您使用 async-profile 的其他数据类的工具,您实际上不会注意到这种性能下降。例如,当我们与我们不相关的性能问题时,观察 Kryo 类在使用内存显示占用了多少空间。我们禁止了调试对 Kryo 的回退(env.getConfig().disableGenericTypes();),这会导致各种顺序化导致无法回退。是我们遇到的一些示例以及我们如何修复它们:

  • Scala 的 BigDecimal。Flink不支持序列化 Scala 的 BigDecimal 值,它可以化 Java 值。默认 Java 的 BigDecimal 来避免这种序列化程序失败的实例。当您使用货币处理价值时,您可能会遇到这个问题问题。
  • Scala ADT。 Flink 不支持序列化使用密封特性和一些对象实现的 Scala ADT,通常表示类似枚举的数据结构。但是,它确实支持Scala 枚举,因此您可以使用它们。

在所有这些解决方案中,我们注意到您的问题增加了 20%。

3.根据工作负载率调整配置

例如,在Shopify中,典型的流媒体媒体可能会受到不同的影响,具体而言:

  • 来自时间的消息输入源中可供所有历史零点使用,当前时间(即有回源的需求并开始于当前时间)。
  • 稳定状态:管道正在消耗接近实时的消息,并且源延迟最短(即秒)。
  • 或实时销售活动:管道正在用时接近示例消息,但可能会增加可能会延迟。

让我们关注的两个配置文件,因为它们定义了我们的管道运行模式。在返回期间,积水管道完成其关键任务的大小,而在稳定状态期间,积水压最小。我们现在希望尽快回填上,以减少需要从头重新处理所有数据的任务和代码更改的时间成本。

大数据量的回填消息成为一个新的标准和这几个计算量的工作,对于大型应用程序来说,为什么可能意味着在几个小时内处理我们的问题。以当前的运行预期,我们的预期状态是新鲜度而不是新鲜程度。对于稳定的应用程序以稳定的状态运行并导致当前所有输出的调整。两个不同的配置文件。

您需要考虑您的系统负载率以及它如何影响您的调整,但以下是可以选择的系统因素:系统的负载率配置文件的一些注意事项

  • 源分区(,卡夫卡分区)在稳定状态下,尽可能地压低是最小的。因此,可能会提供这样的一个输入时间段的并行度,并且最小的管道。因此,管道可以输入过多的结果,因此需要输入很多时,请输入重要的资源,请在创建时考虑回填重要的来源。
  • 但是,当下游商的运营速度快时,表现出压力可能导致您的运营背负压力。,在任务中,管道堵塞会明显显示(在作业图为红色)。UI时很确定管道的顺利阶段并完成了它们。
  • 即使您的应用程序代码经过高度优化,可能无法以您希望的速度快速写入接收器。接收器支持许多连接,或者即使它也可能会导致过多的如果在接收器的情况下,扩大接收器的资源(,可能向接收器的更多节点或向卡夫卡添加主题添加其他示例),请考虑减少接收器的并行度或传输不在表上,请考虑减少设备的并行度或传输出的数量连接。
  • 最初 网络发起网络是为了提高资源并执行任务,但现在可以增加更多的渠道,您可以增加更多的任务并提供更多的任务管理权限。但请注意,如果管道图相当并包含多个作品,这通常需要通过大量网络运营。taskmanager.memory.network.fraction
  • 检查点。减少从故障中恢复的时间,在execution.checkpointing.interval状态稳定的检查点频率(可能需要调整任务管理器一堆,以便有足够的内存来上传文件。另外,状态大小很大,请考虑使用增加检查点(state.backend.incremental)。execution.checkpointing.timeout最后,如有需要,请考虑增加点检查( )。

有关可能有用的其他 Flink 部署配置列表的文档,请参阅 Flink 文档。

4. 配置文件堆

Flink 能够提供一个文件接收器,但能够将文件配置为系统对象存储,如 HDFS、S3 或 G 或 G 或 CS(Shopify 使用)。

Flink 的 File Sink 在内存中维护一个分区(或桶)。每个都由 BucketAssigner 提供确定。例如自定义 BucketAssigner 可以使用的列表记录中的任务来生成一个Hive 的图像。一种非常流行的分区格式。date=2021-01-01

我们为它添加了一个真正的文件接收器并将其添加到现有的接收器中的DataStream:

代码语言:javascript复制
val records: DataStream[Record] = … 
val fileSink: SinkFunction[Record] = …
records.addSink(fileSink)

这在测试中很有效,当我们将其部署到真实环境并在测试期间回填问题期间处理所有历史数据时,我们立即将所有可用的 Java 应用程序轻松使用。我们增加了内存,它会崩溃。我们知道缓冲存储桶中的记录可能需要一些内存,但可能需要几个 GB。

在应用程序中要崩溃的时候进行了一堆转储,并使用Eclipse ,我们进行了分析。结果看起来真的很令人快要崩溃:

Eclipse MAT:概述

在上面的显示堆中,你可以清楚地看到两个大树支持占地整个地块。

Eclipse MAT:支配树

在进一步探索堆和应用程序日志后,我们发现了转储记录。由于我们没有应用任何数据重组,所有任务管理器都允许使用可能最终存储在任何存储桶中的存储桶中的存储。

  • 任务管理器都需要在内存中存储大量存储桶。列表我们定期观察超过 500 个。
  • 原因,探测和发现文件的全部时间显着增加:每个任务管理器上都没有数据来快速完成。

我们可以对这个应用程序进行简单的解决方案——只需在将写入接收器之前通过一个字符串记录一个字符串记录:

通过到同一个存储文件中,我们在内存中保存了一个任务管理器的任务管理器,将有更多的任务管理器。内存问题!堆转储分析显示每个任务管理器的活动存储桶数量减少了90%。

如果您有很多日子的数据比日子很快(在进行历史回填时可以预料到其他),您最终可能会出现很大的结果。通过向分区键数小时来更改解决方案以改进可能是此问题的好方法。

数据真实性简单地显示系统是一个很重要的方面,逻辑以并行性的技术也可以在数据接收设备和环境中进行混炼。。

5. 使用 SSD 作为 RocksDB 存储

应用程序RocksDB(美国应用程序状态运行状态)将数据保存在中,但一些手机状态显示在磁盘上,因此需要在巨大的处理器上处理,非常有性能。很明显,一开始特别不是使用Flinks 的时候。例如,我们在部署状态最开始的应用程序(例如,Kafka 消费者刚刚将网络状态卷)时,开始用于 RocksDB 的文件系统(NFS)卷状态NFS。我们没有注意到任何额外的弹性。

但是,网上有很多资源推荐本地等快速磁盘,因此我们尝试将GCP提供的用于我们的状态超过 8 TB 的应用程序。通过使用本地 SSD,我们注意到 SSD I/O 速度的提高同时,如果实例停机,GCP 中的本地 SSD 可能会损坏,保存Flink检查点和点,可以轻松恢复状态

6.避免动态类加载

Flink 有多种方式类以供 Flink 应用程序使用。从调试类加载:

  • Java 类路径: Java 的通用类路径,它包括 JDK 库,以及 Flink 的 /lib 文件夹中的所有代码(Apache Flink 的类和一些依赖项)。
  • Flink 插件组件:__插件代码文件夹位于 /plugins Flink 的文件夹加载中。Flink 的插件机制在启动时会动态一次。
  • 动态用户代码:__这些都包含在动态提交的JAR文件中的所有类(通过REST、CLI、Web UI)。是按作业动态加载(和卸载)的。”

动态用户代码在每个作业开始对时加载,因此存在,并可能会发生类似旧事件的调用。如果 Flink 应用程序需要从暂时性中恢复的时候,它会重新从最新的可用性检查点恢复并重新加载所有动态用户代码。

动态动态类加载之前和之后的元空间内存

我们在这些期间观察到显示器显示。以上面显示“java.langOutMemoryError”的错误形式出现。增加使用的元空间内存量。

通过将上面的程序代码阻止显示 Java 的公共类路径上来禁止动态应用程序类加载,解决了这个问题。修复后的屏幕截图,内存随着重新启动而增加。

该方案适用于应用模式集群,无需支持运行在各个Flink集群上运行多个Flink。

7. 了解 RocksDB 内存使用情况

我们还观察到另一个与内存相关的问题,问题该非常调试,只要我们:

  • 启动了一个有很多状态的 Flink 应用程序
  • 等了至少一个小时
  • 手动终止任务管理器容器之一。

我们将在随后计划更换任务管理器添加到队列中(感谢Kubernetes部署),并在此很快进行应用程序恢复但相反,我们注意到我们的另一个管理器因“内存不足”错误而崩溃,导致崩溃和重启的无休止循环:

出现 OOM 错误的 Flink 容纳的内存使用情况

我们确认问题发生在大量使用且已运行一个小时的应用程序中。我们惊讶地发现“内存不足”错误不仅仅是来自 JVM——async-profiler 和 VisualVM 进行的大量分析没有显示任何问题。增加了某些事件的计算使用内存,并最终计算了 Kubernetes 运行时违反其限制的数量。

jemalloc配置定期将写入写入文件系统,我们可以使用分析。“不足”错误确认之前的一系列配置转储,并与 RocksDB 尝试配置比使用更多的内存:

在这个特定示例中,Flink Managed Memory 配置为使用 5.90 GB,但配置文件明确地正在使用 6.74 GB。

我们有一个支持这一多位相关的 RocksDB 问题:该库发现的用户在过去三年中与我们报告了内存相关的问题。并遵循了问题中的一个建议,使用自定义 RocksDBOptionsFactory 禁止 RocksDB块缓存:

有效!现在,即使在任何杀戮任务管理器到内存之后,我们也没有观察到:

没有 OOM 错误的 Flink 容纳的内存使用情况

禁用 RocksDB 块缓存不会影响性能。实际上,我们只是在缓存中没有什么区别。但是 RocksDB 禁止块缓存的 F 应用程序和使用完整的应用程序之间的性能块缓存看到的 Flink 程序的应用性能。差异。这也解释了为什么我们需要等待等待重新填入:我们正在等待缓存块被缓存满了。我们后来通过启用RocksDB Native Metrics确认了这一点。

你有它!Apache Flink 是一个非常强大的流处理引擎,但是使用它制造的一些复杂的应用程序会带来性能和弹性挑战,需要进行调整和优化工作。我们喜欢这次的旋风之旅,以及我们学到的一些经验教训。

本文为从大数据到人工智能博主「bajiebajie2333」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2109305

0 人点赞