在分析了qs的大致源码后golang源码分析:dtm分布式事务(1),我们分析下dtm-example的源码结构,每个例子都是类似的。
先看下main.go里面的main函数
代码语言:javascript复制func main() {
hintExit("")
busi.BusiConf = dtmimp.DBConf
busi.ResetXaData()
app, gsvr := busi.Startup()
examples.AddRoutes(app)
if cmd == "qs"
go busi.RunHTTP(app)
busi.QsMain()
examples.IsExists(cmd)
如果没有知名具体实例参数,列出参数列表
代码语言:javascript复制func hintExit(msg string) {
_, cmd := range examples.Commands
然后是将数据库里未提交的事务回滚掉,learn/dtm/dtm-examples/busi/utils.go
代码语言:javascript复制func ResetXaData() {
db.Must().Raw("xa recover").Scan(&xas)
for _, xa := range xas {
db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
}
其中数据库连接参数定义在
busi/base_types.go
代码语言:javascript复制var StoreHost = "localhost"
var BusiConf = dtmcli.DBConf{
Driver: "mysql",
Host: StoreHost,
Port: 3306,
User: "root",
}
获取db连接dtmutil/db.go
代码语言:javascript复制func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB {
dsn := dtmimp.GetDsn(conf)
db, ok := dbs.Load(dsn)
db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{
SkipDefaultTransaction: true,
})
dbs.Store(dsn, db)
然后启动业务服务,对外提供服务busi/startup.go
代码语言:javascript复制func Startup() (*gin.Engine, *grpc.Server) {
svr := GrpcStartup()
app := BaseAppStartup()
其中grpc服务
代码语言:javascript复制func GrpcStartup() *grpc.Server {
conn, err := grpc.Dial(dtmutil.DefaultGrpcServer
DtmClient = dtmgpb.NewDtmClient(conn)
conn1, err := grpc.Dial(BusiGrpc,
BusiCli = NewBusiClient(conn1)
s := grpc.NewServer(
RegisterBusiServer(s, &busiServer{})
服务实现位于busi/busi_grpc.pb.go
代码语言:javascript复制type busiServer struct {
UnimplementedBusiServer
}
对外提供了所有业务接口
代码语言:javascript复制 func (s *busiServer) QueryPrepared
func (s *busiServer) TransIn(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOut(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransInRevert(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOutRevert(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransInConfirm(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOutConfirm(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransInTcc(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOutTcc(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransInXa(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOutXa(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransInTccNested(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOutHeaderYes(ctx context.Context, in *ReqGrpc)
func (s *busiServer) TransOutHeaderNo(ctx context.Context, in *ReqGrpc)
然后是注册http服务,busi/base_http.go
代码语言:javascript复制 func BaseAppStartup() *gin.Engine {
app := dtmutil.GetGinApp()
app.Use(func(c *gin.Context) {
v := MainSwitch.NextResult.Fetch()
BaseAddRoute(app)
addJrpcRoute(app)
for k, v := range setupFuncs {
v(app)
其中,注册http路由和json rpc路由的实现如下
代码语言:javascript复制func BaseAddRoute(app *gin.Engine) {
app.POST(BusiAPI "/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{}
busi/base_jrpc.go
代码语言:javascript复制func addJrpcRoute(app *gin.Engine) {
app.POST("/api/json-rpc", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
代码语言:javascript复制var setupFuncs = map[string]setupFunc{}
注册了barrier,busi/barrier.go
代码语言:javascript复制 func init() {
setupFuncs["BarrierSetup"] = func(app *gin.Engine) {
app.POST(BusiAPI "/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
barrier := MustBarrierFromGin(c)
ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
return BarrierFrom(dtmimp.EscapeGet(qs, "trans_type"), dtmimp.EscapeGet(qs, "gid"), dtmimp.EscapeGet(qs, "branch_id"), dtmimp.EscapeGet(qs, "op"))
return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
代码语言:javascript复制func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {
tx, err := db.Begin()
接着是注册路由examples/startup.go
代码语言:javascript复制 var routes = []PostRoute{}
func AddRoutes(app *gin.Engine) {
for _, r := range routes {
app.POST(r.Route, dtmutil.WrapHandler(r.Handler))
对于例子"qs"
代码语言:javascript复制 go busi.RunHTTP(app)
time.Sleep(200 * time.Millisecond)
busi.QsMain()
代码位于busi/base_http.go,仅仅启动了http服务进行监听
代码语言:javascript复制 func RunHTTP(app *gin.Engine) {
err := app.Run(fmt.Sprintf(":%d", BusiPort))
busi/quick_start.go 启动了 QsStartSvr()供dtm回调用
代码语言:javascript复制func QsMain() {
QsStartSvr()
QsFireRequest()
select {}
}
代码语言:javascript复制func QsStartSvr() {
app := gin.New()
qsAddRoute(app)
log.Printf("quick start examples listening at %d", qsBusiPort)
go func() {
_ = app.Run(fmt.Sprintf(":%d", qsBusiPort))
代码语言:javascript复制func qsAddRoute(app *gin.Engine) {
app.POST(qsBusiAPI "/TransIn", func(c *gin.Context) {
然后就是触发我们的业务请求,将我们的参数和回调地址注册给dtm,dtmutil/consts.go
代码语言:javascript复制func QsFireRequest() string {
req := &gin.H{"amount": 30}
saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).
Add(qsBusi "/TransOut", qsBusi "/TransOutCompensate", req).
Add(qsBusi "/TransIn", qsBusi "/TransInCompensate", req)
err := saga.Submit()
代码语言:javascript复制 DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"
DefaultJrpcServer = "http://localhost:36789/api/json-rpc"
DefaultGrpcServer = "localhost:36790"
分析完qs例子后,其他例子是类似的,对于grpc请求,会初始化grpc workflow
代码语言:javascript复制workflow.InitGrpc(dtmutil.DefaultGrpcServer, busi.BusiGrpc, gsvr)
busi.BusiCli = busi.NewBusiClient(conn1)
http请求,对应的是http workflow
代码语言:javascript复制workflow.InitHTTP(dtmutil.DefaultHTTPServer, busi.Busi "/workflow/resume")
然后分别启动grpc回调监听,http回调监听,最后通过Call触发请求
代码语言:javascript复制 go busi.RunGrpc(gsvr)
go busi.RunHTTP(app)
examples.Call(cmd)
对于每一个例子是如何注册进去的呢?examples/startup.go
代码语言:javascript复制func AddCommand(name string, fn func() string) {
Commands = append(Commands, commandInfo{Arg: name, Action: fn})
其中的Action,就是注册的时候注册的执行方法,在Call方法里面会被调用:
代码语言:javascript复制func Call(name string) {
c.Action()
例子的注入时机在init函数中Commands,以xa为例:examples/http_xa.go
代码语言:javascript复制func init() {
AddCommand("http_xa", func() string {
err := dtmcli.XaGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
resp, err := xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi "/TransOutXa")
return xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi "/TransInXa")