前言
•本来打算写一个flink源码分析的系列文章,但由于事情太多,又不太想输出低质量的文章,所以开始看一些好的flink相关博客,本文译自https://www.ververica.com/blog/apache-flink-at-mediamath-rescaling-stateful-applications ;•flink中state的划分和介绍;•flink 中operator state在什么时候会进行rescale以及如何进行rescale?;•flink 中keyed state的when and how?。
有状态流处理的介绍
在较高的层次上,我们可以把流处理中的state看作是operators中的内存,这些operators记住关于过去输入的信息,并可以用来影响未来输入的处理。
相比之下,无状态流处理中的operators只考虑它们当前的输入,而没有进一步的上下文和关于过去的记录。举一个简单的例子来说明这个区别:让我们考虑一个源流,它发出模式为e = {event_id:int, event_value:int}的事件。我们的目标是,对于每个事件,提取并输出event_value。我们可以通过简单的source-map-sink管道轻松实现这一点,其中map函数从事件中提取event_value并将其下游发送到输出sink。这是一个无状态流处理的实例。
但是,如果我们想修改作业,只在event_value大于前一个事件的值时才输出该怎么办?在本例中,我们的map函数显然需要某种方法来记住过去事件的event_value——因此这是一个有状态流处理的实例。
这个例子应该说明状态是流处理中的一个基本概念,大多数有趣的用例都需要这个概念。
Apache Flink中的state
Apache Flink是一个大规模并行分布式系统,它允许大规模的有状态流处理。为了实现可伸缩性,Flink作业在逻辑上分解为operators图,每个operators的执行在物理上分解为多个并行operator实例。从概念上讲,Flink中的每个并行operator实例都是一个独立的任务,可以在自己的机器上调度,这个机器位于一个网络连接的无共享机器集群中。
在此设置中,为了实现高吞吐量和低延迟,必须最小化各任务之间的网络通信。在Flink中,用于流处理的网络通信仅沿着作业operator图的逻辑边缘(垂直)发生,因此流数据可以从上游operator转移到下游operator。
然而,一个operator的并行实例之间不存在通信(横向)。为了避免这样的网络通信,数据本地化是Flink中的一个关键原则,它强烈地影响着状态的存储和访问方式。
出于数据本地化的考虑,Flink中的所有状态数据总是绑定到运行相应并行operator实例的任务,并位于运行该任务的同一台机器上。
通过这种设计,任务的所有状态数据都是本地的,并且状态访问不需要任务之间的网络通信。对于像Flink这样的大规模并行分布式系统的可伸缩性来说,避免这种通信是至关重要的。
对于Flink的有状态流处理,我们区分了两种不同类型的状态:operator state和keyed state。Operator state的作用域是一个operator(子任务)的每个并行实例,keyed state可以被认为是“已分区或分片的operator state,每个键只有一个状态-分区”。我们可以很容易地将前面的示例实现为operator state:通过operator实例路由的所有事件都可以影响其值。
重新缩放有状态流处理作业
在无状态流中更改并行性(即更改operator执行工作的并行子任务的数量)非常容易。它只需要启动或停止无状态operators的并行实例,并断开它们与上下游operators的连接,如图1A所示。
另一方面,改变有状态operators的并行性要复杂得多,因为我们还必须(i)以(ii)一致的、(iii)有意义的方式重新分配之前的operator state。请记住,在Flink的无共享架构中,所有state都是运行拥有并行operator实例的任务的本地state,并且在作业运行时并行operator实例之间不进行通信。
然而,Flink中已经有一种机制允许以一致的方式在任务之间交换operator state,并且保证只交换一次——Flink检查点(checkpoint)!
您可以在文档中看到关于Flink检查点的详细信息。简而言之,当检查点协调器将一个特殊事件(所谓的checkpoint barrier)注入流中时,就会触发检查点。
Checkpoint barriers随着事件流从数据源流向sink,当一个operator实例收到barrier时,该operator实例会立即将其当前状态快照到一个分布式存储系统,例如HDFS。
在恢复时,作业的新任务(现在可能在不同的机器上运行)可以再次从分布式存储系统获取状态数据。
我们可以在检查点上对有状态作业进行重新伸缩处理(rescale),如图1B所示。首先,触发检查点并发送到分布式存储系统。接下来,以更改了的并行度重新启动作业,并可以从分布式存储中访问之前所有状态的一致性快照。虽然这解决了(i)跨机器重新分配一致的状态的问题,但仍然存在一个问题:在以前的状态和新的并行operator实例之间没有明确的1:1关系,我们如何以(iii)有意义的方式分配状态?
我们可以再次将以前的map_1和map_2的状态分配给新的map_1和map_2。但这将使map_3处于空状态。根据状态类型和任务的具体语义,这种简单的方法可能会导致效率低下或不正确的结果。
在下一节中,我们将解释如何解决Flink中高效、有意义的状态重分配问题。Flink state有两种类型:operator state和keyed state,每种类型都需要不同的状态分配方法。
在缩放时重新分配operator state
首先,我们将讨论在缩放中如何对operator state进行状态重分配。在Flink中,一个常见的实际用例是维护Kafka源中Kafka分区的当前偏移量。每个Kafka源实例将维护<PartitionID, Offset>对-一对Kafka分区的源正在读取的-作为operator state。在缩放的情况下,我们如何重新分配这个operator state?理想情况下,我们希望在重新调整后,在所有并行操作符实例中,在轮循中重新分配所有从检查点中获取的<PartitionID, Offset>对。
作为一个用户,我们知道Kafka分区偏移量的意义,我们知道我们可以把它们作为独立的,可重新分配的状态单位。我们如何与Flink共享这些特定领域的概念仍然是一个问题。
图2A说明了Flink中检查点operator状态的前面的接口。在快照上,每个operator实例返回一个表示其完整状态的对象。对于Kafka源,该对象是分区偏移量的列表。
然后将该快照对象写入分布式存储。在恢复时,从分布式存储中读取对象,并将其作为参数传递给operator实例,以供restore function使用。
这种方法在缩放时存在问题:Flink如何将operator状态分解为有意义的、可重新分发的分区?即使Kafka源实际上总是一个分区偏移量的列表,之前返回的状态对象对于Flink来说是一个黑盒子,因此不能被重新分配。
作为解决这个黑盒问题的通用方法,我们稍微修改了检查点接口,称为ListCheckpointed。图2B显示了新的检查点接口,它接收和返回状态分区列表。引入列表而不是单个对象使得能显式地对状态进行有意义的分区:列表中的每个项对于Flink来说仍然是一个黑盒,但被认为是operator状态的一个原子的、独立的可重新分发的部分。
我们的方法提供了一个简单的API,实现operator可以用它来编码关于如何划分和合并状态单元的领域特定知识。有了我们新的检查点接口,Kafka源代码可以显式地显示各个分区的偏移量,状态重分配变得像拆分和合并列表一样简单。
代码语言:javascript复制public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T> implements CheckpointedFunction { // ...
private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsOperatorState;
@Override public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore(); // register the state with the backend this.offsetsOperatorState = stateStore.getSerializableListState("kafka-offsets");
// if the job was restarted, we set the restored offsets if (context.isRestored()) { for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsOperatorState.get()) { // ... restore logic } } }
@Override public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.offsetsOperatorState.clear();
// write the partition offsets to the list of operator states for (Map.Entry<KafkaTopicPartition, Long> partition : this.subscribedPartitionOffsets.entrySet()) { this.offsetsOperatorState.add(Tuple2.of(partition.getKey(), partition.getValue())); } }
// ...
}
Reassigning Keyed State When Rescaling
Flink中的第二种状态是 keyed state。与操作符状态不同, keyed state的作用域是键,键是从每个流事件中提取的。
为了说明 keyed state与operator state的区别,让我们使用下面的示例。假设我们有一个事件流,其中每个事件都有模式{customer_id:int, value:int}。我们已经知道,我们可以使用operator state来计算和发出所有客户值的运行和。
现在假设我们想稍微修改我们的目标,并计算每个customer_id的值的运行和。这是一个来自keyed state的用例,因为必须为流中的每个唯一键维护一个聚合状态。
注意,keyed state仅对通过Flink中的keyBy()操作创建的keyed流可用。keyBy()操作(i)指定如何从每个事件中提取一个键,(ii)确保具有相同键的所有事件总是由相同的并行operator实例处理。因此,所有keyed state都会往下传递,它也被绑定到一个并行operator实例,因为对于每个键,只有一个operator实例负责。从key到operator的映射是通过对key进行哈希分区确定地计算出来的。
我们可以看到,在进行缩放时,keyed state比operator state有一个明显的优势:我们可以很容易地找出如何在并行operator实例之间正确地拆分和重新分配状态。状态重新分配只是在keyed stream的分区之后进行的。在重新缩放之后,每个key的state必须分配给现在负责该key的operator实例,这由keyed stream的hash分区决定。
虽然这自动解决了重新缩放后从逻辑上将状态重新映射到子任务的问题,但还有一个更实际的问题需要解决:我们如何有效地将状态转移到子任务的local backends?
当我们不进行缩放时,每个子任务可以简单地读取前面一个实例在一次连续读取中写入检查点的整个状态。
但是,当重新缩放时,这就不可能了——每个子任务的状态现在都可能分散在所有子任务所写的文件中(想想如果您在hash(key) mod parallelism中更改parallelism,会发生什么情况)。我们在图3A中说明了这个问题。在这个例子中,我们展示了当一个键空间为0,20的并行度从3调整到4时,键是如何被打乱的,使用identity作为hash函数来让这个过程更易于理解。
一种简单的方法可能是从所有子任务中的检查点读取所有前面的子任务状态,并过滤出与每个子任务的匹配键。虽然这种方法可以从顺序读模式中受益,但每个子任务都可能读取大量不相关的状态数据,分布式文件系统接收大量并行读请求。
另一种方法是建立一个索引,跟踪检查点中每个键的状态位置。通过这种方法,所有子任务都可以非常有选择性地定位和读取匹配的键。这种方法可以避免读取不相关的数据,但它有两个主要缺点。所有键的物化索引,即key到读offset的映射,可能会增长得非常大。此外,这种方法还会引入大量的随机I/O(当寻找单个键的数据时,见图3A,这通常会导致分布式文件系统的性能非常差。
Flink的方法介于这两个极端之间,它引入key-groups作为状态分配的原子单位。这是如何运作的呢?key-groups的数量必须在作业启动之前确定,并且(目前)在作业启动之后不能更改。由于key-groups是状态分配的原子单位,这也意味着key-groups的数量是并行性的上限。简而言之,key-groups为我们提供了一种在缩放灵活性(通过设置并行度上限)和索引和恢复状态所涉及的最大开销之间进行交换的方法。
我们将key-groups作为分配给子任务的范围。这使得对restore的读取不仅在每个key-groups内是连续的,而且常常跨多个key-groups。另一个好处是:这也使密钥组到子任务分配的元数据非常小。我们不显式地维护key-groups列表,因为跟踪范围边界就足够了。
我们在图3B中演示了使用10个key-groups将并行度从3调整到4。正如我们所看到的,引入key-groups并将它们作为范围(range)进行分配大大改进了访问模式。图3B中的方程2和3也详细说明了我们如何计算key-groups和范围分配。
结束
通过本文,我们希望您现在对可伸缩状态在Apache Flink中如何工作以及如何在真实场景中利用可伸缩有了一个清晰的认识。