golang源码分析:dtm分布式事务(6)

2023-03-01 16:17:25 浏览数 (1)

在分析完saga模式golang源码分析:dtm分布式事务(5),其它模式就是类似的。我们依次看下tcc,msg,workflow和xa

1,tcc

tcc的核心代码位于dtmsvr/trans_type_tcc.go,代码目前并没有完全实现

代码语言:javascript复制
registorProcessorCreator("tcc", func(trans *TransGlobal) transProcessor { return &transTccProcessor{TransGlobal: trans} })

GenBranches返回的是空的。

代码语言:javascript复制
func (t *transTccProcessor) GenBranches() []TransBranch {
  return []TransBranch{}
}

如果已经处理了,就不再处理保证幂等,如果是就绪状态但是超时了,也改成中断状态,然后从前往后依次处理每个分支事务,如果是提交请求,那就提交所有分支,如果是回滚请求,就依次回滚所有分支,如果所有分支都执行成功,将全局状态改为执行成功,否则将全局状态改为失败。

代码语言:javascript复制
func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
      if !t.needProcess() {
    return nil
  }
      if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
  }
      op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpConfirm, dtmimp.OpCancel).(string)
    for current := len(branches) - 1; current >= 0; current-- {
      if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
        err := t.execBranch(&branches[current], current)
    t.changeStatus(dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))

执行分支的逻辑位于dtmsvr/trans_status.go

先获取分支目前的状态,然后把当前位置的分支改成同样的状态,修改cron job执行的时间,如果重试超过最大次数发出告警。

代码语言:javascript复制
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
      status, err := t.getBranchResult(branch)
      t.changeBranchStatus(branch, status, branchPos)
      t.touchCronTime(cronKeep, 0)
          if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" {
      _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
        "gid":         t.Gid,
        "status":      t.Status,
        "branch":      branch.URL,
        "error":       err.Error(),
        "retry_count": retryCount,
      }).Post(conf.AlertWebHook)
代码语言:javascript复制
func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
      GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)

2,msg

消息表事务的源码位于 dtmsvr/trans_type_msg.go

代码语言:javascript复制
registorProcessorCreator("msg", func(trans *TransGlobal) transProcessor { return &transMsgProcessor{TransGlobal: trans} })

它将所有step中的消息中的执行url解析到分支事务中。

代码语言:javascript复制
func (t *transMsgProcessor) GenBranches() []TransBranch {
    for i, step := range t.Steps {
      mayTopic := strings.TrimPrefix(step[dtmimp.OpAction], dtmimp.MsgTopicPrefix)
      for j, url := range urls {

依次执行每一个分支,执行完毕后修改全局状态为成功状态,如果是异步执行的场景,通过channel来接收执行结果。

代码语言:javascript复制
func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
    t.mayQueryPrepared()
    for i := range branches {
      b := &branches[i]
          if t.Concurrent {
      started  
      go func(pos int) {
        resultsChan <- t.execBranch(b, pos)
      }(i)
      err = t.execBranch(b, i)
      for i := 0; i < started && err == nil; i   {
    err = <-resultsChan
  }
    t.changeStatus(dtmcli.StatusSucceed)

执行分支的逻辑和tcc的代码是复用的dtmsvr/trans_status.go

代码语言:javascript复制
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
      status, err := t.getBranchResult(branch)
      t.changeBranchStatus(branch, status, branchPos)
            _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
        "gid":         t.Gid,
        "status":      t.Status,
        "branch":      branch.URL,
        "error":       err.Error(),
        "retry_count": retryCount,
      }).Post(conf.AlertWebHook)

3,workflow

工作流的实现位于dtmsvr/trans_type_workflow.go

代码语言:javascript复制
registorProcessorCreator("workflow", func(trans *TransGlobal) transProcessor { return &transWorkflowProcessor{TransGlobal: trans} })

这个实现也并不完全

代码语言:javascript复制
func (t *transWorkflowProcessor) GenBranches() []TransBranch {
  return []TransBranch{}
}
代码语言:javascript复制
func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error 
    cmc := cWorkflowCustom{}
    dtmimp.MustUnmarshalString(t.CustomData, &cmc)
    return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data)

4,xa

xa的实现位于dtmsvr/trans_type_xa.go

代码语言:javascript复制
registorProcessorCreator("xa", func(trans *TransGlobal) transProcessor { return &transXaProcessor{TransGlobal: trans} })
代码语言:javascript复制
func (t *transXaProcessor) GenBranches() []TransBranch {
  return []TransBranch{}
}

依次执行每一个分支,根据最后分支的状态是成功还是失败来决定是提交还是回滚。

代码语言:javascript复制
func (t *transXaProcessor) ProcessOnce(branches []TransBranch) error {
    if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
    for i, branch := range branches {
      if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed {
      err := t.execBranch(&branch, i)
    t.changeStatus(dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))

执行分支逻辑同上dtmsvr/trans_status.go

代码语言:javascript复制
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
ios

0 人点赞