一种工作流心跳机制的设计

2022-07-19 13:53:40 浏览数 (2)

最近工作中一直和 SWF(Amazon 的 Simple Work Flow)打交道,在一个基于 SWF 的工作流框架上面开发和修 bug。SWF 的 activity 超时时间是 5 分钟,在 activity task 开始执行以后,activity worker 需要主动发送心跳请求告知 service 端:“我还活着,我还在干活”,如果出现超过 5 分钟(可以配置)没有心跳,SWF 的 service 端就认为,你已经挂了,我需要把这个 activity 安排到别的 activity worker 上来执行了。借用 AWS 官网的一张图:

每台机器上有若干个 activity task 在被执行。可以看到,在 activity 任务启动起来以后,需要用不断的心跳来告知 service 端任务还在进行,activity worker 还活着。这个 “汇报” 需要 activity worker 所在的 host 主动进行,这也是 SWF 的 service 端无状态(几年前写过一点东西介绍它)的基本要求之一。任务都是由 worker 端去 pull 的,这些行为也都是 worker 端主动触发的。

这个机制描述起来很简单,但是实际在相关设计实现的时候,有许多有趣和值得琢磨的地方。

在我手头的这个 workflow 里面,心跳机制是这样实现的:

  • 有两个 queue,一个是 main queue,是 dequeue(双端队列);另一个是 backup queue,普通队列。二者都是用来存放需要发送心跳的 activity 信息(heartbeatable 对象)。
  • 每秒钟都尝试执行这样一个方法 A:从 main queue 里面 poll 一个 heartbeatable 对象(如果 queue 为空就忽略本次执行),检查该心跳所代表的 activity task 是否还在工作,如果是,就发送一个心跳。发送成功以后,就把这个 heartbeatable 对象扔到 backup queue 里面去。这样,一秒一个,逐渐地,main queue 的 heartbeatable 对象全部慢慢被转移到 backup queue 里去了。
  • 每隔两分钟(称为一个 cycle)执行方法 B:把 backup queue 里面所有的 heartbeatable 对象全部转移到 main queue 里去,于是就又可以继续执行上面一步的逐个心跳逻辑。

这个机制的基本好处是,所有 activity task 的心跳统一管理,通常情况下保证了心跳不会过快(默认配置下是一秒一个,或者不发送),同时保证了没有谁会被遗漏:

但是,这里又会浮现好多好多问题:

为什么要使用两个 queue?

首先,有这样一个事实:方法 A 在执行的时候,理论上每秒钟会执行一次,但是这里并没有强制的保证,使得前一秒的 A 执行一定会在这一秒的 A 开始之前完成。换言之,它们的理论启动时间是按序的,但是实际启动时间和实际的心跳执行时间是不定的,需要处理并发的情形。而到底最多可能存在多少个执行 A 的线程并行,取决于用于此心跳功能的线程池的配置。因此,在执行和判断的过程中,需要对当前 poll 出来的 heartbeatable 对象加锁。

使用两个 queue,这主要是为了记录在本次 cycle 里面,能够很容易判断某一个 heartbeatable 对象是否已经完成心跳行为。还没有完成心跳的,都在 main queue 里;完成了的,都放到 backup queue 里。

如果使用一个 queue,那么也是有解决方案的:

  • 有一个公共计数器,每个 cycle 开始的时候,给计数器 1。
  • 每一个 heartbeatable 对象自身需要携带一个私有计数器,用以标识当前这轮 cycle 的心跳是否已经完成。
  • 每次完成的 heartbeatable 对象给自己的计数器 1 以后扔到队尾;每次 A 取新的 heartbeatable 对象的时候从队首取。
  • 如果取到的对象自己的计数器已经等于公共计数器的数值,说明整个 queue 里面的对象心跳都已经完成了。

当然,这种方法的弊端在于,判断是否还需要发送心跳这件事情,不仅需要从 queue 里取对象,还要判断对象的计数器数值,明显比两个 queue 的解决方案复杂和开销大。因为两个 queue 的解决方案下,只需要尝试从 main queue 里面取对象就好,取不到了就说明本次 cycle 里没有需要发送心跳的对象了。看起来是多了一个 queue,但是方案其实还是简单一些。

心跳的频率保持在多久为好?

显然不是越高越好,不只是成本,因为心跳也是需要消耗资源的,比如 CPU 资源;而且,心跳在 service 端也有 throttling,当前 activity worker 发起太频繁的心跳,当前心跳可能被拒,还可能会让别的 activity worker 的正常心跳被拒了。

我们要解决的最核心问题是,正常情况下,必须保持上限 5 分钟内能发起一次成功心跳就好。

要这么说,尽量增大 cycle,那我设计一个每隔 5 分钟就执行一次的定时器就好了。但是问题没那么简单,首先要考虑心跳的发起不一定成功。如果在接近 5 分钟的时候才去尝试发起心跳,一旦失败了,也没有时间重试了。因此,要 trade-off。比如,配置 cycle 为 120 秒,这样的好处是,5 分钟的超时时间内,可以覆盖 1~2 个完整的 cycle。如果 cycle 配置为 3 分钟,那么 5 分钟无法严格保证一定覆盖有一个完整的 cycle。

