golang源码分析:dtm分布式事务(4)

2023-03-01 16:16:43 浏览数 (1)

我们继续上一篇golang源码分析:dtm分布式事务(3)分析api服务的源码,位置位于dtmsvr/svr.go:

代码语言:javascript复制
  func StartSvr() *gin.Engine {
   dtmcli.GetRestyClient().SetTimeout
    app := dtmutil.GetGinApp()
    app = httpMetrics(app)
    addRoute(app)
    addJrpcRouter(app)
  
  go func() {
    err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
  dtmgpb.RegisterDtmServer(s, &dtmServer{})
  
  go func() {
    err := s.Serve(lis)
  for i := 0; i < int(conf.UpdateBranchAsyncGoroutineNum); i   {
     go updateBranchAsync()
  updateTopicsMap()
  go CronUpdateTopicsMap()
  
  err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint)

首先启动一个gin服务器,然后注册一个grpcserver :dtmServer,然后通过一个协程启动grpc服务,然后启动协程,每200ms一次将分支信息同步到存储。然后检查下kv存储里"topics"的值的版本,存储到topicsMap,紧接着启动一个协程任务在后台执行上面的kv存储到内存的更新。最后把我们的server注册到服务发现,一般是通过环境变量控制的

代码语言:javascript复制
#   Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
#   EndPoint: 'localhost:36790'

总结下就5件事

1,启动http服务

2,启动grpc服务

3,将分支的更新同步到存储

4,将kc里面存储的topics数据同步到内存。

5,将服务注册到服务发现注册中心。

其中,启动http服务包括三部分

A,为监控注册路由

B,为http服务注册路由

C,为json-rpc注册路由

1, 其中http路由就是简单的gin路由注册,位于dtmsvr/api_http.go

代码语言:javascript复制
func addRoute(engine *gin.Engine) {
    engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))
    engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))
    engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))

提供了常见申请全局事务ID,prepare,abort,commit等事务执行动作。

代码语言:javascript复制
func prepare(c *gin.Context) interface{} {
  return svcPrepare(TransFromContext(c))
}

其中prepare需要获取全局事务的分支事务

代码语言:javascript复制
dbt := GetTransGlobal(t.Gid)
代码语言:javascript复制
trans := GetStore().FindTransGlobalStore(gid)

默认这些信息存储在boltdb里面dtmsvr/storage/boltdb/boltdb.go,可以看下获取全局存储的过程

代码语言:javascript复制
func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
     trans = tGetGlobal(t, gid)
     bs := t.Bucket(bucketGlobal).Get([]byte(gid))

全局存储的定义如下:

