Akka-CQRS(4)- CQRS Writer Actor 示范

2019-05-25 17:51:29 浏览数 (1)

首先,我们可以有一个前置操作,里面包括了大部分对指令可执行性判断逻辑,这样我们可以把许多重复代码从Receive函数中移走,如下:

代码语言:javascript复制
  //helper functions
  /** Extractor used by the Receive functions: a command is only matched when
    * `cmdFilter` accepts it against the actor's current voucher state.
    *
    * NOTE(review): `cmdFilter` replies to the sender itself when it rejects a
    * command, and a rejected match falls through to the following cases of the
    * enclosing Receive, so the filter may run (and reply) more than once for a
    * single message - confirm this is intended.
    */
  object RunPOSCommand {
    def unapply(arg: POSCommand): Option[POSCommand] = {
      val accepted = cmdFilter(persistenceId, arg, vchState, vchItems, sender())
      if (accepted) Some(arg) else None
    }
  }

  /** Pre-check deciding whether `cmd` may be executed in the current voucher state.
    *
    * When a command is rejected, a `POSResponse` with `STATUS.FAIL` is sent to
    * `router` and `false` is returned; otherwise `true` is returned and nothing
    * is sent.
    *
    * @param terminalid terminal id, only used in the failure messages
    * @param cmd        the command to check
    * @param state      current voucher state, echoed back in failure responses
    * @param txns       items of the current voucher
    * @param router     actor that receives the failure response
    * @return `true` when the command may proceed
    */
  def cmdFilter(terminalid: String, cmd: POSCommand, state: VchStates, txns: VchItems, router: ActorRef): Boolean = cmd match {
    case LogOn(opr, passwd) =>      // intended for logOffState only
      if (!txns.txnitems.isEmpty) { // a voucher is still in progress
        router ! POSResponse(STATUS.FAIL, s"禁止用户登陆!终端 ${terminalid} 有未完成单据。", state, List())
        false
      } else{
        if (validateUser(opr, passwd).isDefined) true
        else {
          router ! POSResponse(STATUS.FAIL, s"终端-$terminalid: 用户 ${opr} 登陆失败!", state, List())
          false
        }
      }
    case LogOff =>       // intended for logOnState only
      if (!txns.txnitems.isEmpty) { // a voucher is still in progress
        router ! POSResponse(STATUS.FAIL, s"禁止用户退出!终端 ${terminalid} 有未完成单据。", state, List())
        false
      } else true

    case VoidAll =>   // intended for logOnState and paymentState
      if (txns.txnitems.isEmpty) { // nothing to void
        router ! POSResponse(STATUS.FAIL, s"全单取消失败!终端 ${terminalid} 本单无任何有效销售记录。", state, txns.txnitems)
        false
      } else true

    case OfflinePay(acct,num,amt) =>
      if (txns.totalSales.abs == 0) { // nothing due: voids/refunds (negative values) can net the total to zero
        router ! POSResponse(STATUS.FAIL, s"支付失败!终端 ${terminalid} 应付金额为零。", state, List())
        false
      } else {
        if(validateAcct(acct).isDefined) true
        else {
          router ! POSResponse(STATUS.FAIL, s"支付失败!终端 ${terminalid} 账号{$acct}不存在。", state, List())
          false
        }
      }
    case Subtotal =>
      if (txns.txnitems.isEmpty) { // no items recorded yet, nothing to subtotal
        router ! POSResponse(STATUS.FAIL, s"小计操作失败!终端 ${terminalid} 无任何销售记录。", state, List())
        false
      } else true
    // the commands below are always accepted by the filter
    case VCBalance(_,_,_) => true
    case MemberOn(_,_) => true
    case MemberOff => true
    case VoucherNum(_) => true
...
}

如上,前期的判断逻辑都移到cmdFilter(...)里了,然后可以用RunPOSCommand(command)方式来实现对command的执行性判断。现在我们可以在Receive函数里通过调用RunPOSCommand来节省一大截重复代码了,如下:

