在分析完saga模式golang源码分析:dtm分布式事务(5),其它模式就是类似的。我们依次看下tcc,msg,workflow和xa
1,tcc
tcc的核心代码位于dtmsvr/trans_type_tcc.go,代码目前并没有完全实现
代码语言:javascript复制registorProcessorCreator("tcc", func(trans *TransGlobal) transProcessor { return &transTccProcessor{TransGlobal: trans} })
GenBranches返回的是空的。
代码语言:javascript复制func (t *transTccProcessor) GenBranches() []TransBranch {
return []TransBranch{}
}
如果已经处理了,就不再处理保证幂等,如果是就绪状态但是超时了,也改成中断状态,然后从前往后依次处理每个分支事务,如果是提交请求,那就提交所有分支,如果是回滚请求,就依次回滚所有分支,如果所有分支都执行成功,将全局状态改为执行成功,否则将全局状态改为失败。
代码语言:javascript复制func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
if !t.needProcess() {
return nil
}
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
}
op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpConfirm, dtmimp.OpCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
err := t.execBranch(&branches[current], current)
t.changeStatus(dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))
执行分支的逻辑位于dtmsvr/trans_status.go
先获取分支目前的状态,然后把当前位置的分支改成同样的状态,修改cron job执行的时间,如果重试超过最大次数发出告警。
代码语言:javascript复制func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
status, err := t.getBranchResult(branch)
t.changeBranchStatus(branch, status, branchPos)
t.touchCronTime(cronKeep, 0)
if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" {
_, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
"gid": t.Gid,
"status": t.Status,
"branch": branch.URL,
"error": err.Error(),
"retry_count": retryCount,
}).Post(conf.AlertWebHook)
代码语言:javascript复制func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
2,msg
消息表事务的源码位于 dtmsvr/trans_type_msg.go
代码语言:javascript复制registorProcessorCreator("msg", func(trans *TransGlobal) transProcessor { return &transMsgProcessor{TransGlobal: trans} })
它将所有step中的消息中的执行url解析到分支事务中。
代码语言:javascript复制func (t *transMsgProcessor) GenBranches() []TransBranch {
for i, step := range t.Steps {
mayTopic := strings.TrimPrefix(step[dtmimp.OpAction], dtmimp.MsgTopicPrefix)
for j, url := range urls {
依次执行每一个分支,执行完毕后修改全局状态为成功状态,如果是异步执行的场景,通过channel来接收执行结果。
代码语言:javascript复制func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
t.mayQueryPrepared()
for i := range branches {
b := &branches[i]
if t.Concurrent {
started
go func(pos int) {
resultsChan <- t.execBranch(b, pos)
}(i)
err = t.execBranch(b, i)
for i := 0; i < started && err == nil; i {
err = <-resultsChan
}
t.changeStatus(dtmcli.StatusSucceed)
执行分支的逻辑和tcc的代码是复用的dtmsvr/trans_status.go
代码语言:javascript复制func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
status, err := t.getBranchResult(branch)
t.changeBranchStatus(branch, status, branchPos)
_, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
"gid": t.Gid,
"status": t.Status,
"branch": branch.URL,
"error": err.Error(),
"retry_count": retryCount,
}).Post(conf.AlertWebHook)
3,workflow
工作流的实现位于dtmsvr/trans_type_workflow.go
代码语言:javascript复制registorProcessorCreator("workflow", func(trans *TransGlobal) transProcessor { return &transWorkflowProcessor{TransGlobal: trans} })
这个实现也并不完全
代码语言:javascript复制func (t *transWorkflowProcessor) GenBranches() []TransBranch {
return []TransBranch{}
}
代码语言:javascript复制func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error
cmc := cWorkflowCustom{}
dtmimp.MustUnmarshalString(t.CustomData, &cmc)
return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data)
4,xa
xa的实现位于dtmsvr/trans_type_xa.go
代码语言:javascript复制registorProcessorCreator("xa", func(trans *TransGlobal) transProcessor { return &transXaProcessor{TransGlobal: trans} })
代码语言:javascript复制func (t *transXaProcessor) GenBranches() []TransBranch {
return []TransBranch{}
}
依次执行每一个分支,根据最后分支的状态是成功还是失败来决定是提交还是回滚。
代码语言:javascript复制func (t *transXaProcessor) ProcessOnce(branches []TransBranch) error {
if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
for i, branch := range branches {
if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed {
err := t.execBranch(&branch, i)
t.changeStatus(dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))
执行分支逻辑同上dtmsvr/trans_status.go
代码语言:javascript复制func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {