Akka-CQRS(1)- Write-side, Persisting event sources:CQRS存写端操作方式

2019-05-25 17:05:33 浏览数 (1)

上篇我们提到CQRS是一种读写分离式高并发、大流量数据录入体系,其中存写部分是通过event-sourcing akka-persistence实现的。也可以这样理解:event-sourcing(事件源)是一种特殊数据录入模式,akka-persistence是这种模式的具体实现方式。事件源的核心思想是把某写发生的事件写入log(journal)。这些事件是能成功改变系统状态,并且时已经发生的事情。一开始我常常把事件源和命令源(command-sourcing)混为一谈。它们根本的区别事件event是已经发生的,命令command是待发生的。如果我们把命令存入journal,在对journal里的记录进行重新演算(replay)时就会执行命令并产生一些副作,如显示打印、发email等等。而系统状态和环境随着时间在不断变化,这些副作用也会在不同的时间产生不同的影响,这肯定是我们不想看见的。

事件源模式中,在内存里保存能代表程序状态的对象state-objects,这些状态对象与数据库表model之间建立了对应关系。假设程序中支持某些指令command,它们会改变程序的状态并且还可能还会产生一些副作用,那么用事件源做法的操作顺序应该是:产生副作用->存写事件->改变内存里的状态对象。其中任何一个环节失败都会放弃下面的环节。另一方面,在用journal中记录进行重新演算时,就需要先把发生的事件还原成改变状态的命令,人为的免去副作用,因为它已经在正确的时间产生过了,然后只要更新数据库model状态就算完成了。所以,实现persistence包括object和model之间对应、state-objects维护方式以及command和event之间的转换。

首先分析一下command与event之间的转换:我们还是用上一篇的POS收银系统做示范。下面是几个收银操作指令:

代码语言:javascript复制
  case class GetItemInfo(itemcode: String) extends Command   
  case class AddItem(item: Item, qty: Int) extends Command   
  case class AliPay(amount: Double) extends Command

上面三个典型command可以下面的方式转换成event:

GetItemInfo:这是一个查询商品资料的指令,不影响交易状态,不必转换

AddItem: 这个指令只影响交易状态,没有副作用,转换成 :ItemAdded(item: Item, qty: Int) extends Event

AliPay:改变交易状态并且产生副作用,因为要即时从支付宝扣款。做法:先确定支付成功,然后转成: AliPaid(amount Double) extends Event

代码语言:javascript复制
  case class ItemAdded(item: Item, qty: Int) extends Event
  case class AliPaid(amount: Double) extends Event

POS收银交易状态是一张未结算账单内容,是个简单的交易记录清单SalesMemo:

代码语言:javascript复制
  //交易记录
  case class TxnItem(
     num: Int  //销售单号
    ,seq: Int  //交易序号
    ,txntype: Int //交易类型编号
    ,code: String //编号(商品、账号...)
    ,qty: Int //交易数量
    ,price: Int //单价(分)
    ,amount: Int //金额(分)
    )
  case class SalesMemo(salesnum: Int, txnitems: List[TxnItem] = Nil) {
    def itemAdded(evt: Event): SalesMemo = evt match {
      case ItemAdded(item,qty) =>
        copy(txnitems = TxnItem(salesnum, txnitems.length 1,0,item.code,qty,item.price,qty * item.price) :: txnitems)
      case _ => this
    }

    def aliPaid(evt: Event) = evt match {
      case AliPaid(amt) =>
        copy(txnitems = TxnItem(salesnum,txnitems.length 1,0,'ali',1,amt,amt) :: items)
      case _ => this
    }
  }

itemAdded,aliPaid这两个函数分别代表AddItem和AliPay对状态对象的转变处理。

上面提到persistenceActor存写journal时对事件发生的顺序有严格要求,否则无法实现读取端正确恢复原始状态。这项要求的实现是通过persist/persistAsync这两种函数来实现的。下面是这几类函数的款式:

代码语言:javascript复制
//无干扰存写,后面进来的消息先存放在内部的临时存放点 message-stashing
  def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
    internalPersist(event)(handler)
  }

//同时存写多个事件
  def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
    internalPersistAll(events)(handler)
  }

//异步存写事件,没有临时存放点机制  no-message-stashing
  def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
    internalPersistAsync(event)(handler)
  }

