Flink 资源分配和并行度深度剖析

2021-03-05 14:32:42 浏览数 (1)

一、简介

TaskManager 执行具体的 Task。TaskManager 为了对资源进行隔离和增加允许的task数,引入了 slot 的概念,这个 slot 对资源的隔离仅仅是对内存进行隔离,策略是均分,比如 taskmanager 的管理内存是 3 GB,假如有两个 slot,那么每个 slot 就仅仅有 1.5 GB 内存可用。

Client 这个角色主要是为 job 提交做些准备工作,比如构建 jobgraph 提交到 jobmanager ,提交完了可以立即退出,当然也可以用client 来监控进度。

Jobmanager 和 TaskManager 之间通信类似于 Spark 的早期版本,采用的是 actor 系统。如下图

img

二、什么是 Task

在spark中:

RDD中的一个分区对应一个task,task是单个分区上最小的处理流程单元。

被送到某个Executor上的工作单元,多个Task组成一个TaskSet。

上述引入 spark 的 task 主要是想带着大家搞明白,以下几个概念:

  • Flink的并行度由什么决定的?
  • Flink的task是什么?

flink 并行度很好解释:Flink 每个算子都可以设置并行度,然后也可以设置全局并行度。

Api的设置

代码语言:javascript复制
.map(new RollingAdditionMapper()).setParallelism(10)

全局配置在flink-conf.yaml文件中,parallelism.default,默认是1:可以设置默认值大一点

三、FIink 算子连接模式

上下游算子通过数据流进行连接,有 one-to-one (or forwarding) pattern 和 redistributing pattern 两种模式:

  1. One-to-one :保留元素的分区和排序。这意味上下游运算符将获取到相同顺序元素。
  2. Redistributing :更改流的分区。每个运算符都将数据发送到不同的目标子任务,具体取决于所选的转换。比如 keyBy()(其重新分区通过散列键),broadcast(), or rebalance()(其重新分区随机地)。在重新分配交换中,元素之间的顺序仅保留在每对发送和接收子任务中

四、Flink 的 task 和 subtask 如何划分?

  1. task: 是没有产生 shuffle,One-to-one 模式下算子的集合,里面封装了分区数个 subTask,类似 spark 中的 TaskSet。
  2. subTask:flink 最小的执行单元,task 每一个分区会形成一个 subTask ,类似 spark 中的 task。 读起来有点抽象,下面我们用flink官方案例说明一下,案例代码如下

img

案例执行DAG图

img

说明:图中假设是 source/map 的并行度都是 2,keyby/window/apply 的并行度也都是 2,sink 的是 1,那么有几个 task,几个subTask 呢?

答案:共 task 有 3 个,subTask 是五个,最终需要五个线程。

解释:由于 source 到 map 没有产生 shuffle ,并且并行度相同,属于 One-to-one 的模式,所有 source 和 map 划分成一个 task,后面的 map 到 keyBy ,和最后的 sink 都有 shuffle 产生,并行度发生改变,所有 keyBy,sink 都是一个单独的 task,所有共有 3 个task,其中 source,map 并行度是 2,所以有两个 subTask,以此类推共有 5 个 subtask。

五、如何在 flink 的 ui 界面上查看任务的 task 和 subTask

如下图我们点击任务的详情页面,右上角的 4 就是 task 总数,DAG 中的每一个矩形代表一个独立的 task,点击每一个 task 详情,我们能看到 task 的 subtask 信息,包括 subtask 的接受数据量,状态等信息,对于任务调优有极大的帮助。

img

img

六、Operator Chains

默认情况下,flink 不同的 task 的 subTask,允许任务共享 slot,当然,前提是必须在同一个 job 内部,且不自定义 slotgroup。Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。

每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。

我们仍以经典的 WordCount 为例,下面这幅图,展示了 Source 并行度为 1,FlatMap、KeyAggregation、Sink并行度均为 2,最终以 5 个并行的线程来执行的优化过程。

img

上图中将 KeyAggregation 和 Sink 两个 operator 进行了合并,因为这两个合并后并不会改变整体的拓扑结构。

但是,并不是任意两个 operator 就能 chain 一起的。其条件还是很苛刻的:

  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  3. 上下游节点都在同一个 slot group 中(下面会解释 slot group)
  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
  6. 两个节点间数据分区方式是 forward
  7. 用户没有禁用 chain

七、slotgroup

为了防止同一个 slot 包含太多的 task,或者我们希望把计算逻辑复杂的算子单独使用 slot ,提高计算速度,Flink 提供了资源组(group) 的概念。

group 就是对 operator 进行分组,同一 group 的不同 operator task 可以共享同一个 slot。默认所有 operator 属于同一个组"default",也就是所有 operator task 可以共享一个 slot。

我们可以通过 slotSharingGroup() 为不同的 operator 设置不同的group

代码语言:javascript复制
dataStream.filter(...).slotSharingGroup("groupName");

Flink通过 SlotSharingGroup 和 CoLocationGroup 来决定哪些 task 需要被共享,哪些 task 需要被单独的 slot 使用

八、SlotSharingGroup

表示不同的 task 可以共享 slot,但是这是 soft 的约束,即也可以不在一个 slot,默认情况下,整个 StreamGraph 都会用一个默认的 “default” SlotSharingGroup,即所有的 JobVertex 的 task 都可以共用一个 slot

注意: 1.默认设置上游算子设置了SlotSharingGroup,下游的算子也会集成上一个算子使用相同的,SlotSharingGroup。 2.slot隔离不会涉及到CPU的隔离,slot目前仅仅用来隔离task的内存

九、slot 和 parallelism

1. slot

是指 taskmanager 的并发执行能力,在 hadoop 1.x 版本中也有 slot 的概念,有兴趣的读者可以了解一下。

img

taskmanager.numberOfTaskSlots:3

每一个taskmanager中的分配3个TaskSlot,3个taskmanager一共有9个TaskSlos

2. parallelism 是指 taskmanager 实际使用的并发能力

parallelism.default:1

运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲。设置合适的并行度才能提高效率。

3.parallelism是可配置、可指定的

1.可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度

2.可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度

3.可以通过设置executionEnvironmentk的方法修改并行度

4.可以通过设置flink的编程API修改过并行度

5.这些并行度设置优先级从低到高排序,排序为api>env>p>file.

6.设置合适的并行度,能提高运算效率

7.parallelism不能多与slot个数。

十、slot 和 parallelism 总结

  1. slot 是静态的概念,是指 taskmanager 具有的并发执行能力
  2. parallelism 是动态的概念,是指程序运行时实际使用的并发能力
  3. 设置合适的 parallelism 能提高运算效率,太多了和太少了都不行
  4. 设置 parallelism 有多中方式,优先级为 api > env > p > file

资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板等资源请去 GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigData Gitee 自行下载 https://gitee.com/li_hey_hey/dashboard/projects

代码语言:javascript复制

-End-

代码语言:javascript复制

0 人点赞