Flink的运⾏架构
当 Flink 集群启动后,⾸先会启动⼀个 JobManger 和⼀个或多个的 TaskManager。由 Client 提交任务给
JobManager,JobManager 再调度任务到各个 TaskManager 去执⾏,然后 TaskManager 将⼼跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进⾏数据的传输。上述三者均为独⽴的 JVM 进程。
- Client 为提交 Job 的客户端,可以是运⾏在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。
- JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会⽣成优化后的执⾏计划,并以 Task 的单元调度到各个 TaskManager 去执⾏。
- TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动⼀个 Task,Task 为线程。从JobManager 处接收需要部署的 Task,部署启动后,与⾃⼰的上游建⽴ Netty 连接,接收数据并处理。
Flink的作业执⾏流程
以yarn模式Per-job⽅式为例概述作业提交执⾏流程
- 当执⾏executor() 之后,会⾸先在本地client 中将代码转化为可以提交的 JobGraph 如果提交为Per-Job模式,则⾸先需要启动AM, client会⾸先向资源系统申请资源, 在yarn下即为申请container 开启AM, 如果是Session模式的话则不需要这个步骤
- Yarn分配资源, 开启AM
- Client将Job提交给Dispatcher
- Dispatcher 会开启⼀个新的 JobManager线程
- JM 向Flink ⾃⼰的 Resourcemanager申请slot资源来执⾏任务
- RM 向 Yarn申请资源来启动 TaskManger (Session模式跳过此步)
- Yarn 分配 Container 来启动 taskManger (Session模式跳过此步)
- Flink 的 RM 向 TM 申请 slot资源来启动 task
- TM 将待分配的 slot 提供给 JM
- JM 提交 task, TM 会启动新的线程来执⾏任务,开始启动后就可以通过 shuffle模块进⾏ task之间的数据交换
Flink 的 state 是存储在哪⾥的
Apache Flink内部有四种state的存储实现,具体如下:
- 基于内存的HeapStateBackend – 在debug模式使⽤,不 建议在⽣产模式下应⽤;
- 基于HDFS的FsStateBackend – 分布式⽂件持久化,每次读写都产⽣⽹络IO,整体性能不佳;
- 基于RocksDB的RocksDBStateBackend – 本地⽂件 异步HDFS持久化;
Flink 的 window 分类
flink中的窗⼝主要分为3⼤类共5种窗⼝:
- Time Window 时间窗⼝
- Tumbing Time Window 滚动时间窗⼝ 实现统计每⼀分钟(或其他⻓度)窗⼝内 计算的效果
代码语言:txt复制- Sliding Time Window 滑动时间窗⼝ 实现每过xxx时间 统计 xxx时间窗⼝的效果. ⽐如,我们可以每30秒计算⼀次最近⼀分钟⽤户购买的商品总数。
- Count Window 计数窗⼝
- Tumbing Count Window 滚动计数窗⼝ 当我们想要每100个⽤户购买⾏为事件统计购买总数,那么每当窗⼝中填满100个元素了,就会对窗⼝进⾏计算,这种窗⼝我们称之为翻滚计数窗⼝(Tumbling Count Window)
代码语言:txt复制- Sliding Count Window 滑动计数窗⼝ 和Sliding Time Window含义是类似的,例如计算每10个元素计算⼀次最近100个元素的总和
- Session Window 会话窗⼝ 在这种⽤户交互事件流中,我们⾸先想到的是将事件聚合到会话窗⼝中(⼀段⽤户持续活跃的周期),由⾮活跃的间隙分隔开。如上图所示,就是需要计算每个⽤户在活跃期间总共购买的商品数量,如果⽤户30秒没有活动则视为会话断开(假设raw data stream是单个⽤户的购买⾏为流)
Flink 的 window 实现机制
Flink 中定义⼀个窗⼝主要需要以下三个组件。
- Window Assigner:⽤来决定某个元素被分配到哪个/哪些窗⼝中去。
- Trigger:触发器。决定了⼀个窗⼝何时能够被计算或清除,每个窗⼝都会拥有⼀个⾃⼰的Trigger。
- Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗⼝被处理之前,Evictor(如果有Evictor的话)会⽤来剔除窗⼝中不需要的元素,相当于⼀个filter。
Window 的实现
⾸先上图中的组件都位于⼀个算⼦(window operator)中,数据流源源不断地进⼊算⼦,每⼀个到达的元素都会被交给 WindowAssigner。WindowAssigner 会决定元素被放到哪个或哪些窗⼝(window),可能会创建新窗⼝。因为⼀个元素可以被放⼊多个窗⼝中,所以同时存在多个窗⼝是可能的。注意,Window 本身只是⼀个ID标识符,其内部可能存储了⼀些元数据,如TimeWindow 中有开始和结束时间,但是并不会存储窗⼝中的元素。窗⼝中的元素实际存储在 Key/Value State 中,key为Window ,value为元素集合(或聚合值)。为了保证窗⼝的容错性,该实现依赖了 Flink 的 State 机制(参⻅ state ⽂档)。
每⼀个窗⼝都拥有⼀个属于⾃⼰的 Trigger,Trigger上会有定时器,⽤来决定⼀个窗⼝何时能够被计算或清除。每当有元素加⼊到该窗⼝,或者之前注册的定时器超时了,那么Trigger都会被调⽤。Trigger的返回结果可以是continue(不做任何操作),fire(处理窗⼝数据),purge(移除窗⼝和窗⼝中的数据),或者 fire purge。⼀个Trigger的调⽤结果只是fire的话,那么会计算窗⼝并保留窗⼝原样,也就是说窗⼝中的数据仍然保留不变,等待下次Trigger fire的时候再次执⾏计算。⼀个窗⼝可以被重复计算多次知道它被 purge 了。在purge之前,窗⼝会⼀直占⽤着内存。
当Trigger fire了,窗⼝中的元素集合就会交给Evictor (如果指定了的话)。Evictor 主要⽤来遍历窗⼝中的元素列表,并决定最先进⼊窗⼝的多少个元素需要被移除。剩余的元素会交给⽤户指定的函数进⾏窗⼝的计算。如果没有 Evictor 的话,窗⼝中的所有元素会⼀起交给函数进⾏计算。
计算函数收到了窗⼝的元素(可能经过了 Evictor 的过滤),并计算出窗⼝的结果值,并发送给下游。窗⼝的结果值可以是⼀个也可以是多个。DataStream API 上可以接收不同类型的计算函数,包括预定义的sum() , min() , max() ,还有 ReduceFunction , FoldFunction ,还有WindowFunction 。WindowFunction是最通⽤的计算函数,其他的预定义的函数基本都是基于该函数实现的。
Flink 对于⼀些聚合类的窗⼝计算(如sum,min)做了优化,因为聚合类的计算不需要将窗⼝中的所有数据都保存下来,只需要保存⼀个result值就可以了。每个进⼊窗⼝的元素都会执⾏⼀次聚合函数并修改result值。这样可以⼤⼤降低内存的消耗并提升性能。但是如果⽤户定义了 Evictor,则不会启⽤对聚合窗⼝的优化,因为 Evictor 需要遍历窗⼝中的所有元素,必须要将窗⼝中所有元素都存下来。
Flink具体是如何实现exactly once 语义
状态 Exactly-Once
Flink 提供 exactly-once 的状态(state)投递语义,这为有状态的(stateful)计算提供了准确性保证。也就是状态是不会重复使⽤的,有且仅有⼀次消费
这⾥需要注意的⼀点是如何理解state语义的exactly-once,并不是说在flink中的所有事件均只会处理⼀次,⽽是所有的事件所影响⽣成的state只有作⽤⼀次.
在上图中, 假设每两条消息做⼀次checkPoint操作,持久化⼀次state. TaskManager在处理完 event c 之后被shutdown, 这时候当 JobManager重启task之后, TaskManager 会从 checkpoint 1 处恢复状态,重新执⾏流处理,也就是说 此时 event c 事件 的的确确是会被再⼀次处理的. 那么 这⾥所说的⼀致性语义是何意思呢? 本身,flink每处理完⼀条数据都会记录当前进度到 state中, 也就是说在故障前, 处理完 event c 这件事情已经记录到了state中,但是,由于在checkPoint 2 之前, 就已经发⽣了宕机,那么 event c 对于state的影响并没有被记录下来,对于整个flink内部系统来说就好像没有发⽣过⼀样, 在故障恢复后, 当触发 checkpoint 2 时, event c 的 state才最终被保存下来. 所以说,可以这样理解, 进⼊flink 系统中的事件永远只会被⼀次state记录并checkpoint下来,⽽state是永远不会发⽣重复被消费的, 这也就是 flink内部的⼀致性语义,就叫做 状态 Exactly once.
端到端(end-to-end)Exactly-Once
2017年12⽉份发布的Apache Flink 1.4版本,引进了⼀个重要的特性:TwoPhaseCommitSinkFunction.,它抽取了两阶段提交协议的公共部分,使得构建端到端Excatly-Once的Flink程序变为了可能。这些外部系统包括Kafka0.11及以上的版本,以及⼀些其他的数据输⼊(data sources)和数据接收(data sink)。它提供了⼀个抽象层,需要⽤户⾃⼰⼿动去实现Exactly-Once语义.
为了提供端到端Exactly-Once语义,除了Flink应⽤程序本身的状态,Flink写⼊的外部存储也需要满⾜这个语义。也就是说,这些外部系统必须提供提交或者回滚的⽅法,然后通过Flink的checkpoint来协调
flink是如何实现反压的
flink的反压经历了两个发展阶段,分别是基于TCP的反压(<1.5)和基于credit的反压(>1.5)
基于 TCP 的反压
flink中的消息发送通过RS(ResultPartition),消息接收通过IC(InputGate),两者的数据都是以LocalBufferPool的形式来存储和提取,进⼀步的依托于Netty的NetworkBufferPool,之后更底层的便是依托于TCP的滑动窗⼝机制,当IC端的buffer池满了之后,两个task之间的滑动窗⼝⼤⼩便为0,此时RS端便⽆法再发送数据
基于TCP的反压最⼤的问题是会造成整个TaskManager端的反压,所有的task都会受到影响
基于 Credit 的反压
RS与IC之间通过backlog和credit来确定双⽅可以发送和接受的数据量的⼤⼩以提前感知,⽽不是通过TCP滑动窗⼝的形式来确定buffer的⼤⼩之后再进⾏反压
flink中的时间概念 , eventTime 和 processTime的区别
Flink中有三种时间概念,分别是 Processing Time、Event Time 和 Ingestion Time
- Processing Time Processing Time 是指事件被处理时机器的系统时间。 当流程序在 Processing Time 上运⾏时,所有基于时间的操作(如时间窗⼝)将使⽤当时机器的系统时间。每⼩时 Processing Time 窗⼝将包括在系统时钟指示整个⼩时之间到达特定操作的所有事件
- Event Time Event Time 是事件发⽣的时间,⼀般就是数据本身携带的时间。这个时间通常是在事件到达 Flink 之前就确定的,并且可以从每个事件中获取到事件时间戳。在 Event Time 中,时间取决于数据,⽽跟其他没什么关系。Event Time 程序必须指定如何⽣成 Event Time ⽔印,这是表示 Event Time 进度的机制
- Ingestion Time Ingestion Time 是事件进⼊ Flink 的时间。 在源操作处,每个事件将源的当前时间作为时间戳,并且基于时间的操作(如时间窗⼝)会利⽤这个时间戳 Ingestion Time 在概念上位于 Event Time 和 Processing Time 之间。 与 Processing Time 相⽐,它稍微贵⼀些,但结果更可预测。因为 Ingestion Time 使⽤稳定的时间戳(在源处分配⼀次),所以对事件的不同窗⼝操作将引⽤相同的时间戳,⽽在 Processing Time 中,每个窗⼝操作符可以将事件分配给不同的窗⼝(基于机器系统时间和到达延迟) 与 Event Time 相⽐,Ingestion Time 程序⽆法处理任何⽆序事件或延迟数据,但程序不必指定如何⽣成⽔印
flink中的session Window怎样使
会话窗⼝主要是将某段时间内活跃度较⾼的数据聚合成⼀个窗⼝进⾏计算,窗⼝的触发条件是 Session Gap, 是指在规定的时间内如果没有数据活跃接⼊,则认为窗⼝结束,然后触发窗⼝结果
Session Windows窗⼝类型⽐较适合⾮连续性数据处理或周期性产⽣数据的场景,根据⽤户在线上某段时间内的活跃度对⽤户⾏为进⾏数据统计
代码语言:javascript复制val sessionWindowStream = inputStream
.keyBy(_.id)
//使⽤EventTimeSessionWindow 定义 Event Time 滚动窗⼝
.window(EventTimeSessionWindow.withGap(Time.milliseconds(10)))
.process(......)
Session Window 本质上没有固定的起⽌时间点,因此底层计算逻辑和Tumbling窗⼝及Sliding 窗⼝有⼀定的区别。
Session Window 为每个进⼊的数据都创建了⼀个窗⼝,最后再将距离窗⼝Session Gap 最近的窗⼝进⾏合并,然后计算窗⼝结果。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://cloud.tencent.com/developer/article/1955285