golang源码分析:dtm分布式事务(1)

2023-03-01 16:15:42 浏览数 (1)

https://github.com/dtm-labs/dtm和seata类似是一个分布式事务管理器,不过是golang实现的,它有丰富的例子可以供我们学习https://github.com/dtm-labs/dtm-examples。常见的事务模式,支持对比如下:

1,TCC事务:dtm和Seata都支持了TCC事务。

2,XA事务:dtm和Seata都支持XA事务。dtm采用的是回调函数形式的接口,而Seata采用的是Java特有的注解形式接口,本质都是回调。

3,AT事务:AT事务是Seata独有的事务模式(类似XA,性能更高,但有脏回滚)

4,SAGA事务:Seata的Saga实现采用了状态机,优点是可以做到灵活配置,缺点是上手难度非常高。dtm支持并发的Saga。

5,二阶段消息:dtm支持了二阶段消息事务模式,该模式受到RocketMQ的事务消息启发。提供了比本地消息表和事务消息更简单的架构,更易用的接口:PrepareAndSubmit,适用于无需回滚的数据一致性场景

总的来说,对于golang用户学习分布式事务是一个非常不错的选择。在学习本篇之前,建议先学习下mysql的XAgolang源码分析:golang使用mysql XA事务,然后会发现大家的最终方案都是相似的。也可以对比seata的golang客户端来学习golang源码分析:seata-go (1)at模式,golang源码分析:seata-go (2)tcc模式。

dtm在传输协议上支持三种,grpc,http和http-json,它的通信链路大概可以分为三条:

1,对外提供服务的链路。

2,ap(应用程序)调用dtm(事务管理器)上报数据的链路

3,dtm回调应用程序的链路。

下面,我们基于dtm-examples的qs例子源码对dtm进行简单介绍:

1,对外提供的服务接口是基于gin http服务实现的,它的端口是8081

代码语言:javascript复制
go busi.RunHTTP(app)  
代码语言:javascript复制
const (
  // BusiAPI busi api prefix
  BusiAPI = "/api/busi"
  // BusiPort busi server port
  BusiPort = 8081
  // BusiGrpcPort busi server port
  BusiGrpcPort = 58081
)

提供了转入,转出等多个接口。

代码语言:javascript复制
func BaseAddRoute(app *gin.Engine) {
  app.POST(BusiAPI "/workflow/resume"
  app.POST(BusiAPI "/TransIn",

2,上报数据链路,qs例子,实现了saga模式

代码语言:javascript复制
saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).

首先通过uuid产生了全局事务id,然后组装回调需要的接口和参数,然后上报给dtm服务器,地址是http://localhost:36789/api/dtmsvr/submit ,也可以是grpc模式

代码语言:javascript复制
 const dtmServer = "http://localhost:36789/api/dtmsvr"    

对应实现在github.com/dtm-labs/client@v1.15.1/dtmcli/trans_saga.go

代码语言:javascript复制
func NewSaga(server string, gid string) *Saga {
        &Saga{TransBase: *dtmimp.NewTransBase(gid, "saga", server, ""), orders: map[int][]int{}}

github.com/dtm-labs/client@v1.15.1/dtmcli/dtmimp/trans_base.go

代码语言:javascript复制
func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {
    return &TransBase{
    Gid:          gid,
    TransType:    transType,
    BranchIDGen:  BranchIDGen{BranchID: branchID},
    Dtm:          dtm,
    TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
    Context:      context.Background(),
  }

然后调用Add方法添加需要上报的信息

代码语言:javascript复制
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
  s.Steps = append(s.Steps, map[string]string{"action": action, "compensate": compensate})
  s.Payloads = append(s.Payloads, dtmimp.MustMarshalString(postData))

最后通过submit提交给dtm

代码语言:javascript复制
func (s *Saga) Submit() error {
      s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, "submit")
代码语言:javascript复制
// TransCallDtm is the short call for TransCallDtmExt
func TransCallDtm(tb *TransBase, operation string) error {
  _, err := TransCallDtmExt(tb, tb, operation)
代码语言:javascript复制
func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
   resp, err := RestyClient.R().
    SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))

3,dtm回调业务的接口也是基于gin的http服务实现的,端口是8082

代码语言:javascript复制
// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8082
var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)

在启动1中的gin http服务后,会调用dtm的上报接口,将回调信息上报给dtm,其中就包括dtm 回调用的接口,详细信息如下:

代码语言:javascript复制
[
        {
            "action": "http://localhost:8082/api/busi_start/TransOut",
            "compensate": "http://localhost:8082/api/busi_start/TransOutCompensate"
        },
        {
            "action": "http://localhost:8082/api/busi_start/TransIn",
            "compensate": "http://localhost:8082/api/busi_start/TransInCompensate"
        }
],
    "payloads": ["{mount:30}","{mount:30}"
    ]

对于qs实例来说,就包括两个分支事务的正向操作接口和对应的补偿接口,具体实现在qsAddRoute方法里面:

代码语言:javascript复制
func qsAddRoute(app *gin.Engine) {
  app.POST(qsBusiAPI "/TransIn",

4,在dtm的server端提供了相应的submit接口,接收客户端也就是应用程序提交的submit请求,对应路由注册在dtmsvr/api_http.go

代码语言:javascript复制
func addRoute(engine *gin.Engine) {
    engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))
    engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))
    engine.POST("/api/dtmsvr/submit", dtmutil.WrapHandler2(submit))
    engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))
    engine.POST("/api/dtmsvr/registerBranch", dtmutil.WrapHandler2(registerBranch))

实现位于dtmsvr/api.go

代码语言:javascript复制
func svcSubmit(t *TransGlobal) interface{} 
代码语言:javascript复制
func submit(c *gin.Context) interface{} {
  return svcSubmit(TransFromContext(c))

下面我们启动官方的例子跑一下:

首先启动dtm 服务端

代码语言:javascript复制
cd  dtm 
go run main.go

然后运行例子中的qs例子

代码语言:javascript复制
cd dtm-examples 
go run main.go qs

可以看到下面的日志

代码语言:javascript复制
{"level":"info","ts":"2022-12-18T18:29:24.433 0800","caller":"dtmutil/db.go:103","msg":"connecting 'mysql' 'en.dtm.pub' 'dtm' '3306' ''"}
{"level":"debug","ts":"2022-12-18T18:29:24.713 0800","caller":"dtmutil/db.go:79","msg":"installing db plugin: tracePlugin"}
{"level":"debug","ts":"2022-12-18T18:29:24.748 0800","caller":"dtmutil/db.go:68","msg":"used: 34 ms affected: -1 sql is: xa recover"}
{"level":"debug","ts":"2022-12-18T18:29:24.748 0800","caller":"busi/base_grpc.go:57","msg":"dtm client inited"}
{"level":"info","ts":"2022-12-18T18:29:24.748 0800","caller":"busi/base_http.go:54","msg":"examples starting"}
{"level":"debug","ts":"2022-12-18T18:29:24.748 0800","caller":"busi/base_http.go:69","msg":"initing BarrierSetup"}
{"level":"debug","ts":"2022-12-18T18:29:24.949 0800","caller":"busi/base_http.go:77","msg":"Starting busi at: 8081"}
2022/12/18 18:29:25 quick start examples listening at 8082
{"level":"debug","ts":"2022-12-18T18:29:25.252 0800","caller":"dtmimp/vars.go:46","msg":"requesting: POST http://localhost:36789/api/dtmsvr/submit {"gid":"QHEbkW3zWVwFEpWuAdRwQK","trans_type":"saga","concurrent":false,"steps":[{"action":"http://localhost:8082/api/busi_start/TransOut","compensate":"http://localhost:8082/api/busi_start/TransOutCompensate"},{"action":"http://localhost:8082/api/busi_start/TransIn","compensate":"http://localhost:8082/api/busi_start/TransInCompensate"}],"payloads":["{\"amount\":30}","{\"amount\":30}"],"protocol":""} resolved: http://localhost:36789/api/dtmsvr/submit"}
{"level":"debug","ts":"2022-12-18T18:29:25.294 0800","caller":"dtmimp/vars.go:54","msg":"requested: 200 POST http://localhost:36789/api/dtmsvr/submit {"dtm_result":"SUCCESS"}"}
2022/12/18 18:29:25 TransOut
2022/12/18 18:29:25 TransIn
2022/12/18 18:29:25 TransInCompensate
2022/12/18 18:29:25 TransOutCompensate

当然,我们也可以请求下,我们本地服务提供的http服务

代码语言:javascript复制
% curl -iv -X POST http://127.0.0.1:8081/api/busi/TransIn -d '{"amount":30}'
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 127.0.0.1:8081...
* Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0)
> POST /api/busi/TransIn HTTP/1.1
> Host: 127.0.0.1:8081
> User-Agent: curl/7.79.1
> Accept: */*
> Content-Length: 13
> Content-Type: application/x-www-form-urlencoded
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Content-Type: application/json; charset=utf-8
Content-Type: application/json; charset=utf-8
< Date: Sun, 18 Dec 2022 10:38:13 GMT
Date: Sun, 18 Dec 2022 10:38:13 GMT
< Content-Length: 4
Content-Length: 4

<
* Connection #0 to host 127.0.0.1 left intact
nul

0 人点赞