本文针对IO密集型任务流的实现设计,提供一种基于状态转移图的优化思路。
0x00 问题背景
IO密集型任务流的框架在后台业务中具有重要广泛的应用场景,我们应该不断得追求其设计的优雅以及良好的扩展性。
考虑一种常见任务的简化版本:假设一个任务Task,由3个子步骤串行组合完成。步骤为s1、s3、s5,并且s1和s3的清理回滚步骤分别为s2、s4。
如何实现一个工作流框架,使之完成任务步骤流的声明定义以及执行引擎的实现呢?
0x01 简单直观实现
根据问题的上述描述,不少人可能第一反应是模拟整个执行的流程,如正向流程和反向(回滚清理)流程。然后按顺序执行步骤。而在步骤定义上,也会直观得考虑定义正反两个步骤数组,或是二元组的数组,如以下实现:
代码语言:python代码运行次数:0复制STEPS = [
[s1, s2],
[s3, s4],
[s5, None],
]
def simple_workflow(steps):
def run(f): return f()
index, clean = 0, False
while index >= 0 and index < len(steps):
if not clean: # 正向执行步骤
try:
run(steps[index][0])
index = index 1
except Exception as e:
print('not perfect but ok')
clean = True
index = index - 1
else: # 回滚清理步骤
try:
run(steps[index][1])
except:
print('woo!')
finally:
index = index - 1
if index == len(steps):
print('Success')
else:
print('Failure with index:', index)
simple_workflow(STEPS)
'''
s1至s5为简单的print函数,随机抛异常。以下为s5抛异常的结果:
s1
s3
s5
not perfect but ok
s4
woo!
s2
Failure with index: -1
'''
如以上的代码所示,像类似以上这种简单的定义与实现,确实可以解决一定场景下的问题,而且便于理解。
关于这种思考的来源?我想或许源自Linux驱动程序里的清理goto语句,类似以下代码相信实现过Linux驱动的同学们应该并不陌生。
代码语言:c复制void do_something() {
ssize_t ret = do_work_1();
if (ret < 0) goto CLEAN_WORK_1;
ret = do_work_2();
if (ret < 0) goto CLEAN_WORK_2;
return ret;
CLEAN_WORK_2:
clean_up_work_2();
CLEAN_WORK_1:
clean_up_work_1();
}
又或者受到编程语言本身的异常捕获语法(如try/catch)影响:如try块做事,捕获错误,然后根据错误类型选择执行不同的清理逻辑。
问题在于,这种相对直接线性的实现方式,其可扩展性往往并不能满足相对复杂的需求。
每个过程只有成功失败两个状态,只能向前或向后,无法又其他的处理方式,比如:
步骤重试的实现?
步骤终止?比如重试步骤出错直接终止等逻辑。
如何按失败的分类进行不同的处理?如运行时异常、业务错误等。
以上的问题的解决都无可避免地需要在框架和业务的边界地带添加耦合逻辑。
0x02 更务实的方案
我们注意到经常定义为workflow?那么什么是真正的flow?
- 真正的flow是不会逆流的,永远向前的,没有rollback;
- 真正的flow不是一条线,必有若干支流,所谓主流也不过是最大(大概率流经)的一支;
- 真正的flow中的路径上的每一点,都是独一无二的,有自己的context。
因此,为了更加客观务实的模拟任务流,我们需要考虑用图(具体说是DAG)来定义流程,用状态机(具体说是FSM)来实现执行流程。
我们这里考虑提升思考维度,由线到面(从List到Graph)DAG定义流程。
最理想的情况是:没有任何“特例”,没有成功、失败,也没有回滚、重试与终止,更没有特殊处理。任何步骤同等对待,错误清理/回滚步骤同样是first-class citizen。一匡天下。
只要关注本质:即步骤执行,以及根据根据步骤执行结果而计算出的下一个步骤。
每一次的任务执行,就像一条鱼从河流源头向下游,最终游进湖/海等终点。
对于上文的示例问题,我们可以构造出如下的状态转移图,用节点表示每个步骤,用边表示状态的转移,每个步骤的执行结果及下一步方向(如绿色线表示返回值为0时的下一步方向,红色线表示返回值为1时的下一步方向)。
有了以上的思路,框架实现起来就举重若轻了。对于Python,可以方便地用临接表的方式实现DAG及其路径的动态搜索。
代码语言:python代码运行次数:0复制BETTER_STEPS = {
'start': s1,
'graph': {
s1: [s3, None],
s2: [None, None],
s3: [s5, s2],
s4: [s2, s2],
s5: [None, s4],
}
}
def better_workflow(steps):
def run(f):
try: return f()
except: return 1
graph, next_step = steps['graph'], steps['start']
path = []
while next_step:
path.append(next_step.__name__)
code = run(next_step)
next_step = graph[next_step][code]
print('Completed with path: ', path)
better_workflow(BETTER_STEPS)
'''
s1至s5为简单的print函数,随机抛异常。以下为s5抛异常的结果:
s1
s3
s5
s4
s2
Completed with path: ['s1', 's3', 's5', 's4', 's2']
'''
0x03 效果及扩展性
从代码实现上看,用DAG的实现比简单基于双数组的实现方法,即简单明确代码少,又同时具备强大得多的扩展性。
对于如下一张稍微复杂的图,表意也能完整清晰:
我们可以通过步骤函数返回不同且定制的状态码,进而实现不同的状态转移,如:
重试:s1等code为2时,下一步指向自己,即图中黄色线。
终止:s4、s6的code为1(一般失败)直接结束。
其他任意特殊状态:s7的code为3(其他状态)时,转至步骤s8。
下面是上图的定义代码,转移简单一目了然,未来扩展也非常方便,不是么?
代码语言:javascript复制BETTER_STEPS = {
'start': s1,
'graph': {
s1: [s3, None, s1],
s2: [None, None, s2],
s3: [s5, s2, s3],
s4: [s2, None, s4],
s5: [s7, s4, s5],
s6: [s4, None, s6],
s7: [None, s6, s7, s8],
s8: [None, None],
}
}
0x04 参考实现细节
以上是介绍基础的实现思路,以及如何定义任务中的步骤组合逻辑。阐述的细节也是侧重上层算法和设计层面的。然而,实现真实的IO-Bound的任务流组件是更加复杂的,考虑的重点(如性能、稳定性)也是要稍微多一些,是个相对细致的工作。尤其不可避免的,任务的执行层实现必须要考虑IO的高并发性能,通常要结合epoll和队列等相对底层的技术。对于这种任务定义方式,只要步骤划分合理,在分布式场景中也是可以完全应用的。
对于理想的IO任务流框架,应该具备至少以下特点:
- 动静分离:静态的步骤声明与动态的执行过程完全解耦,一切即在预料之中,又在掌控之内;
- 没有异常:没有异常回滚就没有所谓事务,步骤执行只是预定义的状态转移图的一条特定路径(path);
- 灵活定制:方便扩展任何步骤,以及对应的流程设计。
读者如果有兴趣,可以参考本人的示例实现:https://pypi.org/project/freactor/
可以通过pip install freactor安装,并参考源码的example目录下的方法体验。
0x05 一点思考
无论是完成任务还是处理解决问题,要避免过于耿直和朴素。
简单的线性思维以及非黑即白的二元决策往往是不成熟且适用范围窄的。
相比“成功”或“失败”,关注“下一步去哪”以及“干净地完成”显得更加重要。
当然,本文阐述的基于DAG的任务流框架虽然适合相当广泛的场景,但也绝非完美,也有它的不足和局限性。实现一个通用的高性能可扩展的任务流组件,还有更多的有趣新颖的思路和设计方法,敬请期待本人后续的分析讲解。
0x06 参考文献
http://news.qingdaonews.com/content/2016-03/03/content_11505164_all.htm
https://en.wikipedia.org/wiki/Directed_acyclic_graph
https://cloud.tencent.com/developer/article/1160842