说明:本文分为四个部分内容:背景、Chandy_Lamport算法、Flink Checkpoint对齐机制和总结。
背景
在分布式计算系统中,为了保证数据的一致性需要对数据进行一致性快照。
Flink和Spark在做流式计算时,为了保证数据一致性都借鉴了Chandy-Lamport算法原理,Chandy-Lamport算法目标是让多个分布式节点本地数据以及通信中的数据完成local snapshot本地状态保存最终能一起完成global snapshot保存全局状态。只有了解分布式系统为了保证数据一致性的算法背景,才能更好理解Flink如何用Checkpoint来保证数据Exactly Once准确一次语义和何为barrier对齐。
这篇文章就是对Chandy-Lamport算法核心思想以及Flink CheckPoint机制介绍。
Chandy-Lamport算法
Chandy-Lamport的“快照”算法理论性较强,读者可选前半部分算法核心思想便于理解Flink Checkpoint检查点barrier栏栅对齐过程。本文尽量用结合Flink架构内容让此算法内容更加通俗易懂。
Chandy-Lamport的“快照”算法描述了决定分布式系统全局状态的“快照”算法。该算法的目的是记录进程集Pi(i=1,2,…,N)的进程状态和通道状态集(快照)。这里的进程集类似Flink Jobmanager和TaskManager构成分布式架构的进程集。这样,即使所记录的状态组合可能从没有在同一时间发生,但所记录的全局状态还是一致的。Flink TaskManager多任务可异步完成各自的快照,等所有的快照保存完成通知JobManager来最终保证全局状态一致。此算法本身在进程本地记录状态,它没有给出在一个场地收集全局状态的方法。收集状态的一个简单方法是让所有进程把它们记录的状态发送到一个指定的收集进程,如Flink JobManager中CheckPoint Coordinator检查点协调器类似指定的所有进程的状态收集进程。
算法有如下假设:
- 不论是通道还是进程都不出现故障。通信是可靠的,因此每个发送的消息最终被完整地接收一次
- 通道是单向的,提供FIFO顺序的消息传递。
- 描述进程和通道的图是强连接的(任意两个进程之间有一条路径)
- 任一进程可在任一时间开始一个全局快照。
- 在拍快照时,进程可以继续它们的执行,并发送和接收消息。
对每个进程Pi,设接入通道( incoming channel)是其他进程向Pi发送消息的通道。类似的,Pi的外出通道( outgoing channel)是Pi向其他进程发送消息的通道。
算法基本思想:
每个进程记录它的状态,对每个接入通道还记录发送给它的消息。对每个通道,进程记录在它自己记录下状态之后和在发送方记录下它自己状态之前到达的任何消息。这种安排可以记录不同时间的进程状态并且能用已传送但还没有接收到的消息说明进程状态之间的差别。如果进程Pi已经向进程Pj发送了消息m,但Pj还没有接收到,那么m属于它们之间通道的状态(通信中的状态)
算法使用了特殊的标记( marker)消息,它与进程发送的其他消息不一样,它可在正常执行中发送和接收。在Flink中此标记为barrier栏栅,只是作为分隔符,可简单理解为csv格式以逗号分隔符1,2,3。但是barrier是带有编号的,barrier编号指示Checkpoint触发的批次。
marker标记有两个作用:
- marker标记如果接收者还没有保存自己的状态,那么标记作为提示。
- 作为一种决定哪个消息包括在通道状态中的手段。
算法定义了两个规则:标记接收规则和标记发送规则标记接收规则强制进程在记录下自己的状态之后但在它们发送其他消息之前发送一个标记。
代码语言:javascript复制进程Pi的marker标记接收规则
Pi接收通道c上的marker标记消息:
if(Pi还没有记录它的状态)
Pi记录它的进程状态
将c的状态记成空集;
开始记录从其他接入通道上到达的消息;
else
Pi把c的状态记录成从保存其状态以来它在c上接收到的消息集合
end if
进程p的marker标记发送规则
在Pi记录了其状态之后,对每个外出通道c:
(在Pi从c上发送任何其他消息之前)
Pi在c上发送一个marker标记。
Chandy-Lamport的“快照”算法
标记接收规则强制没有记录状态的进程去记录状态。在这种情况下,这是进程接收到的头一个标记。它记录在其他接入通道上后来收到了哪个消息。当一个已保存状态的进程接收到一个(在另一个通道上的)标记,它就把从它保存其状态以来所接收到的消息集合作为那个通道的状态记录下来。任何进程可以在任何时候开始这个算法。进程好像已接收到一个(在一个不存在的通道上的)标记,并遵循标记接收规则。这样,进程记录它的状态并开始记录在所有接入通道上到达的消息。几个进程可以以这种方式并发地开始记录(只要能区别它们使用的标记)。
快照算法完成,我们假设一个已经接收到一个标记消息的进程在有限的时间里记录了它的状态并在有限的时间里通过每个外出通道发送了标记消息(即使它不再需要在这些通道上发送应用消息)。如果有一条从进程Pi到进程Pj(j≠i)的信道和进程的路径,那么可假设,在Pi记录它的状态之后的有限时间里Pj将记录它的状态。因为我们假设进程和通道图是强连接的,所以在一些进程记录它的初始状态之后的有限时间内,所有的进程将记录它们的状态和接入通道的状态。
Chandy-Lamport的快照算法[1985]按分布的方式收集状态,我们指出了系统中的进程如何把它们收集的状态发送给一个监控进程。下面描述的算法(归功于Marzullo和 Neiger[1991)是集中式的。在Flink中TaskManager作为被监控进程,JobManager作为监控进程,被监控进程将它们的状态发送到一个称为监控器的进程,监控器根据接受到的信息汇总成全局一致状态。监控器在系统之外观察系统的执行。在Flink中,由JobManager来触发Checkpoint,多个TaskManager收到消息后,根据各自情况可异步完成此批次的Checkpoint状态保存,等所有TaskManager都完成了此批次状态保存,才算完成全局一致性的状态快照保存。下面详细介绍Flink Checkpoint机制与barrier对齐过程。
Flink Checkpoint机制
Flink Checkpoint机制是Chandy-Lamport算法的一种变体,称为异步barrier快照。把生成CheckPoint的过程和处理过程分离,这样部分任务保存CheckPoint的过程中,其他任务还可以继续执行,来实现异步保存全局状态快照。
Flink整个系统主要由两个组件组成分别为JobManager和 TaskManager,Flink架构遵循 Master-Slave主从架构设计原则。JobManager为 Master节点, TaskManager为 Worker(Slave)节点有组件之间的通信都是借助于 Akka Framework,包括任务的状态以及 Checkpoint触发等信息。Checkpoint机制由JobManager Coordinator负责。
当Checkpoint Coordinator检查点协调器(JobManager的一部分)指示TaskManager触发Checkpoint检查点时,它会让所有数据流记录其偏移量,对Checkpoint barrier进行编号并插入其数据流中。更详细过程是这样的:
1)JobManager来向TaskManager内的数据源任务如Kafka Source触发检查点Checkpoint流程。
2)数据源算子Operator收到消息后,暂停发出记录(继续接收数据流先缓存),StateBackend状态后端触发生成本地状态快照Checkpoint检查点。
3)本地完成状态快照Checkpoint后,并把带有编号的Checkpoint Barrier广播到数据流分区即多并发的任务(传输给与数据源Source连接的其他算子Operator)。
4)StateBackend状态后端会在状态存为检查点完成后通知JobManager发送确认消息。
5)将所有栏栅Barrier发出后,数据源将恢复正常工作。
这些Barrier流过作业图,标示每个检查点Checkpoint之前和之后的部分流。
数据源任务发出的检查点Barrier栏栅分隔符会传输到与之相连的任务。检查点分隔符Barrier栏栅总是以广播形式发送,从而可确保每个任务能从它们的每个输入都收到一个Barrier栏栅分隔符。当任务收到一个新检查点Barrier栏栅分隔符时,会继续等待所有其他输入分区也发来这个检查点Barrier栏栅分隔符,如任务A完成当前批次编号checkpoint-100的Barrier前数据,但是其他任务还没完成checkpoint-100编号Barrier前的数据,这时任务A又收到checkpoint-101的Barrier前数据先缓存起来暂不处理,在等待过程中,它会继续处理那些从还未提供Barrier栏栅分隔符的分区发来的数据。对于已经提供分隔符的分区,它们新到来的记录会被缓冲起来,不能处理。这个等待所有分隔符到达的过程称为Barrier对齐。
Barrier对齐等待的是所有任务收到同一编号的Barrier栏栅分隔符,并每个任务完成Checkpoint检查点当前Barrier栏栅分隔符之前状态快照保存,才算完成整个分布式系统全局一致性快照保存。同一编号Barrier栏栅分隔符是JobManager定时触发的同一次Checkpoint状态快照机制生成的,Flink Checkpoint时间间隔默认10分钟触发一次。
当Job Graph中的每个算子Operator都收到其中之一Barrier栏栅时,它会记录其状态。具有两个输入流(如CoProcessFunction)的算子执行barrier栏栅对齐,以便快照snapshot将反映由于处理两个输入流中的事件直至(但不超过)两个barrier栏栅而生成的状态。
上述从开始对齐,结束对齐,检查点保存三幅图详细说明一下barrier对齐Checkpoint保存过程,现在如下约定上述数据流数字123456称为输入流A,数据流abcdefg称为输入流B;Checkpoint barrier为当前同一编号的barrier栏栅。
- Begin alignment图:
此时输入流A的Checkpoint barrier已经到Operator即123前barrier,但输入流B的同一编号barrier在d和e中间,即Operator还没处理完输入流B的数据,就也没收到同一编号的barrier,那么Operator就要等待输入流B的barrier到来,这个过程叫等待对齐。
- End alignment图:
当Operator在等待数据流B的barrier时,同时也会继续收到数据记录123先缓存起来不处理,因为123属于barrier左侧(即下一个编号barrier,不属于当前barrier)。当efg前(右侧)同一编号的barrier也到达Operator算子,等待对齐结束。
- Checkpoint图:
对齐结束了,开始做当前批次的Checkpoint状态快照保存,并输出当前编号barrier直到sink结束完成,并通知JobManager Coordinator当前批次Checkpoint完成。
总结
Flink基于异步轻量级的分布式快照技术提供了 Checkpoints容错机制,分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理。Flink会在输入源的数据集上间隔性地生成Checkpoint barrier,通过栅栏(barrier)将隔时间段内的数据划分到相应的Checkpoint中。当应用出现异常时, Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。对于状态占用空间比较小的应用,快照产生过程非常轻量,高频率创建且对 Flink任务性能影响相对较小。上述主要讲述了对齐Checkpoint分布式一致性快照算法的实现过程,至于Flink1.11新特性非对齐Checkpoint机制有机会再做下次分享。