使用c#的 async/await编写 长时间运行的基于代码的工作流的 持久任务框架

2022-12-02 17:11:31 浏览数 (1)

持久任务框架 (DTF) 是基于async/await 工作流执行框架。工作流的解决方案很多,包括Windows Workflow Foundation,BizTalk,Logic Apps, Workflow-Core 和 Elsa-Core。最近我在Dapr 的仓库里跟踪工作流构建块的进展时,深入了解了一下,这个DTFx在Azure 基础设施有大量的应用,现在Dapr团队正在把这个实践抽象成工作流构建块,具体参看https://github.com/dapr/dapr/issues/4576。DTFx 正好是.NET开发的,所以对他多了几分关注,以前没有深入进去看看,现在我觉得是值得推荐给大家的一个工作流方案,它足够轻量级,而且非常简单,依赖很少。

持久任务框架是一个开源框架,它为 .NET 平台中的工作流即代码提供了基础。GitHub上:https://github.com/Azure/durabletask

它有两个主要组件:业务流程和任务。业务流程“编排”应用程序逻辑,以内联方式执行自定义代码并调用任务。自定义业务流程派生自 TaskOrchestration<TResult, TInput>自定义任务派生自 TaskActivity<TInput, TResult>。

推荐大家从这两个仓库可用来学习和生产使用。

Microsoft.Extensions.Hosting包装器: https://github.com/jviau/durabletask-hosting

持久任务框架扩展: https://github.com/lucaslorentz/durabletask-extensions

我们一起来看下持久任务框架的Hello world: 代码来自https://github.com/jviau/durabletask-hosting 的 DurableTask.Samples:

这个非常简单的业务流程“GreetingsOrchestration”,有两个称为任务“GetUserTask”,它执行名称提示和“SendGreetingTask”,它将问候语写入控制台。

GreetingsOrchestration 派生自 TaskOrchestration<string、string> 并具有调用 GetUserTask 和 SendGreetingTask 的 RunTask 方法。

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary> /// A task orchestration for greeting a user. /// </summary> public class GreetingsOrchestration : TaskOrchestration<string, string> {      /// <inheritdoc />      public override async Task<string> RunTask(OrchestrationContext context, string input)      {          string user = await context.ScheduleTask<string>(typeof(GetUserTask));          string greeting = await context.ScheduleTask<string>(typeof(SendGreetingTask), user);          return greeting;      } }

GetUserTask 派生自 TaskActivity<string,string> 并实现了 Execute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary> /// A task activity for getting a username from console. /// </summary> public class GetUserTask : TaskActivity<string, string> {      private readonly IConsole _console;

    /// <summary>      /// Initializes a new instance of the <see cref="GetUserTask"/> class.      /// </summary>      /// <param name="console">The console output helper.</param>      public GetUserTask(IConsole console)      {          _console = console ?? throw new ArgumentNullException(nameof(console));      }

    /// <inheritdoc />      protected override string Execute(TaskContext context, string input)      {          _console.WriteLine("Please enter your name:");          return _console.ReadLine();      } }

SendGreetingTask 派生自 TaskActivity<string、string> 并实现了 Excute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary> /// A task for sending a greeting. /// </summary> public sealed class SendGreetingTask : AsyncTaskActivity<string, string> {      private readonly IConsole _console;

    /// <summary>      /// Initializes a new instance of the <see cref="SendGreetingTask"/> class.      /// </summary>      /// <param name="console">The console output helper.</param>      public SendGreetingTask(IConsole console)      {          _console = console ?? throw new ArgumentNullException(nameof(console));      }

    /// <inheritdoc />      protected override async Task<string> ExecuteAsync(TaskContext context, string user)      {          string message;          if (!string.IsNullOrWhiteSpace(user) && user.Equals("TimedOut"))          {              message = "GetUser Timed out!!!";              _console.WriteLine(message);          }          else          {              _console.WriteLine("Sending greetings to user: " user "...");              await Task.Delay(5 * 1000);              message = "Greeting sent to " user;              _console.WriteLine(message);          }

        return message;      } }

上面的这个例子非常基础,我们在项目中要把它用起来就要用到这个扩展项目 https://github.com/lucaslorentz/durabletask-extensions。这个项目通过更多功能扩展持久任务框架,并使其更易于使用,目前还在开发过程中,尚未达到投入生产的程度。包含了下列这些功能,让你在任何地方都可以运行。

  • 更多定义存储功能的接口
  • 依赖注入集成
  • EF Core MySql/PostgreSQL/SqlServer storages
  • 分布式工作线程:允许在多个工作线程中拆分业务流程/活动实现
  • 通过 GRPC 协议进行间接存储访问:将您的存储选择和配置集中在单个组件中。
  • 用户界面
  • BPMN 运行器

在示例文件夹中,您可以找到经典书籍《飞行、汽车、酒店》的实现,其中包含补偿问题。

该示例旨在演示具有以下组件的微服务体系结构:

  • 服务器:连接到存储并将其公开为 GRPC 终结点。
  • 应用程序接口:公开 REST API 以管理业务流程。
  • 用户界面:公开用于管理业务流程的 UI。
  • 业务流程工作线程:为给定问题实现BookParallel和BookSquential业务流程。
  • 飞行工作人员:实施预订航班和取消航班活动。
  • 车夫:实施“预订汽车”和“取消汽车”活动。
  • 酒店工作人员:实施预订酒店和取消酒店活动。
  • BPMNWorker:一个建立在持久任务之上的实验性 BPMN 运行器。对于给定的问题,还有BookParallel和BookSequentialBPMN 工作流。

0 人点赞