代码语言:javascript复制
 /** Behaviour before an operator has logged on.
   *
   * Only a `LogOn` command accepted by `RunPOSCommand` is processed; it is
   * persisted as a `LogOned` event and the actor then switches to `logOnState`.
   * `PassivatePOS` asks the parent to stop via `PoisonPill`. Every other
   * message is answered with a FAIL response.
   */
 private def logOffState: Receive = {
   case RunPOSCommand(LogOn(opr, _)) =>
     persistEvent(LogOned(opr, vchState)) { evt =>
       val (nextState, nextItems) = updateState(evt, vchState, vchItems)
       // starting sequenceNr for any voucher: no action is logged before login
       vchState = nextState.copy(jseq = lastSequenceNr + 1)
       vchItems = nextItems
       sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户{$opr}成功登陆。", vchState, List(vchItems.txnitems.head))
       context.become(logOnState)
     }

   case PassivatePOS =>
     log.info(s"**********${persistenceId} got passivate message and stopping  ... ***********")
     context.parent ! PoisonPill

   case _ =>
     sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 用户未登陆。", vchState, List())
 }

另外,我把所有指令分成三个可执行类别,分别是:登陆前、已登陆、支付中。这三种状态的行为分别用logOffState,logOnState,paymentState三个Receive函数代表:

代码语言:javascript复制
  /** Behaviour while an operator is logged on.
    *
    * Handles log-off, switching to supervisor mode and the first payment of a
    * voucher. A payment that leaves a balance due moves the actor into
    * `paymentState`; a payment that settles the voucher persists `EndVoucher`,
    * saves a snapshot and stays in this state.
    * (Further cases are elided in the article, see the `...` marker.)
    */
  private def logOnState: Receive = {
    case RunPOSCommand(LogOff) =>
      persistEvent(LogOffed(vchState)) { evt =>
        val user = vchState.opr      // read the operator before the state is updated
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
        saveSnapshot(vchState)       // state of last voucher
        // manual passivation:  shard ! ShardRegion.Passivate(PassivatePOS)
        sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户 $user 成功退出。", vchState, List(vchItems.txnitems.head))
        context.unbecome()           // switch to logOffState
      }

    case RunPOSCommand(SuperOn(su,_)) =>
      persistEvent(SuperOned(su,vchState)) { evt =>
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
        sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 转到管理模式{$su}。", vchState, List(vchItems.txnitems.head))
      }

    // first payment in a voucher
    case RunPOSCommand(OfflinePay(acct,num, amount)) =>
      persistEvent(Payment(acct,num,vchState)) { evt =>
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
        if (vchItems.totalSales > 0)
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
        else
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
        if (!vchState.due) {  // voucher completed: mark its end and move to the next one, stay in logOnState
          persistEvent(EndVoucher(vchState.num)) { endEvt =>
            val ended = updateState(endEvt, vchState, vchItems)
            vchState = ended._1
            vchItems = ended._2
            saveSnapshot(vchState)     // recovery starts from the next voucher
          }
        }
        else context.become(paymentState)     // balance still due: switch into paymentState
      }
...
}

 /** Behaviour while a voucher is partially paid.
   *
   * Each accepted `OfflinePay` is persisted as a `Payment`. Once nothing is due
   * any more, `EndVoucher` is persisted, a snapshot is saved and the actor
   * returns to `logOnState`.
   *
   * The return uses `context.become(logOnState)` rather than `unbecome()`:
   * this state is entered with `context.become(paymentState)` (default
   * `discardOld = true`), which replaces the current behaviour instead of
   * stacking it, so `unbecome()` would fall back to the actor's initial
   * behaviour and not to `logOnState`.
   * (Further cases are elided in the article, see the `...` marker.)
   */
 private def paymentState: Receive = {
    case RunPOSCommand(OfflinePay(acct,num, amount)) =>
      persistEvent(Payment(acct,num,vchState)) { evt =>
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
        if (vchItems.totalSales > 0)
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
        else
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))

        if (!vchState.due) {  // voucher completed: mark its end, move to the next one, return to logOnState
          persistEvent(EndVoucher(vchState.num)) { endEvt =>
            val ended = updateState(endEvt, vchState, vchItems)
            vchState = ended._1
            vchItems = ended._2
            saveSnapshot(vchState)       // recovery starts from the next voucher
            context.become(logOnState)   // back to logOnState
          }
        }
        // else: balance still due - wait for further payments and stay in paymentState
      }
...
}

注意上面代码里的context.become,saveSnapshot, 它们分别代表状态转换及一单的终结。

