在介绍完服务端的整体框架后,可以开始saga模式的介绍。saga模式是将一个大事务拆分成几个小的分支事务,然后依次执行每一个事务,如果出现异常,逆序回滚每一个分支事务。核心代码位于dtmsvr/trans_type_saga.go
代码语言:javascript复制func init() {
registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor {
return &transSagaProcessor{TransGlobal: trans}
在init的时候将saga模式注册到全局的processor工厂里面
代码语言:javascript复制var processorFac = map[string]processorCreator{}
processor核心有两个接口
代码语言:javascript复制type transProcessor interface {
GenBranches() []TransBranch
ProcessOnce(branches []TransBranch) error
}
首先看下阐述分支的接口:
代码语言:javascript复制func (t *transSagaProcessor) GenBranches() []TransBranch {
for i, step := range t.Steps {
for _, op := range []string{dtmimp.OpCompensate, dtmimp.OpAction} {
针对分支事务的所有步骤,依次创建补偿分支和执行分支。 然后看下事务处理的接口 ,这里实现了saga模式的核心逻辑。
代码语言:javascript复制func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
1, 如果已经提交并且超时了,修改状态为中断
代码语言:javascript复制 if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting
csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}}
2,然后定义一个对象存储,事务执行过程中的数据。用数组branchResults存储 每一个分支的执行结果,需要注意的是2i 1是action,2i是compensation,这个奇偶位置关系很重要,后面逻辑都是围绕这个顺序展开的
代码语言:javascript复制branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
3,定义了过滤应该执行的分支的函数shouldRun
代码语言:javascript复制if branchResults[pre*2 1].status != dtmcli.StatusSucceed {
return false
4,判断分支是否应该回滚shouldRollback,首先定义了一个子函数rollbacked判断分支是否已经回滚了:
代码语言:javascript复制branchResults[i].status == dtmcli.StatusSucceed || branchResults[i 1].status == dtmcli.StatusPrepared
后面一步必须回滚完成,当前这一步才能回滚
代码语言:javascript复制if !csc.Concurrent && current < n-2 && !rollbacked(current 2) {
return false
}
从后往前找,直到找到第一个没有回滚的分支
代码语言:javascript复制for _, next := range csc.cOrders[current/2] {
if !rollbacked(2 * next) {
return false
}
}
return true
5,定义函数asyncExecBranch异步执行分支,开始执行分支 ,并将执行结果写入通道:
代码语言:javascript复制defer resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, err: branches[i].Error}
err = t.execBranch(&branches[i], i)
6,pickToRunActions找出即将执行的动作列表,注意这里是奇数位置,也就是正向执行的动作列表
代码语言:javascript复制for current := 1; current < n; current = 2 {
br := &branchResults[current]
if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) {
toRun = append(toRun, current)
7,选择需要补偿的动作列表,和上面的函数类似,不过是偶数位置,并且是逆序:
代码语言:javascript复制pickToRunCompensates := func() []int {
for current := n - 2; current >= 0; current -= 2 {
br := &branchResults[current]
if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) {
toRun = append(toRun, current)
8,执行需要执行的分支列表runBranches
代码语言:javascript复制for _, b := range toRun {
go asyncExecBranch(b)
9,等待分支的执行结果waitDoneOnce,根据结果修改分支的状态
代码语言:javascript复制select {
case r := <-resultChan:
go asyncExecBranch(r.index)
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("RetryCount is greater than RetryLimit, RetryLimit: %v", t.RetryLimit)))
10,prepareToCompensate为执行补偿操作做准备,获取正向执行的结果,统计执行成功和失败的分支
代码语言:javascript复制prepareToCompensate := func() {
toRun := pickToRunActions()
for i := 1; i < len(branchResults); i = 2 {
if branchResults[i].started && branchResults[i].status == dtmcli.StatusPrepared {
branchResults[i].status = dtmcli.StatusSucceed
for i, b := range branchResults {
if b.op == dtmimp.OpCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i 1].status != dtmcli.StatusPrepared {
11,执行所有的分支,如果遇到失败或者超时终止
代码语言:javascript复制for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 {
toRun := pickToRunActions()
runBranches(toRun)
waitDoneOnce()
12,如果没有失败和超时的,修改事务为成功状态
代码语言:javascript复制t.changeStatus(dtmcli.StatusSucceed)
13,如果有失败或者超时的,修改状态为中断
代码语言:javascript复制t.changeStatus(dtmcli.StatusAborting, withRollbackReason(msg))
代码语言:javascript复制t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
14,如果是终止状态,开始执行回滚操作,直到超时
代码语言:javascript复制if t.Status == dtmcli.StatusAborting {
prepareToCompensate()
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting {
toRun := pickToRunCompensates()
runBranches(toRun)
waitDoneOnce()
15,如果所有的补偿分支都执行成功,修改全局状态为失败
代码语言:javascript复制if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed {
t.changeStatus(dtmcli.StatusFailed)
以上就是全部流程,中间我们用到的存储状态的结构体定义如下
代码语言:javascript复制type cSagaCustom struct {
Orders map[int][]int `json:"orders"`
Concurrent bool `json:"concurrent"`
cOrders map[int][]int
}
对于saga的客户端,代码定义位于client/dtmcli/trans_saga.go
代码语言:javascript复制type Saga struct {
dtmimp.TransBase
orders map[int][]int
}
它封装了各个操作,比如添加分支
代码语言:javascript复制func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
按照指定顺序添加分支
代码语言:javascript复制func (s *Saga) AddBranchOrder(branch int, preBranches []int) *Saga {
指定当前分支
代码语言:javascript复制func (s *Saga) SetConcurrent() *Saga {
提交分支事务
代码语言:javascript复制 func (s *Saga) Submit() error {
return dtmimp.TransCallDtm(&s.TransBase, "submit")
自定义参数
代码语言:javascript复制func (s *Saga) BuildCustomOptions() {
这些操作都是通过http,grpc,grpc-json传递给服务端的client/dtmcli/dtmimp/trans_base.go
代码语言:javascript复制func TransCallDtm(tb *TransBase, operation string) error {
_, err := TransCallDtmExt(tb, tb, operation)
http和jrpc-json定义如下
代码语言:javascript复制func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
if tb.Protocol == Jrpc {
return transCallDtmJrpc(tb, body, operation)
rc := GetRestyClient2(time.Duration(tb.RequestTimeout) * time.Second)
resp, err := rc.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
代码语言:javascript复制func transCallDtmJrpc(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
rc := GetRestyClient2(time.Duration(tb.RequestTimeout) * time.Second)
resp, err := rc.R().
SetBody(map[string]interface{}{
"jsonrpc": "2.0",
"id": "no-use",
"method": operation,
"params": body,
}).
SetResult(&result).
Post(tb.Dtm)
dtmsvr/trans_status.go状态修改,在服务端最终都会落盘的
代码语言:javascript复制func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) {
t.UpdateTime = &now
GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed)