golang源码分析:dtm分布式事务(3)

2023-03-01 16:16:22 浏览数 (3)

在简单介绍完使用的例子后golang源码分析:dtm分布式事务(1)golang源码分析:dtm分布式事务(2),我们分析下服务端的源码实现。服务端的源码非常简洁,提供了grpc,http和rpc-json三种方式的接口。下面我们从main函数来开始分析。

在main.go的main函数里面只做了两件事,分别是启动dtm 服务端,以及dtm的控制台。

代码语言:javascript复制
  app, conf := entry.Main(&Version)
  addAdmin(app, conf)

首先我们看下控制台的代码实现

代码语言:javascript复制
func addAdmin(app *gin.Engine, conf *config.Type) {
      dist := getSub(admin, "admin", "dist")
      index, err := dist.Open("index.html")
      cont, err := ioutil.ReadAll(index)
      sfile := string(cont)
      renderIndex := func(c *gin.Context) {
        c.Header("content-type", "text/html;charset=utf-8")
        c.String(200, sfile)
      }
      app.GET("/", renderIndex)
      app.StaticFS("/assets", http.FS(getSub(dist, "assets")))
      app.GET("/admin/*name", renderIndex)
      app.GET("/assets/*name", proxyAdmin)
      app.GET("/admin/*name", proxyAdmin)

可以看到,其实是启动了一个文件服务器,然后把根路径路由到index.html,提供相应的素材文件,如果找不到index.html文件,则通过代理路由到admin.dtm.pub官方实例。 后台是基于vue实现的,路径在admin目录下面。

dtm服务端代码入口在文件 dtmsvr/entry/main.go里

代码语言:javascript复制
func Main(version *string) (*gin.Engine, *config.Type) {
    config.MustLoadConfig(*confFile)
    registry.WaitStoreUp()
    app := dtmsvr.StartSvr()  
    go dtmsvr.CronExpiredTrans(-1)

加载配置,等待服务器的存储ready,启动api server和cron job,其中等待服务器ready是通过ping接口等待服务器响应。

代码语言:javascript复制
 err := GetStore().Ping();

cron job实现比较粗暴,启动一个协程,在内部的死循环里,交替执行事务和sleep

代码语言:javascript复制
gid := CronTransOnce()
sleepCronTime()

回过头来,我们看看配置是怎么加载的,配置我们可以复制模板进行自定义,模板位于conf.sample.yml。即使没有定义配置文件也能正常运行,这是如何做到的呢?其实在定义config的结构体的时候定义了默认值,然后在初始化对象的时候,通过反射取结构体的tag里面的默认值。具体代码实现位于dtmsvr/config/config_utils.go:

代码语言:javascript复制
func loadFromEnvInner(prefix string, conf reflect.Value, defaultValue string) {

首先我们看下Config的定义dtmsvr/config/config.go

代码语言:javascript复制
var Config = Type{}  
type Type struct {
   TimeoutToFail   int64  `yaml:"TimeoutToFail" default:"35"`

可以看到他定义了一个特殊的tag default,同理我们Store也是这样:

代码语言:javascript复制
type Store struct {
   Driver   string `yaml:"Driver" default:"boltdb"`

通过配置我们知道,默认情况下服务端数据是存储在BoltDb里面,boltdb是golang实现的一个b 树内存kv存储。etcd和consul底层的存储都是它,etcd还进行了个性化定制,dtm使用的就是这个版本

代码语言:javascript复制
bolt "go.etcd.io/bbolt"

除了boltdb,存储还支持redis,mysql,postgres,我们以基于本地内存的bilt来进行分析。它的具体代码位于dtmsvr/storage/boltdb/boltdb.go:

代码语言:javascript复制
var bucketGlobal = []byte("global")
var bucketBranches = []byte("branches")
var bucketIndex = []byte("index")
var bucketKV = []byte("kv")

然后通过工厂管理各种具体的存储dtmsvr/storage/registry/registry.go

代码语言:javascript复制
var storeFactorys = map[string]StorageFactory{
  "boltdb": &SingletonFactory{

使用的时候通过GetStore获取具体的存储:

代码语言:javascript复制
func GetStore() storage.Store {
return storeFactorys[conf.Store.Driver].GetStorage()
}

dtmsvr/utils.go

代码语言:javascript复制
func GetStore() storage.Store {
  return registry.GetStore()
}

server部分,我们先留到下一讲进行分析,我们先看下cron job的实现

dtmsvr/cron.go

代码语言:javascript复制
func CronTransOnce() (gid string) {
    trans := lockOneTrans(CronForwardDuration)
    gid = trans.Gid
    branches := GetStore().FindBranches(gid)
    err := trans.Process(branches)

通过全局的事务id,查找到对应的所有分支事务,然后调用Process来进行处理。dtmsvr/trans_process.go

代码语言:javascript复制
func (t *TransGlobal) Process(branches []TransBranch) error {
    r := t.process(branches)
    transactionMetrics(t, r == nil)

处理完毕后需要上报处理结果便于统一监控。

代码语言:javascript复制
func (t *TransGlobal) process(branches []TransBranch) error {
   go func() {
      err := t.processInner(branches)
  err := t.processInner(branches)

处理的时候如果不需要同步获取结果,就起一个协程来处理分支,否则同步处理。

代码语言:javascript复制
func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) {
    rerr = t.getProcessor().ProcessOnce(branches)

通过事务的类型tcc、saga等获取对应的处理器来处理

代码语言:javascript复制
func (t *TransGlobal) getProcessor() transProcessor {
  return processorFac[t.TransType](t)
}

处理器的注册位于dtmsvr/trans_class.go

代码语言:javascript复制
var processorFac = map[string]processorCreator{}

func registorProcessorCreator(transType string, creator processorCreator) {
  processorFac[transType] = creator
}

每一种事务处理模式会在初始化的时候注册处理器

dtmsvr/trans_type_msg.go

代码语言:javascript复制
func init() {
  registorProcessorCreator("msg", func(trans *TransGlobal) transProcessor { return &transMsgProcessor{TransGlobal: trans} })
}

它对应的接口

代码语言:javascript复制
func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {

会处理相应的事务。dtmsvr/trans_type_saga.go,dtmsvr/trans_type_tcc.go,dtmsvr/trans_type_workflow.go,dtmsvr/trans_type_xa.go的实现是类似的。

1 人点赞