使用NiFi每秒处理十亿个事件

2020-04-21 16:22:52 浏览数 (1)

  • 有没有想过Apache NiFi 有多快?
  • 有没有想过NiFi的扩展能力如何?
  • 单个NiFi集群每天可以处理数万亿个事件和PB级数据,并具有完整的数据来源和血缘。这是如何做到的。

当客户希望在生产环境中使用NiFi时,这些通常是第一个提出的问题。他们想知道他们将需要多少硬件,以及NiFi是否可以容纳其数据速率。

这不足为奇。当今世界包含不断增长的数据量。用户需要能够轻松处理这些数据速率的工具。如果企业堆栈中的任何一种工具都无法跟上所需的数据速率,则企业将面临瓶颈,无法阻止其余工具访问所需的数据。

NiFi执行各种任务,并处理所有类型和大小的数据。这使得很难在不完全了解用例的情况下说明需要多少硬件。如果NiFi仅负责将数据从FTP服务器移动到HDFS,则将需要很少的资源。如果NiFi负责从数百个源中提取数据,进行过滤、路由、执行复杂的转换并最终将数据传递到多个不同的目的地,则将需要额外的资源。

幸运的是,后一个问题的答案– NiFi可以扩展到我需要的程度吗?–简单得多。答案几乎总是响亮的“是!” 在本文中,我们定义了一个常见的用例,并演示了NiFi如何在实际数据处理场景中实现高可伸缩性和高性能。

用例

在深入研究数字和统计信息之前,了解用例很重要。理想的用例是一个现实但又足够简单的用例,可以用简洁的方式进行解释。

下面的屏幕快照说明了这种用例。每个处理器被表示用号码:1至8 的可穿行用例,下文中,为了描述每个步骤是如何在数据流来实现的引用这些处理器的数字。

我们在这里介绍的用例如下:

  1. Google Compute Storage(GCS)中存在一个存储桶。 除其他应忽略的无关数据外,该存储桶还包含价值约1.5 TB的NiFi日志数据。
  2. NiFi将监视此存储区[处理器1]。 当数据进入存储桶时,如果文件名包含“ nifi-app”,则NiFi将拉取数据。 [处理器2、3]
  3. 数据可以压缩也可以不压缩。 必须为每个传入的日志文件[处理器4]检测到此错误。 如果已压缩,则必须将其解压缩[处理器5]。
  4. 过滤掉所有日志消息,但日志级别为“ WARN”或“ ERROR”的消息除外[处理器6]。 如果日志消息中包含任何异常,则该异常也必须保留。 另请注意,某些日志消息可能是多行日志消息。
  5. 将日志消息转换为JSON [处理器6]。
  6. 压缩JSON(无论原始输入数据是否已压缩)[处理器7]。
  7. 最后,将WARN和ERROR级别的日志消息(压缩的JSON格式)以及所有堆栈跟踪信息传递到第二个GCS Bucket [处理器8]。 如果将数据推送到GCS失败,则将重试数据直到完成。

这是NiFi非常常见的用例。监视新数据,在可用时进行检索、对其进行路由决策、过滤数据、对其进行转换,最后将数据推送到其最终目的地。

注意RouteOnAttribute Processor [Processor 2]和FetchGCSObject [Processor 3]之间的连接上的图标。此图标表示数据正在整个集群中进行负载平衡。由于GCS Bucket不提供排队机制,因此NiFi负责使数据集群友好。为此,我们仅在单个节点(主节点)上执行列表。然后,我们将该列表分布在整个集群中,并允许集群中的所有节点同时从GCS中提取。这为我们提供了巨大的吞吐量,并且避免了必须在集群中的节点之间对数据进行混洗。

还要注意,我们要确保数据包含WARN和ERROR消息的良好混合,而不仅仅是INFO级别的消息,因为大多数数据流在开始时并未过滤掉绝大多数数据。为此,我们通过故意错误配置某些处理器,使生成日志的NiFi实例不断出错。这导致约20-30%的日志消息为警告或错误并包含堆栈跟踪。平均消息大小约为250字节。

