LiteFlow异步编排执行的具体过程逻辑

2023-10-25 09:39:01 浏览数 (1)

一、概论

根据前面一篇串行并行编排(https://mp.weixin.qq.com/s/R-TS5bQnEnROMaUjTZgIKA)的文章。我们知道无论串行还是并行编排,都需要基于chain来实现condition的调用。那么在并行编排condition的过程又是如何实现这个过程的呢?下面我们详细来了解并行编排从condition到node的过程,因为串行编排相对来说要简单一些,但是总体的思路是类似的,只不过执行的condition不一样。

二、并行编排执行器基于策略模式实现逻辑

找到并行编排的condition入口:

代码语言:javascript复制
    private void executeAsyncCondition(Integer slotIndex) throws Exception {

        // 获取并发执行策略
        ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy());

        // 执行并发逻辑
        parallelStrategyExecutor.execute(this, slotIndex);

    }

执行的过程首先获取对应的执行器,然后执行具体的执行器,然后执行业务逻辑。

从代码中,执行器分为三类:完成任一任务、完成全部任务、完成指定ID任务。

代码语言:javascript复制
    ANY("anyOf", "完成任一任务", AnyOfParallelExecutor.class),
    ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
    SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class);

从代码可以看到作者使用了策略模式来实现获取对应的执行。获取的思路是首先基于当前的并行策略帮助类获取其实例对象,可以看到作者基于单例模式实现对实例对象的获取。然后从策略执行器strategyExecutorMap中获取,如果获取到了,说明之前有存储过这种执行器,此时直接返回。如果当前不存在,则根据枚举的类型获取具体的执行器,然后将其放入到策略执行器map中,同时进行返回。

可以看到作者还是非常的细心的,将共性方法提取到了抽象类ParallelStrategyExecutor中,调用execute(WhenCondition whenCondition, Integer slotIndex)来实现对执行器对应的逻辑处理:

代码语言:javascript复制
AllOfParallelExecutor 全部执行并行编排执行器
AnyOfParallelExecutor 任一任务并行编排执行器
SpecifyParallelExecutor 完成指定任务执行器,使用 ID 进行比较

这里的Allof和Anyof与CompletableFuture执行的方式相对应。这里的枚举其实也是为后面CompletableFuture来执行并行编排逻辑处理做的铺垫。

三、AllOfParallelExecutor执行器执行逻辑

看到具体的执行器,是不是很疑问,哪里执行了我们的业务逻辑了呢?

代码语言:javascript复制
 @Override
    public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {

        // 获取所有 CompletableFuture 任务,拿到结果
        List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);

        // 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务
        CompletableFuture<?> specifyTask = CompletableFuture.allOf(whenAllTaskList.toArray(new CompletableFuture[] {}));

        // 结果处理
        this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, specifyTask);

    } 

而我们知道CompletableFuture处理任务无非两种结果,一种是有返回值的,一种是没有返回值的。可以看到这里是有返回值的,为什么呢?从返回值可以看到CompletableFuture。也即WhenFutureObj这个返回值是我们需要关注的对象。

这里可以看到很多事情是在this.getWhenAllTaskList(whenCondition, slotIndex);中完成的,也即这一步对应并行编排来说是非常重要的,因为后续的两个操作,只是做结果的合并和处理task的结果。因此我们把目光注意到WhenFutureObj,也即我们返回的结果是基于WhenFutureObj这个对象来组装的。

可以看到这个对象中保存的信息:

代码语言:javascript复制
public class WhenFutureObj {

    //判断执行任务是否成功
    private boolean success;

    // 是否超时
    private boolean timeout;

    // 执行器的名称
    private String executorName;

    // 出现的异常信息
    private Exception ex;
}    

因为我们知道CompletableFuture.supplyAsync这个Api是有返回值的,也即我们可以顺着WhenFutureObj这个对象找到对应的get对象。因为线程方法的入口都是run方法,因此我们debug,顺着这个CompletableFuture找到对应的run方法的f.get():

代码语言:javascript复制
 public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

f.get()这个就有文章了,因为这个方法会调用我们的get方法:

