Provenance Repository
在Provenance存储库中存储每个FlowFile的历史记录。此历史记录用于提供每个数据的数据沿袭(也称为产销监管链)。每次为FlowFile发生事件(创建,分叉,克隆,修改FlowFile等)时,都会创建一个新的Provenance事件。这个出处事件是流文件的快照,因为它看起来就是在那个时间点存在的流。创建Provenance事件后,它将复制所有FlowFile的属性和指向FlowFile内容的指针,并将其与FlowFile的状态(例如其与其他出处事件的关系)聚合到Provenance存储库里。该快照将不会更改,直到过期。根据“nifi.properties”文件中的指定,Provenance存储库将在完成后的一段时间内保留所有这些来源事件。
因为所有流文件属性和指向内容的指针都保存在Provenance存储库中,所以数据流管理器不仅能够查看该数据段的沿袭或处理历史,而且能够在以后查看数据本身,甚至从流中的任何点重放数据。一个常见的用例是当一个特定的下游系统声称没有收到数据时。数据沿袭可以准确地显示数据何时传递到下游系统、数据的外观、文件名以及数据发送到的URL,或者可以确认数据确实从未发送过。在这两种情况下,都可以通过单击按钮(或通过访问适当的http api)来重播Send事件,以便仅向特定的下游系统重新发送数据。或者,如果数据处理不当(可能应该先进行一些数据操作),则可以修复流,然后将数据重放到新流中,以便正确处理数据。
不过,请记住,由于Provenance并不是复制content Repo中的内容,而只是复制FlowFile指向该内容的指针,因此可以在删除引用该内容的Provenance事件之前删除该内容。这意味着用户以后将无法再看到内容或重放流文件。但是,用户仍然能够查看流文件的沿袭并了解数据发生了什么。例如,即使数据本身无法访问,用户仍然能够看到数据的唯一标识符、文件名(如果适用)、何时接收、从何处接收、如何操作、发送到何处等等。此外,由于FlowFile的属性是可用的,因此Dataflow管理器能够理解为什么数据按原来的方式处理,从而为理解和调试数据流提供了一个关键工具。
由于Provenance是流文件的快照,因为它存在于当前流中,因此对流的更改可能会影响以后重播源事件的能力。例如,如果从流中删除了连接,则无法从流中的该点重放数据,因为现在没有地方将数据排队等待处理。
Provenance Log Files
每个Provenance都有两个map,一个用于事件之前的attributes,另一个用于更新后的attributes。一般来说,Provenance事件不存储属性的更新值,因为它们在发出事件时就存在,而是在提交会话时存储属性值(session.commit()
)。事件被缓存并保存,直到会话被提交为止,一旦会话被提交,当会话被提交时,事件将与流文件相关联的属性一起发出。此规则的例外是“SEND”事件,在这种情况下,事件包含的属性与事件发出时的属性相同。这样做是因为,如果还发送了属性本身,那么准确地知道发送了什么信息就很重要。
在运行NiFi时,会有16个Provenance日志文件的滚动组。发出事件源时,它们将被写入16个文件之一(有多个文件可提高吞吐量)。日志文件会定期滚动(默认时间范围是每30秒一次)。这意味着新创建的Provenance事件将开始写入由16个日志文件组成的新组,并且原始文件将被处理以进行长期存储。首先,将经过滚动的日志合并到一个文件中。然后,可以选择对文件进行压缩(由nifi.provenance.repository.compress.on.rollover
属性确定)。最后,使用Lucene
对事件进行索引并使其可用于查询。这种分批编制索引的方法意味着无法立即提供Provenance事件以进行查询,但是作为回报,这大大提高了性能,因为提交事务和建立索引是非常昂贵的任务。
一个单独的线程负责处理出处日志的删除。管理员可以设置两个条件来控制出处日志的删除,即可以占用的最大磁盘空间量和日志的最大保留期限。该线程按上次修改日期对存储库进行排序,并在超过其中一个条件时删除最旧的文件。
Provenance存储库使用了Lucene索引,分为多个碎片。这样做有多种原因。首先,Lucene使用32位整数作为文档标识符,因此限制了Lucene不分片支持的最大文档数量。其次,如果我们知道每个分片的时间范围,则可以轻松地使用多个线程进行搜索。而且,这种分片还允许更有效的删除。NiFi会等到计划删除某个分片中的所有事件,然后再从磁盘删除整个分片。这使得删除时我们不必更新Lucene索引。
Updating Repository
- 为了支持高吞吐量,允许指定多个Container。允许同时写入多个磁盘分区以提高吞吐量。
- 每个容器支持多个journals。
- 允许多个线程同时更新存储库。更新存储库时,我们在分区之间循环。
- 每个容器有多个日志,因为我们要内联序列化数据。如果我们仅对每个磁盘分区写入单个日志,那么我们将无法充分利用磁盘,因为从对象到字节的序列化非常昂贵。
- 我们自己对数据进行编码。这不仅使我们能够根据需要更改架构,而且还避免了将Provenance Event转换为中间数据结构(例如Avro Record)的开销,这样就可以将其序列化到磁盘上,然后执行反序列化时也是一样。
- 经过一段可配置的时间段(默认为30秒)后,我们将所有journals合并到一个
Provenance Event Log File
中。发生这种情况时,我们会滚动日志,以便其他线程可以同时更新存储库。 - 滚动journals时,我们将压缩数据并为其编制索引。
- 我们不会在写入数据时对其进行压缩,因为这样做会降低吞吐量。
- 如果在写入时关机或掉电,则在写入压缩文件时,数据可能无法恢复。
- 我们不会在写入数据时编制索引,因为这样做会降低吞吐量。
- 在压缩数据时,我们会跟踪压缩块索引。我们将1 MB的数据写入GZIP流,然后增加压缩块索引。同时,我们将压缩块索引的
.toc
(目录表)文件保留为“压缩块偏移”的映射。此偏移量是此事件块开始的文件中的偏移量。这样,当我们为事件建立索引时,我们就可以为相关字段以及数据指针建立索引。指向数据的指针是数据存储在其中的源事件日志文件,事件ID和压缩块偏移量。结果,如果我们有一个Provenance Event Log File
,即压缩后为1 GB,并且想要从中获取特定记录,我们可以简单地查找到块偏移量(例如980,028,872),然后用GZIPInputStream包装FileInputStream。然后从那里开始阅读。我们将最多只能读取1 MB的(解压缩)数据。这使我们可以非常快速地访问这些记录。 - 写入每条记录后,然后将其与指向数据的指针一起放在队列中。然后,一个单独的线程将从队列中提取此信息,并在Lucene中对数据进行索引。我们这样做是为了让我们可以允许多个线程一次对数据进行索引,因为索引的计算量很大,而且实际上是处理过程中NiFi的瓶颈大量的数据记录。
- 当所有数据均已写入合并的
Provenance Event Log File
(压缩的事件日志文件),进行压缩并建立索引后,我们将删除原始日记文件。 - 当我们在Lucene中建立数据索引时,我们会“分片” Lucene索引,以使它们不会超出某些可配置的空间量(默认为500 MB)。
- Lucene存储的文档ID是32位整数,而不是64位整数。结果,它最多可以包含约20亿条记录。
- 我们能够跨多个磁盘分区对索引本身进行分条。
- 当多个线程正在更新特定索引时,对该索引的访问非常慢。这样,我们就可以避免在不需要时触摸该索引。
- 存储Lucene索引的目录的文件名是创建索引的时间戳。这使我们能够准确知道何时需要搜索哪些索引在某些指定的时间范围内查询数据。
Recovering After Restart
- 我们寻找任何journal文件。如果存在匹配的
Provenance Event Log File
(相关性基于文件名),那么我们知道重新启动时我们正在对索引文件进行索引和合并,因此我们需要完成该工作。我们无法轻易知道我们从何处中断,因此我们只需要删除Provenance Event Log File
并删除该事件文件的索引中的任何记录即可。然后,我们重新开始合并文件建索引。 - 我们确定任何journal文件中的最大事件ID,或者如果没有journal文件,则确定
Provenance Event Log File
中的最大事件ID。 - 然后,将ID生成器设置为此值加1。这样可以确保所有事件始终具有唯一的一个编号。这一点很重要,因此当我们拥有“块偏移”和“事件ID”时,我们便知道要寻找的事件。还使我们能够轻松地顺序访问事件。
Retrieving Events Sequentially
- Provenance存储库的原始实现旨在简单地存储事件,并允许以后通过(顺序)ID检索事件,以便可以将事件发布到其他地方。
- 因为我们在滚动时将journals合并到单个
Provenance Event Log File
中,所以我们能够顺序写入事件。 - 命名
Provenance Event Log File
的名称应使文件名反映文件中第一个事件的事件ID。 - 这意味着我们可以请求一个特定的事件ID,并确切地知道它在哪个文件中,因为我们无需查找该事件ID即可找到名称最大的文件。
- 然后,我们确定该事件ID所需的压缩块偏移量。这是通过查看上面提到的目录文件确定的。
- 至此,我们确切知道哪个文件包含该事件以及该文件要查找的位置。我们寻找到这个位置,打开一个GZIPInputStream,然后开始阅读。
- API使开发人员可以请求特定的事件ID开始并返回事件数。这种设计使我们可以按顺序读取并将这些事件返回给调用方。
Expire Data
- 为了避免用完存储空间,我们必须最终淘汰这些数据。
- 用户可以指定存储容量的大小限制以及时间限制。
- 后台线程定期运行,检查存储容量。它将确定应销毁哪些数据并将其标记为销毁。
- 首先淘汰最旧的数据。我们可以根据文件名轻松确定哪个数据最旧,因为那代表一个不断增加的单向数字。
- 将文件标记为要销毁时,将保留文件的大小,因此我们会根据需要将尽可能多的文件标记为要销毁,以便降至最大容量的90%以下。
- 如果尚未达到存储容量,我们将检查任何
Provenance Event Log File
是否早于配置的最大时间限制。如果是这样,我们会将其标记为销毁。 - 然后,我们删除所有标记为要销毁的文件。
- 删除文件后,我们将更新索引以删除任何指向该
Provenance Event Log File
的事件。