确定心跳频率的有两个重要参数,一个是方法 A 的执行频率,一个则是一个 cycle 的时间长度。例如,前者为 1 per second,后者为 2 分钟,那么在理想情况下,一个 cycle 120 秒,可以处理 120 个 activity task,换言之,极限是 120 个 activity task 在这台机器上一起执行。超过了这个数,就意味着在一个 cycle 内,无法完成所有的心跳发送任务。

当然,实际情况没有那么理想,考虑到暂时性的网络问题,线程、CPU 资源的竞争等等,实际可以并行的 activity task 要比这个数低不少。

异常处理和重试

在上图中,步骤③有三个箭头,表示了心跳出现不同种情形的处理:

  • 有一些常规异常,比如表示资源不存在,或者任务已经 cancel 了,这种情况发生的时候,要把相应的 activity task 给 cancel 掉,同时,把自己这个 heartbeatable 对象永久移除出 queue。
  • 重试情形 1:throttling 导致的异常,这种异常发生的时候,把当前 heartbeatable 对象再 addFirst 回 main queue,因为这不是当前有什么不可解决的或者不明原因的问题造成的,只需要简单重试即可。
  • 重试情形 2:其它未知原因的异常,这种情况当然需要重试(之前我们缺少这样的重试机制,导致下一次该 activity task 能够得到心跳的机会被推到了下一个 cycle,这显然是不够合理的),但是,可以把 heartbeatable 对象放到 queue 尾部去重试(addLast),并且附上一个私有计数器,如果重试超过一定次数,就挪到下一个 cycle(backup queue)去。这个放到 queue 尾部的办法,使得重试可以在当前 cycle 里进行,又可以使得这个重试能够尽量不影响其他 heartbeatable 对象的心跳及时发送。整个重试过程其实就是把当前失败对象再放回 queue 的过程,没有线程阻塞。

曾经遇到过一些这方面的问题,经过改进才有了上述的机制:

在 CPU 或者 load 达到一定程度的时候(比如这个时候有一个进程在 call service,占用了大量的 CPU 资源),就很容易发生心跳无法及时进行的问题,比如有时候线程已经初始化了,但是会 stuck 若干时间,因为没有足够的资源去进行。等到某一时刻,资源被释放(比如这个 call service 的进程结束),这个时候之前积攒的心跳任务会一下子爆发出来。不但这些心跳的顺序无法保证,而且严重的情况下会导致 throttling。如果没有当前 cycle 内的重试机制,那么下一次该对象的心跳需要等到下一个 cycle,很容易造成 activity task 的 timeout。

下面再说一个和心跳异常有关的问题。

有这样一个例子,在这个工作流框架内,我们需要管理 EMR 资源,有一个 activity 把 EMR cluster 初始化完成,另一个 activity 把实际执行的 steps 提交上去。但是发现在实际运行时有如下的问题:EMR cluster 已经初始化完成,但是 steps 迟迟没有办法提交上去,导致了这个 cluster 空闲太长时间,被框架内的 monitor 认为已经没有人使用了,需要回收,于是 EMR cluster 就被 terminate 了。但是这之后,steps 才被提交上去,但是这时候 cluster 已经处于 terminating 状态了,自然这个 step 提交就失败了。而经过分析,造成这个 EMR cluster 非预期的 termination,包括这样几个原因:

  • decision task timeout。在 EMR cluster 创建好之后,SWF 会问 decider 下一步该干嘛,这时候如果因为 CPU 高负荷等各种原因,导致 decision task timeout,SWF 就会一直等在那里,而如果这个 timeout 的时间配得太长,这段超时就足以让上面的这个 EMR cluster 空闲过长时间导致被误回收了。
  • 判断 EMR cluster 空闲到一定时间就要回收的逻辑有问题。我们以前的实现是,每隔 2 分钟执行一次 “EMR 资源操作”,包括检查资源状态,进行资源操作,然后如果发现该 EMR 资源创建后经过了 4 次资源操作,依然没有 step 提交上去,就认为空闲时间过长,需要回收(2 x 4 = 8 分钟)。但是问题在于,实际由于种种原因(和心跳的执行间隔实际时间不确定的原理一样),间隔执行 EMR 资源操作并不能严格保证每隔 2 分钟一次,有时一段时间都得不到执行,而有时候会迎来一次集中爆发,这个时候就可能实际 EMR 资源空闲了远远不到 8 分钟就被回收了。因此,这个逻辑最好是能够用绝对的 “空闲时间” 来判断,例如 EMR 资源创建时记录好时间,之后每次检查时都用当前时间去和创建时间比较,空闲超过 8 分钟再回收。
  • 由于之前提到过的心跳无法按时完成导致 activity task timeout,于是这个 EMR cluster 创建的任务实际已经完成了,但是被当做超时给无视了。

最后,我想说的是。设计一个好的工作流框架,还是有很多困难的地方,需要尤其考虑周全的地方。即便是基于 SWF 这样现有的 workflow 来搭积木和叠加功能,也有很多不易和有趣的地方。

文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》

emr

0 人点赞