//异步存写多项事件
  def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
    internalPersistAllAsync(events)(handler)
  }

//不存写事件,利用内部临时存放点机制来保证handler执行顺序
  def defer[A](event: A)(handler: A ⇒ Unit): Unit = {
    internalDefer(event)(handler)
  }

//不存写事件,只保证handler运行顺序
  def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
    internalDeferAsync(event)(handler)
  }

无论如何,handler函数都保证在事件存写动作成功后才能运行。我们用一些伪代码来示范有临存stash及无临存no-stash时handler运行的顺序:

代码语言:javascript复制
  override def receiveCommand: Receive = {
    case c: String ⇒ {
      sender() ! c
      persist(s"evt-$c-1") { e ⇒ sender() ! e }
      persist(s"evt-$c-2") { e ⇒ sender() ! e }
      defer(s"evt-$c-3") { e ⇒ sender() ! e }
    }
  }

//有内部临存 with message stashing
persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// evt-a-1
// evt-a-2
// evt-a-3
// b
// evt-b-1
// evt-b-2
// evt-b-3

----------------------------------
  override def receiveCommand: Receive = {
    case c: String ⇒ {
      sender() ! c
      persistAsync(s"evt-$c-1") { e ⇒ sender() ! e }
      persistAsync(s"evt-$c-2") { e ⇒ sender() ! e }
      deferAsync(s"evt-$c-3") { e ⇒ sender() ! e }
    }
  }

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// b           //无临存机制,外部信息立即处理了
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3

如果发生内嵌多层persist时,正确的顺序如下:

代码语言:javascript复制
override def receiveCommand: Receive = {
  case c: String ⇒
    sender() ! c

    persist(s"$c-1-outer") { outer1 ⇒
      sender() ! outer1
      persist(s"$c-1-inner") { inner1 ⇒
        sender() ! inner1
      }
    }

    persist(s"$c-2-outer") { outer2 ⇒
      sender() ! outer2
      persist(s"$c-2-inner") { inner2 ⇒
        sender() ! inner2
      }
    }
}

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// a-outer-1
// a-outer-2
// a-inner-1
// a-inner-2
// and only then process "b"
// b
// b-outer-1
// b-outer-2
// b-inner-1
// b-inner-2

--------------------------------
override def receiveCommand: Receive = {
  case c: String ⇒
    sender() ! c
    persistAsync(c   "-outer-1") { outer ⇒
      sender() ! outer
      persistAsync(c   "-inner-1") { inner ⇒ sender() ! inner }
    }
    persistAsync(c   "-outer-2") { outer ⇒
      sender() ! outer
      persistAsync(c   "-inner-2") { inner ⇒ sender() ! inner }
    }
}

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// b
// a-outer-1
// a-outer-2
// b-outer-1
// b-outer-2
// a-inner-1
// a-inner-2
// b-inner-1
// b-inner-2

// which can be seen as the following causal relationship:
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2

值得注意的是这个handler函数只会在事件存写成功后才运行,失败则否。也就是说确认了事件已经安全存写后才更新state-objects状态(model状态在CQRS读取时再相应更新)。针对上面的POS例子里可以用下面的代码处理方式:

代码语言:javascript复制
  override def receiveCommand: Receive = {
    case AddItem(item,qty) =>
       persist(ItemAdded(item,qty))(salesMemo.itemAdded)
    case AliPay(amt) =>
      try {
        if (aliOnlinePay(amt))  //先产生副作用
          persist(AliPaid(amt))(salesMemo.alipaid(_))
      } catch {
        case _ > Throw new OnlinePayExecption("boom!!!")
      }
...

akka-persistence代表CQRS模式中以事件源方式存写数据的具体实现。我们提到过,数据存写具体做法是向一个journal里写入发生的改变状态目标state-objects的事件。每次PersistenceActor启动时都会从journal里读取之前写入的事件、还原成指令command、然后逐步把state-objects恢复到上次停止时的状态,不管是因异常还是正常停止的。这个恢复状态的过程是由PersistenceActor的receiveRecovery函数实现的,如下:

代码语言:javascript复制
  override def receiveRecover: Receive = {
    case evt: Event => 
           salesMemo = salesMemo.updateMemo(evt)
    case SnapshotOffer(_,loggedItems: SalesMemo) =>
           salesMemo = loggedItems
  }

按理来说恢复状态即是把事件从头到尾再演算一遍。不过这种方式效率是个大问题,试想每次启动都需要先读取几十万条数据会是怎样的感受。效率问题的解决方法就是通过存写快照方式把之前的事件总结成快照snapshot形式的阶段状态,然后存入快照库(snapshot-store)。这样在PersistenceActor启动时先用最后一个快照把状态恢复到一个阶段,然后再读取快照产生之后的所有事件对阶段性状态再转换成最新状态。快照的读写函数如下:

代码语言:javascript复制
def saveSnapshot(snapshot: Any): Unit = {
    snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  }

/**
 * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
 * before any further replayed messages.
 */
@SerialVersionUID(1L)
final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 */
@SerialVersionUID(1L) //#snapshot-metadata
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata

PersistenceActor里有receiveRecover和receiveCommand两个抽象函数,必须由用户提供具体的实现。这两个函数代表了PersistentActor的两大功能:状态复原和消息处理。状态复原是通过receiveRecover对snapshot-store和journal里的记录处理实现的。而PersistentActor的receiveCommand就是普通Actor的receive消息处理函数。用户可以通过PersistentActor提供的回调(callback)函数来进行事件读取过程前的事前准备和后面的事后处理。可以对这些callback函数进行重载(override)来自定义这些处理程序,如:

代码语言:javascript复制
/**
   * Called whenever a message replay fails. By default it logs the error.
   *
   * Subclass may override to customize logging.
   *
   * The actor is always stopped after this method has been invoked.
   *
   * @param cause failure cause.
   * @param event the event that was processed in `receiveRecover`, if the exception
   *   was thrown there
   */
  protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
...
 /**
   * Called when persist fails. By default it logs the error.
   * Subclass may override to customize logging and for example send negative
   * acknowledgment to sender.
   *
   * The actor is always stopped after this method has been invoked.
   *
   * Note that the event may or may not have been saved, depending on the type of
   * failure.
   *
   * @param cause failure cause.
   * @param event the event that was to be persisted
   */
  protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
...
  /**
   * Called when the journal rejected `persist` of an event. The event was not
   * stored. By default this method logs the problem as a warning, and the actor continues.
   * The callback handler that was passed to the `persist` method will not be invoked.
   *
   * @param cause failure cause
   * @param event the event that was to be persisted
   */
  protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
...

也可以通过函数重载来自定义状态恢复行为:

代码语言:javascript复制
trait PersistenceRecovery {
  //#persistence-recovery
  /**
   * Called when the persistent actor is started for the first time.
   * The returned [[Recovery]] object defines how the Actor will recover its persistent state before
   * handling the first incoming message.
   *
   * To skip recovery completely return `Recovery.none`.
   */
  def recovery: Recovery = Recovery()
  //#persistence-recovery
}

整个状态恢复过程是在EventSourced.scala里下面这个函数实现的:

代码语言:javascript复制
override def stateReceive(receive: Receive, message: Any) = try message match {
        case ReplayedMessage(p) ⇒
          try {
            eventSeenInInterval = true
            updateLastSequenceNr(p)
            Eventsourced.super.aroundReceive(recoveryBehavior, p)
          } catch {
            case NonFatal(t) ⇒
              timeoutCancellable.cancel()
              try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
              returnRecoveryPermit()
          }
        case RecoverySuccess(highestSeqNr) ⇒
          timeoutCancellable.cancel()
          onReplaySuccess() // callback for subclass implementation
          sequenceNr = highestSeqNr
          setLastSequenceNr(highestSeqNr)
          _recoveryRunning = false
          try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
          finally transitToProcessingState()
        case ReplayMessagesFailure(cause) ⇒
          timeoutCancellable.cancel()
          try onRecoveryFailure(cause, event = None) finally context.stop(self)
        case RecoveryTick(false) if !eventSeenInInterval ⇒
          timeoutCancellable.cancel()
          try onRecoveryFailure(
            new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"),
            event = None)
          finally context.stop(self)
        case RecoveryTick(false) ⇒
          eventSeenInInterval = false
        case RecoveryTick(true) ⇒
        // snapshot tick, ignore
        case other ⇒
          stashInternally(other)
      } catch {
        case NonFatal(e) ⇒
          returnRecoveryPermit()
          throw e
      }

函数通过super.aroundReceive把消息传给了receiveRecovery:

代码语言:javascript复制
/**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // optimization: avoid allocation of lambda
    if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
      unhandled(msg)
    }
  }

因为EventSourced继承了PersistenceRecovery trait,所以重载recovery函数可以改变状态恢复行为。默认的模式是:

代码语言:javascript复制
/**
 * Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
 *
 * By default recovers from latest snapshot replays through to the last available event (last sequenceId).
 *
 * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
 * and at least one of these snapshots matches the specified `fromSnapshot` criteria.
 * Otherwise, recovery will start from scratch by replaying all stored events.
 *
 * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
 * message, followed by replayed messages, if any, that are younger than the snapshot, up to the
 * specified upper sequence number bound (`toSequenceNr`).
 *
 * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
 *                     is latest (= youngest) snapshot.
 * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
 * @param replayMax maximum number of messages to replay. Default is no limit.
 */
@SerialVersionUID(1L)
final case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long                      = Long.MaxValue,
  replayMax:    Long                      = Long.MaxValue)