代码语言:javascript复制
  /** One flat transaction record. Events wrap a `TxnItem`, and the same
    * structure is returned to the front end inside `POSResponse`.
    * Monetary fields are integers in cents.
    */
  case class TxnItem(
    txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")),
    txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11),
    opr: String = "",                 // operator (staff) id
    num: Int = 0,                     // sales voucher number
    seq: Int = 1,                     // transaction sequence number
    txntype: Int = TXNTYPE.sales,     // transaction type
    salestype: Int = SALESTYPE.plu,   // sales type
    qty: Int = 1,                     // quantity
    price: Int = 0,                   // unit price (cents)
    amount: Int = 0,                  // list amount (cents)
    dscamt: Int = 0,                  // discount, a negative value: net = amount + dscamt
    member: String = "",              // member card number
    code: String = "",                // code (item, account, ...)
    desc: String = "",                // item description
    dpt: String = "",
    department: String = "",
    cat: String = "",
    category: String = "",
    brd: String = "",
    brand: String = ""
  )

首先,我们需要这个TxnItem使Read-Actor能把事件恢复成规定格式的交易记录。也可以用这个结构返回给前端用来显示或者打印交易明细。下面是这些Command和Event的定义代码:

代码语言:javascript复制
/** Messages accepted by the writer actor. Everything a terminal can ask for
  * extends the sealed `POSCommand`; the two control messages at the top do not.
  */
object Commands {

  // ---- control messages (not POS commands) ----
  case object PassivatePOS    // passivate message
  case class DebugMode(debug: Boolean)

  sealed trait POSCommand

  // ---- operator, supervisor and member sessions ----
  case class LogOn(opr: String, passwd: String) extends POSCommand
  case object LogOff extends POSCommand
  case class SuperOn(su: String, passwd: String) extends POSCommand
  case object SuperOff extends POSCommand
  case class MemberOn(cardnum: String, passwd: String) extends POSCommand
  case object MemberOff extends POSCommand   // remove member status for the voucher

  // ---- voucher mode switches ----
  case object RefundOn extends POSCommand
  case object RefundOff extends POSCommand
  case object VoidOn extends POSCommand
  case object VoidOff extends POSCommand
  case object VoidAll extends POSCommand
  case object Suspend extends POSCommand

  case class VoucherNum(vnum: Int) extends POSCommand

  // ---- sales ----
  case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
  case object Subtotal extends POSCommand
  case class Discount(code: String, percent: Int) extends POSCommand

  // ---- settlement / payment ----
  case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand
  // read only command, no event process
  case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
  case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
  case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
  case class WxPay(acct: String, num: String, amount: Int) extends POSCommand

  // read only command, no update event
  case class Plu(itemCode: String) extends POSCommand

}


/** Events persisted by the writer actor. Most events wrap a `TxnItem`; each
  * companion `apply` builds that item from the current `VchStates` via
  * `TxnItem(vs)` and overrides the fields specific to the event.
  */
object Events {

  sealed trait POSEvent

