golang源码分析:dtm分布式事务(2)

2023-03-01 16:16:03 浏览数 (2)

在分析了qs的大致源码后golang源码分析:dtm分布式事务(1),我们分析下dtm-example的源码结构,每个例子都是类似的。

先看下main.go里面的main函数

代码语言:javascript复制
func main() {
    hintExit("")
      
    busi.BusiConf = dtmimp.DBConf
    busi.ResetXaData()
    app, gsvr := busi.Startup()
    examples.AddRoutes(app)
    if cmd == "qs"
       go busi.RunHTTP(app)
       busi.QsMain()
    examples.IsExists(cmd)

如果没有知名具体实例参数,列出参数列表

代码语言:javascript复制
func hintExit(msg string) {
   _, cmd := range examples.Commands

然后是将数据库里未提交的事务回滚掉,learn/dtm/dtm-examples/busi/utils.go

代码语言:javascript复制
func ResetXaData() {
  db.Must().Raw("xa recover").Scan(&xas)
  for _, xa := range xas {
    db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
  }

其中数据库连接参数定义在

busi/base_types.go

代码语言:javascript复制
var StoreHost = "localhost"
    var BusiConf = dtmcli.DBConf{
  Driver: "mysql",
  Host:   StoreHost,
  Port:   3306,
  User:   "root",
}

获取db连接dtmutil/db.go

代码语言:javascript复制
func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB {
      dsn := dtmimp.GetDsn(conf)
      db, ok := dbs.Load(dsn)
      db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{
      SkipDefaultTransaction: true,
    })
      dbs.Store(dsn, db)

然后启动业务服务,对外提供服务busi/startup.go

代码语言:javascript复制
func Startup() (*gin.Engine, *grpc.Server) {
      svr := GrpcStartup()
      app := BaseAppStartup()

其中grpc服务

代码语言:javascript复制
func GrpcStartup() *grpc.Server {
      conn, err := grpc.Dial(dtmutil.DefaultGrpcServer
      DtmClient = dtmgpb.NewDtmClient(conn)
      conn1, err := grpc.Dial(BusiGrpc,
      BusiCli = NewBusiClient(conn1)
      s := grpc.NewServer(
      RegisterBusiServer(s, &busiServer{})

服务实现位于busi/busi_grpc.pb.go

代码语言:javascript复制
type busiServer struct {
  UnimplementedBusiServer
}

对外提供了所有业务接口

代码语言:javascript复制
      func (s *busiServer) QueryPrepared
      func (s *busiServer) TransIn(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransOut(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInRevert(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutRevert(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInConfirm(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutConfirm(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInTcc(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutTcc(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInXa(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransOutXa(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInTccNested(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutHeaderYes(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransOutHeaderNo(ctx context.Context, in *ReqGrpc)

然后是注册http服务,busi/base_http.go

代码语言:javascript复制
    func BaseAppStartup() *gin.Engine {
      app := dtmutil.GetGinApp()
      app.Use(func(c *gin.Context) {
      v := MainSwitch.NextResult.Fetch()
      BaseAddRoute(app)
      addJrpcRoute(app)
      for k, v := range setupFuncs {
        v(app)

其中,注册http路由和json rpc路由的实现如下

代码语言:javascript复制
func BaseAddRoute(app *gin.Engine) {
      app.POST(BusiAPI "/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{}

busi/base_jrpc.go

代码语言:javascript复制
func addJrpcRoute(app *gin.Engine) {
  app.POST("/api/json-rpc", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
代码语言:javascript复制
var setupFuncs = map[string]setupFunc{}

注册了barrier,busi/barrier.go

代码语言:javascript复制
  func init() {
      setupFuncs["BarrierSetup"] = func(app *gin.Engine) {
        app.POST(BusiAPI "/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
          barrier := MustBarrierFromGin(c)
            ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
              return BarrierFrom(dtmimp.EscapeGet(qs, "trans_type"), dtmimp.EscapeGet(qs, "gid"), dtmimp.EscapeGet(qs, "branch_id"), dtmimp.EscapeGet(qs, "op"))
          return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
            return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
代码语言:javascript复制
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {
              tx, err := db.Begin()

接着是注册路由examples/startup.go

代码语言:javascript复制
 var routes = []PostRoute{}
 func AddRoutes(app *gin.Engine) {
      for _, r := range routes {
       app.POST(r.Route, dtmutil.WrapHandler(r.Handler))

对于例子"qs"

代码语言:javascript复制
    go busi.RunHTTP(app)
    time.Sleep(200 * time.Millisecond)
    busi.QsMain()

代码位于busi/base_http.go,仅仅启动了http服务进行监听

代码语言:javascript复制
    func RunHTTP(app *gin.Engine) {
      err := app.Run(fmt.Sprintf(":%d", BusiPort))

busi/quick_start.go 启动了 QsStartSvr()供dtm回调用

代码语言:javascript复制
func QsMain() {
  QsStartSvr()
  QsFireRequest()
  select {}
}
代码语言:javascript复制
func QsStartSvr() {
  app := gin.New()
  qsAddRoute(app)
  log.Printf("quick start examples listening at %d", qsBusiPort)
  go func() {
    _ = app.Run(fmt.Sprintf(":%d", qsBusiPort))
代码语言:javascript复制
func qsAddRoute(app *gin.Engine) {
  app.POST(qsBusiAPI "/TransIn", func(c *gin.Context) {

然后就是触发我们的业务请求,将我们的参数和回调地址注册给dtm,dtmutil/consts.go

代码语言:javascript复制
func QsFireRequest() string {
        req := &gin.H{"amount": 30} 
        saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).
        Add(qsBusi "/TransOut", qsBusi "/TransOutCompensate", req).
        Add(qsBusi "/TransIn", qsBusi "/TransInCompensate", req)
        err := saga.Submit()
代码语言:javascript复制
    DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"
    DefaultJrpcServer = "http://localhost:36789/api/json-rpc"
    DefaultGrpcServer = "localhost:36790"

分析完qs例子后,其他例子是类似的,对于grpc请求,会初始化grpc workflow

代码语言:javascript复制
workflow.InitGrpc(dtmutil.DefaultGrpcServer, busi.BusiGrpc, gsvr)
busi.BusiCli = busi.NewBusiClient(conn1)

http请求,对应的是http workflow

代码语言:javascript复制
workflow.InitHTTP(dtmutil.DefaultHTTPServer, busi.Busi "/workflow/resume")

然后分别启动grpc回调监听,http回调监听,最后通过Call触发请求

代码语言:javascript复制
      go busi.RunGrpc(gsvr)
      go busi.RunHTTP(app)
      examples.Call(cmd)

对于每一个例子是如何注册进去的呢?examples/startup.go

代码语言:javascript复制
func AddCommand(name string, fn func() string) {
      Commands = append(Commands, commandInfo{Arg: name, Action: fn})

其中的Action,就是注册的时候注册的执行方法,在Call方法里面会被调用:

代码语言:javascript复制
func Call(name string) {
      c.Action()

例子的注入时机在init函数中Commands,以xa为例:examples/http_xa.go

代码语言:javascript复制
func init() {
      AddCommand("http_xa", func() string {
        err := dtmcli.XaGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
        resp, err := xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi "/TransOutXa")
        return xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi "/TransInXa")

0 人点赞