代码语言:javascript复制
type TransGlobalStore struct {
  dtmutil.ModelBase
  Gid              string              `json:"gid,omitempty"`
  TransType        string              `json:"trans_type,omitempty"`
  Steps            []map[string]string `json:"steps,omitempty" gorm:"-"`
  Payloads         []string            `json:"payloads,omitempty" gorm:"-"`
  BinPayloads      [][]byte            `json:"-" gorm:"-"`
  Status           string              `json:"status,omitempty"`
  QueryPrepared    string              `json:"query_prepared,omitempty"`
  Protocol         string              `json:"protocol,omitempty"`
  FinishTime       *time.Time          `json:"finish_time,omitempty"`
  RollbackTime     *time.Time          `json:"rollback_time,omitempty"`
  Result           string              `json:"result,omitempty"`
  RollbackReason   string              `json:"rollback_reason,omitempty"`
  Options          string              `json:"options,omitempty"`
  CustomData       string              `json:"custom_data,omitempty"`
  NextCronInterval int64               `json:"next_cron_interval,omitempty"`
  NextCronTime     *time.Time          `json:"next_cron_time,omitempty"`
  Owner            string              `json:"owner,omitempty"`
  Ext              TransGlobalExt      `json:"-" gorm:"-"`
  ExtData          string              `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Options
  dtmcli.TransOptions
}

对于json_rpc也类似,就是通过gin的http路由,http协议的内容传输的是json数据,实现位于dtmsvr/api_json_rpc.go

代码语言:javascript复制
func addJrpcRouter(engine *gin.Engine) {
          engine.POST("/api/json-rpc", func(c *gin.Context) {
          return handlers[req.Method](req.Params)

真实的路由是定义在一个map里面的

代码语言:javascript复制
handlers := map[string]jrpcFunc{
    "newGid":         jrpcNewGid,
    "prepare":        jrpcPrepare,
    "submit":         jrpcSubmit,
    "abort":          jrpcAbort,
    "registerBranch": jrpcRegisterBranch,
  }

比如其中的获取全局事务id最终实现,和http协议是一样的

代码语言:javascript复制
func jrpcNewGid(interface{}) interface{} {
  return map[string]interface{}{"gid": GenGid()}
}

2, 然后我们看下grpc的实现dtmsvr/api_grpc.go

代码语言:javascript复制
func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
          r := svcAbort(TransFromDtmRequest(ctx, in))

中断事务的执行过程是通过全局事务id获取事务的元数据,修改全局事务的状态为中断,然后通过事务id获取所有的分支事务 ,最后处理分支事务:

代码语言:javascript复制
func svcAbort(t *TransGlobal) interface{} {
  dbt := GetTransGlobal(t.Gid)
  dbt.changeStatus(dtmcli.StatusAborting, withRollbackReason(t.RollbackReason))
  branches := GetStore().FindBranches(t.Gid)
  return dbt.Process(branches)

处理过程位于dtmsvr/trans_process.go

代码语言:javascript复制
func (t *TransGlobal) process(branches []TransBranch) error {
     rerr = t.getProcessor().ProcessOnce(branches)

每一种分布式事务模型的处理逻辑都不一样,以saga为例dtmsvr/trans_type_saga.go

代码语言:javascript复制
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))

它修改了boltdb里面的状态,dtmsvr/storage/boltdb/boltdb.go

代码语言:javascript复制
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
     if finished {
       tDelIndex(t, g.NextCronTime.Unix(), g.Gid)
      tPutGlobal(t, global)
     err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)

如果状态是完成状态,会删除相应的记录。

3,然后,我们看下第二部分,同步分支状态到存储,dtmsvr/svr.go

代码语言:javascript复制
k := updateBranch.gid   updateBranch.branchID   "-"   updateBranch.op
rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})
  for { // flush branches every 200ms
     flushBranchs()
   }

对于存储过的状态不用再次操作,所以本地用了一个k来去重,它是通过全局事务id,分支id以及分支的处理动作名字来做的唯一键。以存储介质为mysql,动作为创建分支的操作来说,它的代码实现位于

dtmsvr/storage/sql/sql.go

代码语言:javascript复制
db := dbGet().Clauses(clause.OnConflict{
    OnConstraint: "gid_branch_uniq",
    DoUpdates:    clause.AssignmentColumns(updates),
  }).Create(branches)

4,第三部分加载topics到内存的操作流程如下: dtmsvr/cron.go

代码语言:javascript复制
cronUpdateTopicsMapOnce()

dtmsvr/topics.go

代码语言:javascript复制
func updateTopicsMap() {
          kvs := GetStore().FindKV(topicsCat, "")
          topicsMap[kv.K] = newTopic
代码语言:javascript复制
func topic2urls(topic string) []string {
          for k, subscriber := range topicsMap[topic].Subscribers {
    urls[k] = subscriber.URL

事务模型是一致性消息的时候dtmsvr/trans_type_msg.go

代码语言:javascript复制
func (t *transMsgProcessor) GenBranches() []TransBranch {
  for i, step := range t.Steps {
    urls := dtmimp.If(mayTopic == step[dtmimp.OpAction], []string{mayTopic}, topic2urls(mayTopic)).([]string)
    for j, url := range urls {
      b := TransBranch{

通过将消息中的url组装成分支事务信息的。

5,最后一步服务发现的注册的实现单独提供了一个包。

代码语言:javascript复制
github.com/dtm-labs/dtmdriver@v0.0.6/driver-mgr.go

0 人点赞