  // ---- operator log on / off ----
  case class LogOned(txnItem: TxnItem) extends POSEvent
  object LogOned {
    def apply(op: String, vs: VchStates): LogOned = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.logon, salestype = SALESTYPE.crd, opr = op, code = op)
      LogOned(item)
    }
  }

  case class LogOffed(txnItem: TxnItem) extends POSEvent
  object LogOffed {
    def apply(vs: VchStates): LogOffed = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.logon, salestype = SALESTYPE.crd)
      LogOffed(item)
    }
  }

  // ---- supervisor mode on / off ----
  case class SuperOned(txnItem: TxnItem) extends POSEvent
  object SuperOned {
    def apply(su: String, vs: VchStates): SuperOned = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.supon, salestype = SALESTYPE.crd, code = su)
      SuperOned(item)
    }
  }

  case class SuperOffed(txnItem: TxnItem) extends POSEvent
  object SuperOffed {
    def apply(vs: VchStates): SuperOffed = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.supon, salestype = SALESTYPE.crd)
      SuperOffed(item)
    }
  }

  // ---- member on / off ----
  case class MemberOned(txnItem: TxnItem) extends POSEvent
  object MemberOned {
    def apply(cardnum: String, vs: VchStates): MemberOned = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.sales, salestype = SALESTYPE.crd, member = cardnum)
      MemberOned(item)
    }
  }

  case class MemberOffed(txnItem: TxnItem) extends POSEvent   // remove member status for the voucher
  object MemberOffed {
    def apply(vs: VchStates): MemberOffed = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.sales, salestype = SALESTYPE.crd, member = vs.mbr)
      MemberOffed(item)
    }
  }

  // ---- refund mode on / off ----
  case class RefundOned(txnItem: TxnItem) extends POSEvent
  object RefundOned {
    def apply(vs: VchStates): RefundOned = RefundOned(TxnItem(vs).copy(txntype = TXNTYPE.refund))
  }

  case class RefundOffed(txnItem: TxnItem) extends POSEvent
  object RefundOffed {
    def apply(vs: VchStates): RefundOffed = RefundOffed(TxnItem(vs).copy(txntype = TXNTYPE.refund))
  }

  // ---- void mode on / off ----
  case class VoidOned(txnItem: TxnItem) extends POSEvent
  object VoidOned {
    def apply(vs: VchStates): VoidOned = VoidOned(TxnItem(vs).copy(txntype = TXNTYPE.void))
  }

  case class VoidOffed(txnItem: TxnItem) extends POSEvent
  object VoidOffed {
    def apply(vs: VchStates): VoidOffed = VoidOffed(TxnItem(vs).copy(txntype = TXNTYPE.void))
  }

  // ---- voucher life cycle ----
  case class NewVoucher(vnum: Int) extends POSEvent     // new voucher, reminder for read-side to set new vnum
  case class EndVoucher(vnum: Int) extends POSEvent     // marks the end of a voucher

  case class VoidVoucher(txnItem: TxnItem) extends POSEvent
  object VoidVoucher {
    def apply(vs: VchStates): VoidVoucher = VoidVoucher(TxnItem(vs).copy(txntype = TXNTYPE.voidall))
  }

  case class SuspVoucher(txnItem: TxnItem) extends POSEvent
  object SuspVoucher {
    def apply(vs: VchStates): SuspVoucher = SuspVoucher(TxnItem(vs).copy(txntype = TXNTYPE.suspend))
  }

  case class VoucherNumed(fnum: Int, tnum: Int) extends POSEvent

  // ---- sales, subtotal, discount ----
  case class SalesLogged(txnItem: TxnItem) extends POSEvent

  case class Subtotaled(txnItem: TxnItem) extends POSEvent
  object Subtotaled {
    def apply(vs: VchStates, vi: VchItems): Subtotaled = {
      val (itemCount, totalQty, totalAmt, totalDsc) = vi.subTotal
      // the item count is carried in the `price` field of the subtotal record
      val item = TxnItem(vs).copy(
        txntype = TXNTYPE.sales,
        salestype = SALESTYPE.sub,
        qty = totalQty,
        amount = totalAmt,
        dscamt = totalDsc,
        price = itemCount
      )
      Subtotaled(item)
    }
  }

  case class Discounted(txnItem: TxnItem) extends POSEvent

  // ---- settlement / payment ----
  case class Payment(txnItem: TxnItem) extends POSEvent
  object Payment {
    // NOTE(review): no amount is passed in here; presumably it is filled in elsewhere - confirm.
    def apply(acct: String, num: String, vs: VchStates): Payment = {
      val item = TxnItem(vs).copy(txntype = TXNTYPE.sales, salestype = SALESTYPE.ttl, dpt = acct, code = num)
      Payment(item)
    }
  }

}

可以看到,在构建Event的同时也构建了TxnItem结构。

返回前端数据格式如下:

代码语言:javascript复制
/** Reply format sent back to the front end. */
object Responses {

  /** Reply to a command.
    *
    * @param sts      one of the `STATUS` codes
    * @param msg      human readable message
    * @param voucher  voucher state at the time of the reply
    * @param txnItems transaction items relevant to the reply (may be empty)
    */
  case class POSResponse(sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])

  /** Status codes carried in `POSResponse.sts`. */
  object STATUS {
    val OK: Int = 0
    val FAIL: Int = -1
  }
}

为了便于debug,我们构建了一个虚拟的数据库环节:

代码语言:javascript复制
package pos.dao

import java.time.LocalDate
import java.time.format.DateTimeFormatter


/** A sellable item of the mock database.
  *
  * @param brd   brand code
  * @param dpt   department code
  * @param cat   category code
  * @param code  item code
  * @param name  item name
  * @param price unit price as an integer (presumably cents - confirm)
  */
case class Item(
  brd: String,
  dpt: String,
  cat: String,
  code: String,
  name: String,
  price: Int
)
/** In-memory item table of the mock database, plus the result type of item queries. */
object Items {
  val apple: Item     = Item("01", "02", "01", "001", "green apple", 820)
  val grape: Item     = Item("01", "02", "01", "002", "red grape", 1050)
  val orage: Item     = Item("01", "02", "01", "003", "sunkist orage", 350)
  val banana: Item    = Item("01", "02", "01", "004", "demon banana", 300)
  val pineapple: Item = Item("01", "02", "01", "005", "hainan pineapple", 1300)
  val peach: Item     = Item("01", "02", "01", "006", "xinjiang peach", 2390)

  /** Every item, in item-code order. */
  val tblItems: List[Item] = List(apple, grape, orage, banana, pineapple, peach)

  /** Outcome of an item lookup: the items found, or a failure message. */
  sealed trait QueryItemsResult
  case class QueryItemsOK(items: List[Item]) extends QueryItemsResult
  case class QueryItemsFail(msg: String) extends QueryItemsResult

}


/** Code tables of the mock database (users, departments, categories, brands,
  * accounts, ...) with lookup-by-code helpers returning `None` for unknown codes.
  */
object Codes {
  case class User(code: String, name: String, passwd: String)
  case class Department(code: String, name: String)
  case class Category(code: String, name: String)
  case class Brand(code: String, name: String)
  case class Ra(code: String, name: String)
  case class Account(code: String, name: String)
  case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)

  val ras: List[Ra] = List(Ra("01", "Delivery"), Ra("02", "Cooking"))
  val dpts: List[Department] = List(Department("01", "Fruit"), Department("02", "Grocery"))
  val cats: List[Category] = List(Category("0101", "Fresh Fruit"), Category("0201", "Dry Grocery"))
  val brds: List[Brand] = List(Brand("01", "Sunkist"), Brand("02", "Demon"))
  val accts: List[Account] = List(
    Account("001", "Cash"),
    Account("002", "Value Card"),
    Account("003", "Visa"),
    Account("004", "Alipay"),
    Account("005", "WXPay")
  )

  val users: List[User] = List(
    User("1001", "Tiger", "123"),
    User("1002", "John", "123"),
    User("1003", "Maria", "123")
  )

  def getDpt(code: String): Option[Department] = dpts.find(_.code == code)
  def getCat(code: String): Option[Category] = cats.find(_.code == code)
  def getBrd(code: String): Option[Brand] = brds.find(_.code == code)
  def getAcct(code: String): Option[Account] = accts.find(_.code == code)
  def getRa(code: String): Option[Ra] = ras.find(_.code == code)
}

/** Mock data-access layer over the in-memory `Items` and `Codes` tables.
  * Lookups return `None` (or `QueryItemsFail`) for unknown codes.
  */
object DAO {
  import Items._
  import Codes._

  /** Looks an item up by code.
    *
    * @return `QueryItemsOK` holding the single matching item, or
    *         `QueryItemsFail` when no item has that code
    */
  def getItem(code: String): QueryItemsResult =
    tblItems.find(_.code == code) match {
      case Some(item) => QueryItemsOK(List(item))
      case None       => QueryItemsFail("Invalid item code!")
    }

  def validateDpt(code: String): Option[Department] = dpts.find(_.code == code)
  def validateCat(code: String): Option[Category] = cats.find(_.code == code)
  def validateBrd(code: String): Option[Brand] = brds.find(_.code == code)
  def validateRa(code: String): Option[Ra] = ras.find(_.code == code)
  def validateAcct(code: String): Option[Account] = accts.find(_.code == code)

  /** Returns the user whose code and password both match, if any. */
  def validateUser(userid: String, passwd: String): Option[User] =
    users.find(u => u.code == userid && u.passwd == passwd)

  /** Formats `ldate` as `yyyy-MM-dd` followed by the last second of that day,
    * e.g. `2019-05-25 23:59:59`.
    */
  def lastSecOfDateStr(ldate: LocalDate): String = {
    val day = ldate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    // the `+` joining the date and the time was missing, so the method did not compile
    s"$day 23:59:59"
  }

}

这个库主要是用来验证和提供TxnItem里的字段值。

好了,回到WriterActor:同样为了方便集群分片监控跟踪和debug,增加了debugMode和重写了persist:

代码语言:javascript复制
class WriterActor extends PersistentActor with LogSupport {
  val cluster = Cluster(context.system)
  // Persistence id format: shopdptId-posId
  // self.path.parent.name is the type name (utf-8 URL-encoded)
  // self.path.name is the entry identifier (utf-8 URL-encoded)  but entity has a supervisor,
  // so the parent's name is used as the id below.
  // Alternative kept for reference:
  // override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
  override def persistenceId: String = self.path.parent.name

  /** Runs the default pre-restart handling, then logs which terminal is
    * restarting, on which cluster address, and the message being processed.
    */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    log.info(s"Restarting terminal $persistenceId on ${cluster.selfAddress} for $message")
  }

  /** Runs the default post-restart handling, then logs the terminal, its
    * cluster address and the message of the failure that caused the restart.
    */
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    log.info(s"terminal $persistenceId on ${cluster.selfAddress} restarted for ${reason.getMessage}")
  }

  /** Logs that this terminal's actor has stopped on the local cluster address. */
  override def postStop(): Unit =
    log.info(s"terminal $persistenceId on ${cluster.selfAddress} stopped!")

  /** Logs that this terminal's actor is starting on the local cluster address. */
  override def preStart(): Unit =
    log.info(s"terminal $persistenceId on ${cluster.selfAddress} starting...")

  //helper functions
  /** Extractor used by the Receive functions: a command is only matched when
    * `cmdFilter` accepts it against the actor's current voucher state.
    *
    * NOTE(review): `cmdFilter` replies to the sender itself when it rejects a
    * command, and a rejected match falls through to the following cases of the
    * enclosing Receive, so the filter may run (and reply) more than once for a
    * single message - confirm this is intended.
    */
  object RunPOSCommand {
    def unapply(arg: POSCommand): Option[POSCommand] = {
      val accepted = cmdFilter(persistenceId, arg, vchState, vchItems, sender())
      if (accepted) Some(arg) else None
    }
  }

  /** Persists `evt` and registers `f` as its handler, unless debug mode is on.
    *
    * In debug mode the event is only logged: nothing is persisted and `f` is
    * not invoked.
    * NOTE(review): that means no handler-side reply or state change happens in
    * debug mode - confirm this is intended.
    *
    * A non-fatal failure raised by the `persist` call itself is reported to the
    * sender as a FAIL response and logged. Fatal errors (out of memory,
    * interruption, ...) are left to propagate.
    * NOTE(review): `f` runs later, from `persist`'s callback, so this `try`
    * presumably does not cover exceptions thrown inside `f`, and the debug log
    * line shows the state before `f` has run - confirm against the Akka docs.
    *
    * @param evt event to persist
    * @param f   handler passed to `persist`
    * @param dm  debug switch; when `dm.debug` is true nothing is persisted
    */
  def persistEvent[E](evt: E)(f: E => Unit)(implicit dm: DebugMode): Unit = {
    if (dm.debug)
      log.info(s"********** $persistenceId: persisted event: {$evt} **********")
    else {
      try {
        persist(evt)(f)
        log.debug(s"终端-$persistenceId:event: [$evt] state: [$vchState] : [${vchItems.txnitems.reverse}]")
      }
      catch {
        case scala.util.control.NonFatal(err) =>
          sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 操作失败![${err.getMessage}]。", vchState, vchItems.txnitems.reverse)
          log.error(s"终端-$persistenceId: 操作失败![${err.getMessage}] current state: [$vchState],[${vchItems.txnitems.reverse}]")
      }
    }
  }

  // Debug switch, read from section "pos.server" of pos.conf (key "debug").
  // If the configuration cannot be loaded or read, debug mode is off and
  // `debugConfig` keeps whatever value it had at the point of failure.
  var debugConfig: com.typesafe.config.Config = _
  var debug: Boolean = _
  try {
    debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
    debug = debugConfig.getBoolean("debug")
  }
  catch {
    // only recover from ordinary failures; fatal JVM errors must propagate
    case scala.util.control.NonFatal(_) => debug = false
  }

  log.info(s"********** $persistenceId: debug mode = $debug **********")

  // picked up implicitly by persistEvent
  implicit val debugMode: DebugMode = DebugMode(debug)

  // actor state: current voucher header and its items
  var vchState = VchStates()
  var vchItems = VchItems()


  /** Recovery handler: a snapshot restores the voucher state (num, seq, ...),
    * and each replayed `POSEvent` is folded into the state through
    * `updateState`, rebuilding an uncompleted voucher.
    */
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: VchStates) =>
      vchState = snapshot
    case evt: POSEvent =>
      val replayed = updateState(evt, vchState, vchItems)
      vchState = replayed._1
      vchItems = replayed._2
  }
  
  // Initial command behaviour: the actor starts in logOffState.
  // NOTE(review): this is also the behaviour after recovery, whatever state was replayed - confirm.
  override def receiveCommand: Receive = logOffState
