一、概论
根据前面一篇串行并行编排(https://mp.weixin.qq.com/s/R-TS5bQnEnROMaUjTZgIKA)的文章。我们知道无论串行还是并行编排,都需要基于chain来实现condition的调用。那么在并行编排condition的过程又是如何实现这个过程的呢?下面我们详细来了解并行编排从condition到node的过程,因为串行编排相对来说要简单一些,但是总体的思路是类似的,只不过执行的condition不一样。
二、并行编排执行器基于策略模式实现逻辑
找到并行编排的condition入口:
代码语言:javascript复制 private void executeAsyncCondition(Integer slotIndex) throws Exception {
// 获取并发执行策略
ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy());
// 执行并发逻辑
parallelStrategyExecutor.execute(this, slotIndex);
}
执行的过程首先获取对应的执行器,然后执行具体的执行器,然后执行业务逻辑。
从代码中,执行器分为三类:完成任一任务、完成全部任务、完成指定ID任务。
代码语言:javascript复制 ANY("anyOf", "完成任一任务", AnyOfParallelExecutor.class),
ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class);
从代码可以看到作者使用了策略模式来实现获取对应的执行。获取的思路是首先基于当前的并行策略帮助类获取其实例对象,可以看到作者基于单例模式实现对实例对象的获取。然后从策略执行器strategyExecutorMap中获取,如果获取到了,说明之前有存储过这种执行器,此时直接返回。如果当前不存在,则根据枚举的类型获取具体的执行器,然后将其放入到策略执行器map中,同时进行返回。
可以看到作者还是非常的细心的,将共性方法提取到了抽象类ParallelStrategyExecutor中,调用execute(WhenCondition whenCondition, Integer slotIndex)来实现对执行器对应的逻辑处理:
代码语言:javascript复制AllOfParallelExecutor 全部执行并行编排执行器
AnyOfParallelExecutor 任一任务并行编排执行器
SpecifyParallelExecutor 完成指定任务执行器,使用 ID 进行比较
这里的Allof和Anyof与CompletableFuture执行的方式相对应。这里的枚举其实也是为后面CompletableFuture来执行并行编排逻辑处理做的铺垫。
三、AllOfParallelExecutor执行器执行逻辑
看到具体的执行器,是不是很疑问,哪里执行了我们的业务逻辑了呢?
代码语言:javascript复制 @Override
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
// 获取所有 CompletableFuture 任务,拿到结果
List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);
// 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务
CompletableFuture<?> specifyTask = CompletableFuture.allOf(whenAllTaskList.toArray(new CompletableFuture[] {}));
// 结果处理
this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, specifyTask);
}
而我们知道CompletableFuture处理任务无非两种结果,一种是有返回值的,一种是没有返回值的。可以看到这里是有返回值的,为什么呢?从返回值可以看到CompletableFuture。也即WhenFutureObj这个返回值是我们需要关注的对象。
这里可以看到很多事情是在this.getWhenAllTaskList(whenCondition, slotIndex);中完成的,也即这一步对应并行编排来说是非常重要的,因为后续的两个操作,只是做结果的合并和处理task的结果。因此我们把目光注意到WhenFutureObj,也即我们返回的结果是基于WhenFutureObj这个对象来组装的。
可以看到这个对象中保存的信息:
代码语言:javascript复制public class WhenFutureObj {
//判断执行任务是否成功
private boolean success;
// 是否超时
private boolean timeout;
// 执行器的名称
private String executorName;
// 出现的异常信息
private Exception ex;
}
因为我们知道CompletableFuture.supplyAsync这个Api是有返回值的,也即我们可以顺着WhenFutureObj这个对象找到对应的get对象。因为线程方法的入口都是run方法,因此我们debug,顺着这个CompletableFuture找到对应的run方法的f.get():
代码语言:javascript复制 public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
f.get()这个就有文章了,因为这个方法会调用我们的get方法:
这个方法可能不是很显眼,但是我们的确是可以找到它的。
因此我们可以看到其实它执行业务方法是在
这里执行的,因为这里执行的过程会返回对应的结果。而真正的实现则是在get中。
代码语言:javascript复制public WhenFutureObj get() {
try {
executableItem.setCurrChainId(currChainId);
executableItem.execute(slotIndex);
return WhenFutureObj.success(executableItem.getId());
}
catch (Exception e) {
return WhenFutureObj.fail(executableItem.getId(), e);
}
}
因为这里会调用业务具体的nodeComponent,执行具体的业务逻辑。而这里的返回结果就是我们上面看到的对象 WhenFutureObj。是不是很激动。对的,这就是我们想要找到执行任务是否成功的结果。
四、调用Node组件来执行具体的NodeComponent
调用Node来实现具体业务逻辑的调用,可以看到作者是非常的贴心的:
代码语言:javascript复制public void execute() throws Exception {
Slot slot = this.getSlot();
// 在元数据里加入step信息
CmpStep cmpStep = new CmpStep(nodeId, name, CmpStepTypeEnum.SINGLE);
cmpStep.setTag(this.getTag());
cmpStep.setInstance(this);
cmpStep.setRefNode(this.getRefNode());
slot.addStep(cmpStep);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
// 前置处理
self.beforeProcess();
// 主要的处理逻辑
self.process();
// 成功后回调方法
self.onSuccess();
// 步骤状态设为true
cmpStep.setSuccess(true);
}
catch (Exception e) {
// 步骤状态设为false,并加入异常
cmpStep.setSuccess(false);
cmpStep.setException(e);
// 执行失败后回调方法
// 这里要注意,失败方法本身抛出错误,只打出堆栈,往外抛出的还是主要的异常
try {
self.onError(e);
}
catch (Exception ex) {
String errMsg = StrUtil.format("component[{}] onError method happens exception", this.getDisplayName());
LOG.error(errMsg, ex);
}
throw e;
}
finally {
// 后置处理
self.afterProcess();
stopWatch.stop();
final long timeSpent = stopWatch.getTotalTimeMillis();
LOG.info("component[{}] finished in {} milliseconds", this.getDisplayName(), timeSpent);
// 往CmpStep中放入时间消耗信息
cmpStep.setTimeSpent(timeSpent);
// 性能统计
if (ObjectUtil.isNotNull(monitorBus)) {
CompStatistics statistics = new CompStatistics(this.getClass().getSimpleName(), timeSpent);
monitorBus.addStatistics(statistics);
}
}
}
可以看到作者非常的贴心,进行了前置和后置方法的处理。而真正处理业务逻辑桥梁的self.process()这个方法。因为这个方法处理我们业务系统对应nodeComponent逻辑。
五、业务之外的扩展
从上面我们可以看到作者对nodeComponent处理的逻辑写了很多处理的方法。除了执行业务逻辑处理的桥梁的self.process()方法主要包括:
代码语言:javascript复制beforeProcess 前置处理
onSuccess 成功回调
onError 执行失败后回调方法
afterProcess 后置处理
monitorBus.addStatistics 性能统计
可以看到一个优秀的框架不仅需要处理正确的逻辑,还需要处理异常的逻辑,同时给业务留下口子,方便业务进行扩展。
除此之外,还有降级组件FallbackCmp,在业务逻辑中,可以基于此组件实现降级处理。
当然,除了whenCondition和ThenCondition之外,还有很多其它的Condition。更多的condition可以去官网了解。
六、相关参考地址
LiteFlow开源的gitee地址: https://gitee.com/dromara/liteFlow
LiteFlow开源的github地址: https://github.com/dromara/liteflow
LiteFlow开源的官网地址: https://liteflow.cc/
串行并行编排:https://mp.weixin.qq.com/s/R-TS5bQnEnROMaUjTZgIKA