硬件

在讨论任何类型的数据速率之前,重要的是讨论所使用的硬件类型。就我们的目的而言,我们使用实例类型为“ n1-highcpu-32”的Google Kubernetes Engine(GKE)。这样可以为每个节点提供32个内核和28.8 GB的RAM(尽管我们可以用更少的RAM来解决问题,因为我们仅将2 GB的堆用于NiFi JVM)。我们将NiFi的容器限制为26个核,以确保VM中运行的任何其他服务(例如DNS服务和nginx)具有足够的资源来履行其职责。

由于NiFi将数据存储在磁盘上,因此我们还需要考虑拥有的卷的类型。在Kubernetes中运行时,重要的是要确保即使节点丢失,即使将节点移至其他主机,其数据也不会丢失。结果,我们将数据存储在持久性SSD卷上。GKE可以为更大的数量提供更好的吞吐量。因此,我们将单个1 TB的卷用于内容存储库,以确保最佳性能(写入速度为400 MB /秒,读取速度为1,200 MB /秒)。我们将130 GB用于FlowFile存储库和Provenance存储库,因为我们不需要存储太多数据,并且这些存储库不需要与Content Repository一样快。这些卷在同一可用区中提供了内置的冗余。

性能

NiFi在给定时间段内可以处理的数据量在很大程度上取决于硬件,还取决于配置的数据流。对于此流程,我们决定使用几个不同大小的集群来确定将实现哪种数据速率。结果如下所示。

为了真正了解数据速率并比较不同集群大小之间的速率,我们应该考虑在流中的哪个点上我们要观察统计信息,以及哪个统计信息最相关。我们可以看一下流程的最后,看看有多少数据流过,但这不是一个很好的表示,因为所有的数据都已经被过滤掉了(除了WARN和ERROR消息,其他所有数据)。我们可以看一下流程的开始,从GCS那里获取数据,但这并不是一个很好的表示,因为有些数据被压缩而有些没有压缩,因此很难理解正在处理多少数据。

需要考虑的一个更有用的地方是“过滤器日志,转换为JSON”处理器[Processor 6]的输入。该处理器处理的数据量告诉我们集群能够处理的数据总量。此外,我们可以查看此处理器的状态历史记录。这将为我们提供每秒正在处理的记录数。这两个指标都很重要,因此在分析数据速率时我们将同时考虑这两个指标。

查看这些指标,我们可以看到此数据流在几个不同大小的NiFi集群下如何执行。首先,我们将看一个节点:

在这里,我们可以看到单个节点处理了56.41 GB的传入数据。这是5分钟的时间范围。如果将这个数字除以300秒,我们将得到0.18803 GB /秒,或大约192.5 MB /秒。查看状态历史记录,我们可以了解一下每秒的记录(日志消息)数:

在这里,我们看到平均而言,单个节点每5分钟处理283,727,739条记录,或每秒多处理946,000条记录。每秒只有一百万个事件,这对于单个节点而言简直是有点弱!

但是,如果单个节点还不够,我们需要扩展到更多节点怎么办?理想情况下,我们会看到添加更多节点可以使我们线性缩放。如果我们使用5节点集群而不是单节点集群,则会得到如下所示的统计信息:

现在,传入数据速率为每五分钟264.42 GB(0.8814 GB /秒)。以每秒记录数计,我们平均每五分钟大约有14.93亿条记录,或每秒约497万条记录:

进一步扩展,我们可以观察到使用25个节点的集群可实现的性能:

我们看到传入的数据速率每5分钟高达1.71 TB,即5.8 GB /秒。根据每秒的记录,我们显示:

这样一来,每五分钟超过78.2亿条记录的数据速率,或每秒2600万个事件(或每天2.25万亿个事件)的北部。对于25个节点的集群,这相当于每个节点每秒超过100万条记录。