这个方法可能不是很显眼,但是我们的确是可以找到它的。

因此我们可以看到其实它执行业务方法是在

这里执行的,因为这里执行的过程会返回对应的结果。而真正的实现则是在get中。

代码语言:javascript复制
public WhenFutureObj get() {
        try {
            executableItem.setCurrChainId(currChainId);
            executableItem.execute(slotIndex);
            return WhenFutureObj.success(executableItem.getId());
        }
        catch (Exception e) {
            return WhenFutureObj.fail(executableItem.getId(), e);
        }
    }

因为这里会调用业务具体的nodeComponent,执行具体的业务逻辑。而这里的返回结果就是我们上面看到的对象 WhenFutureObj。是不是很激动。对的,这就是我们想要找到执行任务是否成功的结果。

四、调用Node组件来执行具体的NodeComponent

调用Node来实现具体业务逻辑的调用,可以看到作者是非常的贴心的:

代码语言:javascript复制
public void execute() throws Exception {
        Slot slot = this.getSlot();

        // 在元数据里加入step信息
        CmpStep cmpStep = new CmpStep(nodeId, name, CmpStepTypeEnum.SINGLE);
        cmpStep.setTag(this.getTag());
        cmpStep.setInstance(this);
        cmpStep.setRefNode(this.getRefNode());
        slot.addStep(cmpStep);

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        try {
            // 前置处理
            self.beforeProcess();

            // 主要的处理逻辑
            self.process();

            // 成功后回调方法
            self.onSuccess();

            // 步骤状态设为true
            cmpStep.setSuccess(true);
        }
        catch (Exception e) {
            // 步骤状态设为false,并加入异常
            cmpStep.setSuccess(false);
            cmpStep.setException(e);

            // 执行失败后回调方法
            // 这里要注意,失败方法本身抛出错误,只打出堆栈,往外抛出的还是主要的异常
            try {
                self.onError(e);
            }
            catch (Exception ex) {
                String errMsg = StrUtil.format("component[{}] onError method happens exception", this.getDisplayName());
                LOG.error(errMsg, ex);
            }
            throw e;
        }
        finally {
            // 后置处理
            self.afterProcess();

            stopWatch.stop();
            final long timeSpent = stopWatch.getTotalTimeMillis();
            LOG.info("component[{}] finished in {} milliseconds", this.getDisplayName(), timeSpent);

            // 往CmpStep中放入时间消耗信息
            cmpStep.setTimeSpent(timeSpent);

            // 性能统计
            if (ObjectUtil.isNotNull(monitorBus)) {
                CompStatistics statistics = new CompStatistics(this.getClass().getSimpleName(), timeSpent);
                monitorBus.addStatistics(statistics);
            }
        }
    }

可以看到作者非常的贴心,进行了前置和后置方法的处理。而真正处理业务逻辑桥梁的self.process()这个方法。因为这个方法处理我们业务系统对应nodeComponent逻辑。

五、业务之外的扩展

从上面我们可以看到作者对nodeComponent处理的逻辑写了很多处理的方法。除了执行业务逻辑处理的桥梁的self.process()方法主要包括:

代码语言:javascript复制
beforeProcess 前置处理
onSuccess 成功回调
onError  执行失败后回调方法
afterProcess 后置处理
monitorBus.addStatistics 性能统计

可以看到一个优秀的框架不仅需要处理正确的逻辑,还需要处理异常的逻辑,同时给业务留下口子,方便业务进行扩展。

除此之外,还有降级组件FallbackCmp,在业务逻辑中,可以基于此组件实现降级处理。

当然,除了whenCondition和ThenCondition之外,还有很多其它的Condition。更多的condition可以去官网了解。

六、相关参考地址

LiteFlow开源的gitee地址: https://gitee.com/dromara/liteFlow

LiteFlow开源的github地址: https://github.com/dromara/liteflow

LiteFlow开源的官网地址: https://liteflow.cc/

串行并行编排:https://mp.weixin.qq.com/s/R-TS5bQnEnROMaUjTZgIKA

0 人点赞