【Flink】第三十三篇: 任务线程模型

2022-03-31 11:26:24 浏览数 (1)

源码系列推荐:

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

【Flink】第二十六篇:源码角度分析Task执行过程

【Flink】第二十八篇:Flink SQL 与 Apache Calcite

【Flink】第二十九篇:源码分析 Blink Planner

线程模型能帮助我们更深刻的理解Flink任务执行原理,更精确的控制Flink程序,这些是使用Flink解决复杂问题、写出高性能和高可用程序的基础。

例如,在运用DataStream API完成业务需求时,可以更精确的把控Function中每个field、state field的作用范围及其完整的生命周期,也可以帮助我们进一步思考线程并发、线程安全、线程同步等问题。

梗概

本文从JobMaster向TaskManager,通过RPC远程过程调用的方式,提交一次Task为分析线索,以线程调度为主题,对Flink这一过程的源码进行分析。

先对分析的结论进行简单阐述,以防止读者迷路。

1. 远端JobMaster通过TaskExecutor# submitTask,向其提交Task

2. 在TaskExecutor# submitTask中,

(1) 首先,完成Task的实例化,注意在实例化Task的过程中为其构造方法中提供了一个重要的新线程

(2) 然后,调用TaskExecutor# startTaskThread,运行Task中为其初始化的新线程,来运行task

在 (1) 中,完成Task实例化时,为其生成了一个新的Thread,这个Thread就是Mailbox线程模型的执行线程。

在 (2) 中,线程首先会去按照提交的StreamTask,进行反序列化生成相应的StreamTask,而在实例化StreamTaskl的过程中,将当前线程传递给了StreamTask,并进一步传递了TaskMailbox,并调用StreamTask# invoke,执行MailboxProcessor# runMailboxLoop,进入Mailbox的单线程循环执行模式。

Mailbox模型

Mailbox线程模型,它是Actor模型的理念,简单来说,本质就是一种生产者-消费者模型

包含一个阻塞队列和一个执行线程,有多个消费者,即任何线程都可以通过同步互斥的方式向阻塞队列(即Box)中添加Task(及Mail),而消费者只有一个线程,消费MailBox中的Mail并线程安全的执行它。

笔者在阅读源码的过程中发现Flink多处都用了这种模型,例如,akka、Flink的RPC等均是Mailbox模型。而且,这个模型从 Flink 1.9 开始实现,之前通过一个全局锁(checkpoint lock)来保证线程安全。

这部分推荐阅读:https://blog.csdn.net/yuchuanchen/article/details/105677408

源码阅读

TaskExecutor

TaskExecutor其实就是TaskManager的最外层封装了,而TaskExecutor又是通过TaskExecutorGateway来抽象所有的与TaskExecutor交互的Gateway,这种交互是基于RPC的。

为什么说TaskExecutor就是taskmanager的本身了?

TaskExecutor的注释:TaskExecutor负责执行多个Task。即是说taskmanager被分配了虚拟化的资源槽:taskslot,而这就是taskslot就可以被分配运行task。

在TaskExecutor中,我们最关心的是submitTask:

在其中就有JM从远程进行任务调度后进行Task初始化的代码:

生成Task实例后,便是调度其进行异步执行:

我们依然沿着任务调度,线程模型的线索,先来看看Task的实例化过程。

Task

Task的注释中非常重要的一句话,描述了Mailbox的影子:

Each Task is run by one dedicated thread. // 每个任务由一个专用线程运行。

并且所有为Task创建的Task的线程都从属于"Flink Task Threads"

这也是我们调试Flink源码时,发现Task线程的从属线程组都是Flink Task Threads:

接着我们在Task中寻找这个线程:

接着在构造方法中看到对它的初始化逻辑:

就是在这里,为这个Task的运行提供了一个新的线程,并且指定了线程组、Runnable逻辑、线程名称。

至此,完成了Task实例的初始化。第二个重要的操作便是执行这个task,即调用Task# startTaskThread:

Task# startTaskThrea仅仅是简单的调用了Thread# start,而Runnable我们也刚刚分析过了,就是刚刚初始化的Task实例,所以Task实例里的run方法就是线程执行的逻辑,run中又执行了doRun,doRun便是Task的核心执行逻辑,

在源码中可以看到,先通过反射的方式对具体的Task的逻辑进行了加载,然后便是调用其的invoke进行执行。对于我们用户定义的普通Stream代码,这里的invokable实例就是StreamTask。

StreamTask

这里我们要特别注意StreamTask和StreamOperator 的关系,以下是StreamTask的注释的翻译:

所有流式传输任务的基类。任务是由 TaskManager 部署和执行的本地处理单元。每个任务运行一个或多个StreamOperator ,它们形成了任务的操作员链。链接在一起的运算符在同一个线程中同步执行,因此在同一个流分区上。这些链的常见情况是连续的 map/flatmap/filter 任务。

所以,StreamTask包含一个或多个Operator,而例如连续的 map/flatmap/filter会形成operator-chain,便交给一个StreamTask执行。而StreamTask又是和线程对应的。

我们关心的是线程模型,所以相关的field是:

以及两个构造方法:

这里便可以看到Mailbox的身影了,用当前线程构造了TaskMailboxImpl实例,即StreamTask的子类。

又用这个mailbox构造了两个同样关键的实例:mailboxProcessor、mainMailboxExecutor。

在被Task# doRun中调用了StreamTask# invoke后,接着再来看看invoke:

很容易的看出, 核心的执行方法是runMailboxLoop,

而StreamTask# runMailboxLoop调用了MailboxProcessor# runMailboxLoop,所以我们来到MailboxProcessor的runMailboxLoop一探究竟,

MailboxProcessor

我们最关心的依然是mailbox的传递,这个mailbox成员便是我们刚刚在StreamTask的构造方法里传递给MailboxProcessor的传参,接着继续看调用方法,

所以,任务线程是在一个循环中,不断的从Mailbox中取出Mail,然后执行,这也和我们在Mailbox部分的介绍是一致的,源码及笔者的注释如下,

TaskMailboxImpl

顺着这个思路,我们继续来看看TaskMailbox,核心的代码及注释如下图,

而消费的方法tryTake如下,

主要是先对当前线程进行check,如果当前线程不是唯一的消费者线程,不允许消费。

接着是执行Mail:

底层是StreamTaskActionExecutor用一个mutex互斥同步监视器实现互斥调用:

那么Task主要包含哪些类别?

总结

至此,我们通过源码分析了,TaskManager是如何接收JobManager调度给它的Task,并且又是如何创建执行线程,通过构造方法一步一步传递给了Mailbox线程模型,完成单消费者线程安全的执行各类Stream消息,即Mail。

0 人点赞