当我们查看状态历史记录时,精明的读者可能会注意到随着时间的流逝,记录读取数的急剧变化。最好用数据的变化来解释。在处理几乎没有错误的文件时,每秒可以看到大量记录。当处理包含堆栈跟踪(更大且需要更多处理)的消息时,我们发现每秒的记录数较少。通过将这些统计数据与“书面记录”的统计数据进行比较,也可以证明这一点:

在这里,我们看到随着读取的记录数减少,写入的记录数增加,反之亦然。因此,我们确保在观察统计信息时,仅考虑同时处理小消息和大消息的时间段。为此,我们选择时间窗口,其中“记录读取数”达到最高点和最低点。然后,我们考虑该时间段内平均读取的记录数。

大多数组织以每秒2600万个事件的速度轻松达到其必需的数据速率。对于那些还没有的组织,随着我们到达更大的集群,NiFi会继续线性扩展吗?

为了找出答案,我们将集群从25个节点增加到100个节点,然后又增加到150个节点。此处显示了150个节点集群的结果:

NiFi在这里以每5分钟9.56 TB(424亿条消息)或32.6 GB /秒(每秒1.413亿个事件)的惊人速度处理数据。相当于每天2.75 PB(12.2万亿个事件)!所有这些都具有详细的出处信息,该信息可以跟踪并显示数据中发生的每个事件。何时何地接收数据;它是如何转变的;以及何时,何地以及确切地发送到其他地方。

下表总结了达到的数据速率,以进行比较:

节点数

数据速率/秒

事件/秒

数据速率/天

活动/天

1

192.5兆字节

946,000

16.6 TB

817亿

5

881兆字节

497万

76 TB

4294亿

25

5.8 GB

2600万

501 TB

2.25万亿

100

22 GB

9000万

1.9 PB

7.8万亿

150

32.6 GB

1.413亿

2.75 PB

12.2万亿

在Google Kubernetes Engine上运行上述流程捕获的数据速率和事件速率。每个节点具有32个内核,15 GB RAM和2 GB堆。内容存储库是1 TB持久性SSD(写入400 MB /秒,读取1200 MB /秒)。

可扩展性

尽管了解系统的性能特征很重要,但是在某个点上,数据速率太高,单个节点无法跟上。结果,我们需要扩展到多个节点。这意味着了解系统的扩展能力也很重要。

我们在上一节中看到,NiFi可以线性地扩展到至少150个节点,但是极限在哪里?可以扩展到250个节点吗?500?1000?如果这些节点比前面提到的32核计算机小得多,该怎么办?在这里,我们着手寻找答案。

为了探索NiFi的扩展能力,我们尝试使用不同大小的虚拟机创建大型集群。在所有情况下,我们都使用具有15 GB RAM的VM。我们还使用了比以前的试用版更小的磁盘,内容存储库使用130 GB的卷,FlowFile存储库使用10 GB的卷,而Provenance存储库使用20 GB的卷。这些较小的磁盘意味着较低的I / O吞吐量,因为较小的磁盘大小会限制IOPS和MB /秒的数量。因此,我们希望具有相同数量节点的集群产生的吞吐量将比上一节中的小得多。

4核虚拟机

我们首先尝试进行横向扩展,以查看NiFi使用非常小的VM(每个只有4个内核)的性能如何。因为每个VM不仅必须承载NiFi,而且还必须承载Kubernetes DNS服务和其他Kubernetes核心服务,所以我们不得不将NiFi容器限制为仅2.5个核心。

一个由150个节点组成的集群可以很好地工作,但是UI表现出明显的滞后。扩展到500个节点意味着严重降低了用户体验,大多数Web请求至少需要5秒钟才能完成。尝试扩展到750个节点会导致集群不稳定,因为节点开始脱离集群。NiFi的“系统诊断”页面显示,集群协调器的1分钟平均负载超过30,只有2.5个内核可用。这意味着要求CPU处理的能力是其处理能力的12倍左右。这种配置(每个VM有4个内核)被认为不足以支持750个节点的集群。

6核虚拟机

