golang源码分析:dtm分布式事务(5)

2023-03-01 16:17:07 浏览数 (2)

在介绍完服务端的整体框架后,可以开始saga模式的介绍。saga模式是将一个大事务拆分成几个小的分支事务,然后依次执行每一个事务,如果出现异常,逆序回滚每一个分支事务。核心代码位于dtmsvr/trans_type_saga.go

代码语言:javascript复制
func init() {
  registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor {
     return &transSagaProcessor{TransGlobal: trans}

在init的时候将saga模式注册到全局的processor工厂里面

代码语言:javascript复制
var processorFac = map[string]processorCreator{}

processor核心有两个接口

代码语言:javascript复制
type transProcessor interface {
  GenBranches() []TransBranch
  ProcessOnce(branches []TransBranch) error
}

首先看下阐述分支的接口:

代码语言:javascript复制
func (t *transSagaProcessor) GenBranches() []TransBranch {
      for i, step := range t.Steps {
        for _, op := range []string{dtmimp.OpCompensate, dtmimp.OpAction} {

针对分支事务的所有步骤,依次创建补偿分支和执行分支。 然后看下事务处理的接口 ,这里实现了saga模式的核心逻辑。

代码语言:javascript复制
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {

1, 如果已经提交并且超时了,修改状态为中断

代码语言:javascript复制
 if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
       t.changeStatus(dtmcli.StatusAborting
   csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}}

2,然后定义一个对象存储,事务执行过程中的数据。用数组branchResults存储 每一个分支的执行结果,需要注意的是2i 1是action,2i是compensation,这个奇偶位置关系很重要,后面逻辑都是围绕这个顺序展开的

代码语言:javascript复制
branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op}

3,定义了过滤应该执行的分支的函数shouldRun

代码语言:javascript复制
if branchResults[pre*2 1].status != dtmcli.StatusSucceed {
        return false

4,判断分支是否应该回滚shouldRollback,首先定义了一个子函数rollbacked判断分支是否已经回滚了:

代码语言:javascript复制
branchResults[i].status == dtmcli.StatusSucceed || branchResults[i 1].status == dtmcli.StatusPrepared

后面一步必须回滚完成,当前这一步才能回滚

代码语言:javascript复制
if !csc.Concurrent && current < n-2 && !rollbacked(current 2) {
      return false
    }

从后往前找,直到找到第一个没有回滚的分支

代码语言:javascript复制
for _, next := range csc.cOrders[current/2] {
      if !rollbacked(2 * next) {
        return false
      }
    }
    return true

5,定义函数asyncExecBranch异步执行分支,开始执行分支 ,并将执行结果写入通道:

代码语言:javascript复制
defer resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, err: branches[i].Error}
 err = t.execBranch(&branches[i], i)

6,pickToRunActions找出即将执行的动作列表,注意这里是奇数位置,也就是正向执行的动作列表

代码语言:javascript复制
for current := 1; current < n; current  = 2 {
      br := &branchResults[current]
      if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) {
        toRun = append(toRun, current)

7,选择需要补偿的动作列表,和上面的函数类似,不过是偶数位置,并且是逆序:

代码语言:javascript复制
pickToRunCompensates := func() []int {
  for current := n - 2; current >= 0; current -= 2 {
      br := &branchResults[current]
      if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) {
        toRun = append(toRun, current)

8,执行需要执行的分支列表runBranches

代码语言:javascript复制
for _, b := range toRun {
          go asyncExecBranch(b)

9,等待分支的执行结果waitDoneOnce,根据结果修改分支的状态

代码语言:javascript复制
select {
    case r := <-resultChan:
          go asyncExecBranch(r.index)
          t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("RetryCount is greater than RetryLimit, RetryLimit: %v", t.RetryLimit)))

10,prepareToCompensate为执行补偿操作做准备,获取正向执行的结果,统计执行成功和失败的分支

代码语言:javascript复制
prepareToCompensate := func() {
    toRun := pickToRunActions()
    for i := 1; i < len(branchResults); i  = 2 {
      if branchResults[i].started && branchResults[i].status == dtmcli.StatusPrepared {
      branchResults[i].status = dtmcli.StatusSucceed
    for i, b := range branchResults {
      if b.op == dtmimp.OpCompensate && b.status != dtmcli.StatusSucceed &&
        branchResults[i 1].status != dtmcli.StatusPrepared {

11,执行所有的分支,如果遇到失败或者超时终止

代码语言:javascript复制
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 {
        toRun := pickToRunActions()
        runBranches(toRun)
        waitDoneOnce()

12,如果没有失败和超时的,修改事务为成功状态

代码语言:javascript复制
t.changeStatus(dtmcli.StatusSucceed)

13,如果有失败或者超时的,修改状态为中断

代码语言:javascript复制
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(msg))
代码语言:javascript复制
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))

14,如果是终止状态,开始执行回滚操作,直到超时

代码语言:javascript复制
if t.Status == dtmcli.StatusAborting {
    prepareToCompensate()
      for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting {
        toRun := pickToRunCompensates()
        runBranches(toRun)
        waitDoneOnce()

15,如果所有的补偿分支都执行成功,修改全局状态为失败

代码语言:javascript复制
if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed {
    t.changeStatus(dtmcli.StatusFailed)

以上就是全部流程,中间我们用到的存储状态的结构体定义如下

代码语言:javascript复制
type cSagaCustom struct {
  Orders     map[int][]int `json:"orders"`
  Concurrent bool          `json:"concurrent"`
  cOrders    map[int][]int
}

对于saga的客户端,代码定义位于client/dtmcli/trans_saga.go

代码语言:javascript复制
type Saga struct {
  dtmimp.TransBase
  orders map[int][]int
}

它封装了各个操作,比如添加分支

代码语言:javascript复制
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {

按照指定顺序添加分支

代码语言:javascript复制
func (s *Saga) AddBranchOrder(branch int, preBranches []int) *Saga {

指定当前分支

代码语言:javascript复制
func (s *Saga) SetConcurrent() *Saga {

提交分支事务

代码语言:javascript复制
  func (s *Saga) Submit() error {
      return dtmimp.TransCallDtm(&s.TransBase, "submit")

自定义参数

代码语言:javascript复制
func (s *Saga) BuildCustomOptions() {

这些操作都是通过http,grpc,grpc-json传递给服务端的client/dtmcli/dtmimp/trans_base.go

代码语言:javascript复制
func TransCallDtm(tb *TransBase, operation string) error {
      _, err := TransCallDtmExt(tb, tb, operation)

http和jrpc-json定义如下

代码语言:javascript复制
func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
   if tb.Protocol == Jrpc {
    return transCallDtmJrpc(tb, body, operation)
  rc := GetRestyClient2(time.Duration(tb.RequestTimeout) * time.Second)
  resp, err := rc.R().
    SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
代码语言:javascript复制
func transCallDtmJrpc(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
      rc := GetRestyClient2(time.Duration(tb.RequestTimeout) * time.Second)
      resp, err := rc.R().
    SetBody(map[string]interface{}{
      "jsonrpc": "2.0",
      "id":      "no-use",
      "method":  operation,
      "params":  body,
    }).
    SetResult(&result).
    Post(tb.Dtm)

dtmsvr/trans_status.go状态修改,在服务端最终都会落盘的

代码语言:javascript复制
func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) {
      t.UpdateTime = &now
  GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed)

0 人点赞