下面是状态恢复过程中产生的消息:

代码语言:javascript复制
/**
 * Sent to a [[PersistentActor]] when the journal replay has been finished.
 */
@SerialVersionUID(1L)
case object RecoveryCompleted extends RecoveryCompleted {
...
final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace

状态恢复的进程可用用下面的方法检测:

代码语言:javascript复制
/**
   * Returns `true` if this persistent actor is currently recovering.
   */
  def recoveryRunning: Boolean = {
    // currentState is null if this is called from constructor
    if (currentState == null) true else currentState.recoveryRunning
  }

  /**
   * Returns `true` if this persistent actor has successfully finished recovery.
   */
  def recoveryFinished: Boolean = !recoveryRunning

用户也可以删除journal里的事件。虽然应该作为原始资料完整保存不应该鼓励这么做:

代码语言:javascript复制
/**
   * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
   *
   * If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor.
   * If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor.
   *
   * @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
   */
  def deleteMessages(toSequenceNr: Long): Unit =
    journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)

删除事件范围是用SequenceNr来代表的,下面是一些可用的序号:

代码语言:javascript复制
/**
   * Returns `persistenceId`.
   */
  override def snapshotterId: String = persistenceId

  /**
   * Highest received sequence number so far or `0L` if this actor hasn't replayed
   * or stored any persistent events yet.
   */
  def lastSequenceNr: Long = _lastSequenceNr

  /**
   * Returns `lastSequenceNr`.
   */
  def snapshotSequenceNr: Long = lastSequenceNr

事件删除过程可用下面的消息监控:

代码语言:javascript复制
/**
 * Reply message to a successful [[Eventsourced#deleteMessages]] request.
 */
final case class DeleteMessagesSuccess(toSequenceNr: Long)

/**
 * Reply message to a failed [[Eventsourced#deleteMessages]] request.
 */
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)

下面是一些快照的持久化维护方法:

代码语言:javascript复制
/**
   * Snapshotter id.
   */
  def snapshotterId: String

  /**
   * Sequence number to use when taking a snapshot.
   */
  def snapshotSequenceNr: Long

  /**
   * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
   * to the running [[PersistentActor]].
   */
  def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
    snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)

  /**
   * Saves a `snapshot` of this snapshotter's state.
   *
   * The [[PersistentActor]] will be notified about the success or failure of this
   * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message.
   */
  def saveSnapshot(snapshot: Any): Unit = {
    snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  }

  /**
   * Deletes the snapshot identified by `sequenceNr`.
   *
   * The [[PersistentActor]] will be notified about the status of the deletion
   * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message.
   */
  def deleteSnapshot(sequenceNr: Long): Unit = {
    snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr))
  }

  /**
   * Deletes all snapshots matching `criteria`.
   *
   * The [[PersistentActor]] will be notified about the status of the deletion
   * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message.
   */
  def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
    snapshotStore ! DeleteSnapshots(snapshotterId, criteria)
  }

快照维护数据结构和消息如下:

代码语言:javascript复制
/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 */
@SerialVersionUID(1L) //#snapshot-metadata
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata

object SnapshotMetadata {
  implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) ⇒
    if (a eq b) false
    else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0
    else if (a.sequenceNr != b.sequenceNr) a.sequenceNr < b.sequenceNr
    else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp
    else false
  }
}

/**
 * Sent to a [[PersistentActor]] after successful saving of a snapshot.
 *
 * @param metadata snapshot metadata.
 */
@SerialVersionUID(1L)
final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after successful deletion of a snapshot.
 *
 * @param metadata snapshot metadata.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots.
 *
 * @param criteria snapshot selection criteria.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after failed saving of a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param cause failure cause.
 */
@SerialVersionUID(1L)
final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after failed deletion of a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param cause failure cause.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots.
 *
 * @param criteria snapshot selection criteria.
 * @param cause failure cause.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)
  extends SnapshotProtocol.Response

/**
 * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
 * before any further replayed messages.
 */
@SerialVersionUID(1L)
final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

/**
 * Selection criteria for loading and deleting snapshots.
 *
 * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound,
 *   i.e. `Long.MaxValue`
 * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound,
 *   i.e. `Long.MaxValue`
 * @param minSequenceNr lower bound for a selected snapshot's sequence number. Default is no lower bound,
 *   i.e. `0L`
 * @param minTimestamp lower bound for a selected snapshot's timestamp. Default is no lower bound,
 *   i.e. `0L`
 *
 * @see [[Recovery]]
 */
@SerialVersionUID(1L)
final case class SnapshotSelectionCriteria(
  maxSequenceNr: Long = Long.MaxValue,
  maxTimestamp:  Long = Long.MaxValue,
  minSequenceNr: Long = 0L,
  minTimestamp:  Long = 0L) {

  /**
   * INTERNAL API.
   */
  private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
    if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this

  /**
   * INTERNAL API.
   */
  private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
    metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp &&
      metadata.sequenceNr >= minSequenceNr && metadata.timestamp >= minTimestamp
}

object SnapshotSelectionCriteria {
  /**
   * The latest saved snapshot.
   */
  val Latest = SnapshotSelectionCriteria()

  /**
   * No saved snapshot matches.
   */
  val None = SnapshotSelectionCriteria(0L, 0L)

  /**
   * Java API.
   */
  def create(maxSequenceNr: Long, maxTimestamp: Long) =
    SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp)

  /**
   * Java API.
   */
  def create(maxSequenceNr: Long, maxTimestamp: Long,
             minSequenceNr: Long, minTimestamp: Long) =
    SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)

  /**
   * Java API.
   */
  def latest() = Latest

  /**
   * Java API.
   */
  def none() = None
}

/**
 * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot.
 */
final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)

object SelectedSnapshot {
  /**
   * Java API, Plugin API.
   */
  def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
    SelectedSnapshot(metadata, snapshot)
}

/**
 * INTERNAL API.
 *
 * Defines messages exchanged between persistent actors and a snapshot store.
 */
private[persistence] object SnapshotProtocol {

  /** Marker trait shared by internal snapshot messages. */
  sealed trait Message extends Protocol.Message
  /** Internal snapshot command. */
  sealed trait Request extends Message
  /** Internal snapshot acknowledgement. */
  sealed trait Response extends Message

  /**
   * Instructs a snapshot store to load a snapshot.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting a snapshot from which recovery should start.
   * @param toSequenceNr upper sequence number bound (inclusive) for recovery.
   */
  final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
    extends Request

  /**
   * Response message to a [[LoadSnapshot]] message.
   *
   * @param snapshot loaded snapshot, if any.
   */
  final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
    extends Response

  /**
   * Reply message to a failed [[LoadSnapshot]] request.
   * @param cause failure cause.
   */
  final case class LoadSnapshotFailed(cause: Throwable) extends Response

  /**
   * Instructs snapshot store to save a snapshot.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot.
   */
  final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
    extends Request

  /**
   * Instructs snapshot store to delete a snapshot.
   *
   * @param metadata snapshot metadata.
   */
  final case class DeleteSnapshot(metadata: SnapshotMetadata)
    extends Request

  /**
   * Instructs snapshot store to delete all snapshots that match `criteria`.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting snapshots to be deleted.
   */
  final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
    extends Request
}

篇幅所限,我们将在下一篇用一个具体的应用例子来进行akka-CQRS写端示范。

0 人点赞