...
}

Receive函数在WriterActor启动时默认为logOffState:

代码语言:javascript复制
  /** Behaviour before an operator has logged on.
    *
    * Only a `LogOn` command accepted by `RunPOSCommand` is processed; it is
    * persisted as a `LogOned` event and the actor then switches to `logOnState`.
    * `PassivatePOS` asks the parent to stop via `PoisonPill`. Every other
    * message is answered with a FAIL response.
    */
  private def logOffState: Receive = {
    case RunPOSCommand(LogOn(opr, _)) =>
      persistEvent(LogOned(opr, vchState)) { evt =>
        val (nextState, nextItems) = updateState(evt, vchState, vchItems)
        // starting sequenceNr for any voucher: no action is logged before login
        vchState = nextState.copy(jseq = lastSequenceNr + 1)
        vchItems = nextItems
        sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户{$opr}成功登陆。", vchState, List(vchItems.txnitems.head))
        context.become(logOnState)
      }

    case PassivatePOS =>
      log.info(s"**********${persistenceId} got passivate message and stopping  ... ***********")
      context.parent ! PoisonPill

    case _ =>
      sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 用户未登陆。", vchState, List())
  }

在logOffState状态下,除LogOn指令外不接受任何其它指令(其它消息一律返回"用户未登陆"的失败信息)。LogOn指令一旦通过验证并完成事件持久化,立即通过context.become(logOnState)转换状态。passivation只会在这个状态下发挥作用。这也可以理解:此时既没有处理中的单据,又经过了一段时间的空转,可以passivate了。

在logOnState里可以进行支付操作,如果支付金额小于应付金额则代表部分付款操作,需要转入paymentState:

代码语言:javascript复制
  // Behaviour while an operator is logged on (excerpt: only the payment case is
  // shown, the other cases are elided with "...").
  private def logOnState: Receive = {
...
    // first payment in a voucher
    case RunPOSCommand(OfflinePay(acct,num, amount)) =>
      persistEvent(Payment(acct,num,vchState)) { evt =>
        // fold the persisted payment into the actor state
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
        // positive voucher total: reply as a payment, otherwise as a refund
        if (vchItems.totalSales > 0)
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
        else
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
        if (!vchState.due) {  // voucher completed: mark its end and move to the next one, stay in logOnState
          persistEvent(EndVoucher(vchState.num)) { evt =>
            val sts = updateState(evt, vchState, vchItems)
            vchState = sts._1
            vchItems = sts._2
            saveSnapshot(vchState)     // recovery starts from the next voucher
          }
        }
        else context.become(paymentState)     // balance still due: switch into paymentState
      }
...
}

支付操作可能出现两种情况:整单完成,存入EndVoucher标示,saveSnapshot,或者转入paymentState。同样,在paymentState状态,只接受支付指令,直到应付金额耗尽,转回logOnState:

