聊聊MassTransit——Consumer Saga(译)

2023-10-19 19:47:04 浏览数 (2)

原文地址:Consumer Sagas

consumer saga是一个由CorrelationId标识的类,它定义了由saga repository持久化的状态。除了状态之外,还可以向saga类添加接口,定义由saga处理的事件。这种状态和行为在单个类中的组合就是一个consumer saga。在下面的示例中,定义了由SubmitOrder消息发起的order saga。

Interfaces

InitiatedBy
代码语言:javascript复制
public record SubmitOrder :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime OrderDate { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        SubmitDate = context.Message.OrderDate;
    }
}

当Saga的接收端点接收到SubmitOrder消息时,将使用CorrelationId属性来确定是否存在具有该CorrelationId的现有Saga实例。如果没有找到现有的实例,repository将创建一个新的saga实例,并在新实例上调用Consume方法。在Consume方法完成之后,repository保存新创建的实例。

Orchestrates

要定义由现有的saga实例(如OrderAccepted)编排的事件,需要指定一个额外的接口和方法。

代码语言:javascript复制
public record OrderAccepted :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>,
    Orchestrates<OrderAccepted>,
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context) {...}

    public async Task Consume(ConsumeContext<OrderAccepted> context)
    {
        AcceptDate = context.Message.Timestamp;
    }
}
InitiatedByOrOrchestrates

要定义一个可以启动一个新的或编排一个现有的saga实例(如OrderInvoiced)的事件,需要指定一个额外的接口和方法。

代码语言:javascript复制
public record OrderInvoiced :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
    public decimal Amount { get; init; }
}

public class OrderPaymentSaga :
    ISaga,
    InitiatedByOrOrchestrates<OrderInvoiced>
{
    public Guid CorrelationId { get; set; }

    public DateTime? InvoiceDate { get; set; }
    public decimal? Amount { get; set; }

    public async Task Consume(ConsumeContext<OrderInvoiced> context)
    {
        InvoiceDate = context.Message.Timestamp;
        Amount = context.Message.Amount;
    }
}
Observes

要定义由未实现CorrelatedBy接口(如OrderShipped)的现有saga实例观察到的事件,需要指定一个额外的接口和方法。

代码语言:javascript复制
public record OrderShipped
{
    public Guid OrderId { get; init; }
    public DateTime ShipDate { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>,
    Orchestrates<OrderAccepted>,
    Observes<OrderShipped, OrderSaga>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }
    public DateTime? ShipDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
    public async Task Consume(ConsumeContext<OrderAccepted> context) {...}

    public async Task Consume(ConsumeContext<OrderShipped> context)
    {
        ShipDate = context.Message.ShipDate;
    }

    public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression =>
        (saga,message) => saga.CorrelationId == message.OrderId;
}

Configuration

要在配置MassTransit时添加一个saga,请使用如下所示的AddSaga方法

代码语言:javascript复制
services.AddMassTransit(x =>
{
    x.AddSaga<OrderSaga>()
        .InMemoryRepository();
});

0 人点赞