Elsa V3学习之Flowchart详解(下)

2024-08-20 08:56:55 浏览数 (1)

接上文,我们介绍了Flowchart的部分逻辑,下来来讲解flowchart剩下的逻辑。

OnChildCompletedAsync

看下OnChildCompletedAsync的代码。

代码语言:javascript复制
    private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
    {
        var logger = context.GetRequiredService<ILogger<Flowchart>>();
        var flowchartContext = context.TargetContext;
        var completedActivityContext = context.ChildContext;
        var completedActivity = completedActivityContext.Activity;
        var result = context.Result;

        // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
        var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
        var outcomeNames = result is Outcomes outcomes
            ? outcomes.Names
            : [null!, "Done"];

        // Only query the outbound connections if the completed activity wasn't already completed.
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
        var children = outboundConnections.Select(x => x.Target.Activity).ToList();
        var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope());

        scope.RegisterActivityExecution(completedActivity);

        // If the complete activity is a terminal node, complete the flowchart immediately.
        if (completedActivity is ITerminalNode)
        {
            await flowchartContext.CompleteActivityAsync();
        }
        else if (scheduleChildren)
        {
            if (children.Any())
            {
                // Schedule each child, but only if all of its left inbound activities have already executed.
                foreach (var activity in children)
                {
                    var existingActivity = scope.ContainsActivity(activity);
                    scope.AddActivity(activity);

                    var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

                    // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
                    if (!inboundActivities.Contains(completedActivity))
                    {
                        await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                        continue;
                    }

                    // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join. 
                    if (activity is not IJoinNode)
                    {
                        var executionCount = scope.GetExecutionCount(activity);
                        var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount);

                        if (haveInboundActivitiesExecuted) 
                            await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                    }
                    else
                    {
                        // Select an existing activity execution context for this activity, if any.
                        var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
                            x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
                        var scheduleWorkOptions = new ScheduleWorkOptions
                        {
                            CompletionCallback = OnChildCompletedAsync,
                            ExistingActivityExecutionContext = joinContext,
                            PreventDuplicateScheduling = true
                        };

                        if (joinContext != null)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
                        else if (!existingActivity)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
                        else
                        {
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
                            continue;
                        }

                        await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
                    }
                }
            }

            if (!children.Any())
            {
                await CompleteIfNoPendingWorkAsync(flowchartContext);
            }
        }

        flowchartContext.SetProperty(ScopeProperty, scope);
    }

从第11行开始,这里先做了下判断,判断执行结束的节点状态是否是已完成。如果不是已完成,则不会再往下执行,节点执行结束并不表示节点执行完成,这个节点可能处于异常,或者暂停状态。

第17-18行的代码表示,获取已完成的节点的后续所有节点。如果这个节点的出口连接了多个节点,那么这里的children会有多个节点。

第19行和21行则是获取当前工作流的上下文的一个Scope状态。并将已执行过的节点做记录。

再接下来有个判断,判断执行完成的节点是否是ITerminalNode类型,如果是则直接完成整个工作流。继承ITerminalNode的节点都将是流程的终结节点。

然后才是判断scheduleChildren的结果,如果不是已完成的状态,则保存一下scope状态,不继续执行后续流程。 如果节点已完成,则继续判断children是否有值,如果没有,说明后续没有连线的节点,那么就会继续判断节点是否处于挂起状态或者异常状态,如果都没有,则结束工作流程。

代码语言:javascript复制
private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
{
    var hasPendingWork = HasPendingWork(context);

    if (!hasPendingWork)
    {
        var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);

        if (!hasFaultedActivities)
        {
            await context.CompleteActivityAsync();
        }
    }
}

如果children有值。那么将遍历children,并执行该Activity。 执行Children Activity时,首先在scope记录该节点。 然后判断已完成的活动在不在左侧入口的路线中,如果不在,则执行Activity。

代码语言:javascript复制
var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

// If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
if (!inboundActivities.Contains(completedActivity))
{
    await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
    continue;
}

如果在,那么再继续判断当前Activity是否属于IJoinNode类型,如果是IJoinNode类型,那么需要等待其左侧所有连接的节点执行结束后再继续执行。 如果不是,那么继续判断其左侧节点的执行次数是否大于当前节点,满足条件则继续执行。

这里可以发现,每一步的ScheduleActivityAsync都会继续把OnChildCompletedAsync传递下去,使用递归的方式执行我们的工作流,知道工作流程结束。

到这我们就基本理清楚我们的flowchart的执行逻辑了。

Signal

然后我们回过头看flowchart的构造函数。

代码语言:javascript复制
public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
{
    OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
    OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
    OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
}

可以看到有几个信号接收的订阅。这里的Signal用于在工作流执行的过程中接收到的外部信号,并对应作出处理,这里最简单的是CancelSignal,当flowchart的执行过程中,如果收到这个信号,那么将立即完成执行工作流。

代码语言:javascript复制
    private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
    {
        await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
    }

详细的Signal的执行逻辑,我们将在后续文章中继续介绍。flowchart介绍先到此结束

结语

通过两篇文章,我们基本理清楚了我们编排后的工作流的运行逻辑。希望对小伙伴们有所帮助。

0 人点赞