代码语言:javascript复制
 /** Behaviour while a voucher is partially paid: only payment-related commands
   * are processed until nothing is due any more.
   *
   *  - `OfflinePay` / `VCPay`: persist a `Payment`; when the voucher is settled,
   *    persist `EndVoucher`, save a snapshot and return to `logOnState`.
   *  - `SuperOn` / `SuperOff`: persist the mode switch (no reply is sent).
   *  - `VCBalance`: read-only balance query, nothing is persisted.
   *  - anything else: FAIL response.
   *
   * The return to `logOnState` uses `context.become(logOnState)` rather than
   * `unbecome()`: this state is entered with `context.become(paymentState)`
   * (default `discardOld = true`), which replaces the current behaviour instead
   * of stacking it, so `unbecome()` would fall back to the actor's initial
   * behaviour and not to `logOnState`.
   */
 private def paymentState: Receive = {
    case RunPOSCommand(OfflinePay(acct,num, amount)) =>
      persistEvent(Payment(acct,num,vchState)) { evt =>
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
        // positive voucher total: reply as a payment, otherwise as a refund
        if (vchItems.totalSales > 0)
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
        else
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))

        if (!vchState.due) {  // voucher completed: mark its end, move to the next one, return to logOnState
          persistEvent(EndVoucher(vchState.num)) { endEvt =>
            val ended = updateState(endEvt, vchState, vchItems, lastSequenceNr)
            vchState = ended._1
            vchItems = ended._2
            saveSnapshot(vchState)       // recovery starts from the next voucher
            context.become(logOnState)   // back to logOnState
          }
        }
        // else: balance still due - wait for further payments and stay in paymentState
      }
 /* strictly disallow any action other than payment till completion
    case RunPOSCommand(VoidAll) =>
      persistEvent(VoidVoucher(vchState)) { _ =>
        persistEvent(EndVoucher(vchState.num)) { evt =>
          updateState(evt,vchState,vchItems)
          context.unbecome()    //in paymentState, switch to logOnState
        }
      }
    case RunPOSCommand(Suspend) =>
      persistEvent(SuspVoucher(vchState)) { _ =>
        persistEvent(EndVoucher(vchState.num)) { evt =>
          updateState(evt,vchState,vchItems)
          context.unbecome()    //in paymentState, switch to logOnState
        }
      }
*/
    case RunPOSCommand(SuperOn(su,_)) =>
      persistEvent(SuperOned(su,vchState)) { evt =>
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
      }
    case RunPOSCommand(SuperOff) =>
      persistEvent(SuperOffed(vchState)) { evt =>
        val sts = updateState(evt, vchState, vchItems)
        vchState = sts._1
        vchItems = sts._2
      }

    // read-only: report the stored-value card balance, nothing is persisted
    case RunPOSCommand(VCBalance(acct,num,passwd)) =>
      if (POSInterfaces.validateVC(acct,num,passwd)) {
        val res = POSInterfaces.getVCBalance(acct,num)
        if (res.sts == POSInterfaces.VCRESULT.OK)
          sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 储值卡余额{${res.amt}}。", vchState, List(TxnItem(vchState).copy(
            amount = (res.amt * 100).toInt,
            dpt = acct,
            code = num
          )))
        else
          sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
      } else {
        sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
      }

    // stored-value card payment: validate the card, check the balance, charge, then record the payment
    case RunPOSCommand(VCPay(acct,num, passwd,amount)) =>
      if (POSInterfaces.validateVC(acct,num,passwd)) {
        val res = POSInterfaces.getVCBalance(acct,num)
        if (res.sts == POSInterfaces.VCRESULT.OK) {
          if ((res.amt * 100).toInt < amount)
            sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额不足!", vchState,List(TxnItem(vchState).copy(
              amount = (res.amt * 100).toInt,
              dpt = acct,
              code = num
            )))
          else {
            val payRes = POSInterfaces.payByVC(acct,num,amount/100.00)
            if (payRes.sts == POSInterfaces.VCRESULT.OK) {
              persistEvent(Payment(acct,num,vchState)) { evt =>
                val sts = updateState(evt, vchState, vchItems)
                vchState = sts._1
                vchItems = sts._2
                if (vchItems.totalSales > 0)
                  sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
                else
                  sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
                if (!vchState.due) {  // voucher completed: mark its end, move to the next one, return to logOnState
                  persistEvent(EndVoucher(vchState.num)) { endEvt =>
                    val ended = updateState(endEvt, vchState, vchItems, lastSequenceNr)
                    vchState = ended._1
                    vchItems = ended._2
                    saveSnapshot(vchState)       // recovery starts from the next voucher
                    context.become(logOnState)   // back to logOnState
                  }
                }
              }
            }
            else
              // the charge itself failed: tell the caller instead of silently dropping the command
              sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]支付失败!", vchState, List())
          }
        }
        else
          sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
      } else {
        sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
      }
    case _ =>
      // headOption keeps the rejection reply from throwing if the item list is empty
      sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 结算中不容许其它非支付操作!", vchState, vchItems.txnitems.headOption.toList)

  }

0 人点赞