接下来,我们尝试扩展6核虚拟机的集群。这次我们能够将容器限制为4.5核,而不是2.5核。这提供了明显更好的结果。一个500节点的集群确实表现出缓慢,但是大多数Web请求在不到3秒的时间内完成了。

扩展到750个节点在UI响应性方面几乎没有什么不同。接下来,我们想尝试一个包含1,000个节点的集群。

实际上,这次我们可以使用6核VM扩展到1,000个节点!集群保持稳定,但是当然,由于这些小型VM和有限的磁盘空间,每个节点上的性能肯定不在每秒一百万个事件的范围内。相反,每个节点的性能在每秒40,000-50,000个事件的范围内:

在此设置中,UI仍然有些呆滞,大多数请求需要2-3秒的时间。

因为我们的核心太少,所以我们还减少了为运行流提供NiFi的线程数量。我们可以看到,节点的利用率并没有太高,在6核VM上,一分钟的平均负载通常为2到4:

但是,问题仍然存在,如果向外扩展到此程度仍会导致线性比例。接下来我们检查了这一点。

12核虚拟机

通过使用12核虚拟机扩展到1,000个节点,我们结束了对NiFi可扩展性的探索。为了确定性能是否线性扩展,我们收集了250个节点,500个节点和1,000个节点的性能指标。同样,这些节点仅包含上一个示例中内核数量的三分之一,并且磁盘速度慢得多,因此此处的性能不应与大型VM的性能相媲美。

在250个节点的情况下,我们看到这些虚拟机以大约4500万个事件/秒(每个节点180,000个事件/秒)处理的事件数:

在500个节点的情况下,我们再次看到以大约9000万个事件/秒(每个节点180,000个事件/秒)处理的事件数:

这大约是我们在32核系统中看到的性能的20%。考虑到节点的核心数为1/3,而内容存储库提供的吞吐量约为32核系统中的吞吐量的1/4,这是非常合理的。这表明NiFi实际上在垂直缩放时也确实线性缩放。

最后,我们将12核VM的集群扩展到1,000个节点。有趣的是,这给我们带来了一个小问题。在具有1,000个节点的集群的情况下,1.5 TB的日志数据被如此快速地处理,以至于我们难以在足够长的时间内保持队列满,无法获得准确的性能度量。要解决此问题,我们在流中添加了DuplicateFlowFile处理器,该处理器将负责为从GCS提取的每个日志文件创建25个副本。这样可以确保我们不会很快耗尽数据。

但是,这有点作弊。这意味着对于96%的数据,我们不会从GCS提取数据,因为数据已经驻留在本地。但是,NiFi仍会处理所有数据。结果,我们希望看到的性能数字比500节点集群的性能数字高出一倍。

可以肯定的是,我们得出的结果是该集群每秒约2.56亿个事件,或每个节点每秒256,000个事件。

汇总

借助NiFi,我们一直秉持的理念是,不仅可以将数据从A点移动到B点,还要考虑有多快。这是关于您改变行为以抓住新机会的速度。这就是为什么我们努力提供如此丰富的用户体验来构建这些数据流的原因。实际上,该数据流仅花费了大约15分钟即可构建,并且可以随时动态更改。但是,由于每个节点每秒记录超过100万条记录,很难不感到兴奋!

NiFi能够线性扩展到至少1,000个节点,而垂直缩放也是线性的。每秒将100万个事件乘以1000个节点。然后考虑我们可以进一步扩展,并且可以确定每个VM可以扩展到96个内核。这意味着单个NiFi集群可以以超过每秒10亿个事件的速度运行此数据流!

在设计任何技术解决方案时,我们需要确保所有工具都能够处理预期的数据量。尽管任何复杂的解决方案都将涉及其他工具,但本文证明,正确调整大小并运行设计良好的流程时,NiFi不太可能成为瓶颈。但是,如果您的数据速率确实超过每秒十亿个事件,我们应该谈谈!

原文来源:https://blog.cloudera.com/benchmarking-nifi-performance-and-scalability/

0 人点赞