我们继续上一篇golang源码分析:dtm分布式事务(3)分析api服务的源码,位置位于dtmsvr/svr.go:
代码语言:javascript复制 func StartSvr() *gin.Engine {
dtmcli.GetRestyClient().SetTimeout
app := dtmutil.GetGinApp()
app = httpMetrics(app)
addRoute(app)
addJrpcRouter(app)
go func() {
err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
dtmgpb.RegisterDtmServer(s, &dtmServer{})
go func() {
err := s.Serve(lis)
for i := 0; i < int(conf.UpdateBranchAsyncGoroutineNum); i {
go updateBranchAsync()
updateTopicsMap()
go CronUpdateTopicsMap()
err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint)
首先启动一个gin服务器,然后注册一个grpcserver :dtmServer,然后通过一个协程启动grpc服务,然后启动协程,每200ms一次将分支信息同步到存储。然后检查下kv存储里"topics"的值的版本,存储到topicsMap,紧接着启动一个协程任务在后台执行上面的kv存储到内存的更新。最后把我们的server注册到服务发现,一般是通过环境变量控制的
代码语言:javascript复制# Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
# EndPoint: 'localhost:36790'
总结下就5件事
1,启动http服务
2,启动grpc服务
3,将分支的更新同步到存储
4,将kc里面存储的topics数据同步到内存。
5,将服务注册到服务发现注册中心。
其中,启动http服务包括三部分
A,为监控注册路由
B,为http服务注册路由
C,为json-rpc注册路由
1, 其中http路由就是简单的gin路由注册,位于dtmsvr/api_http.go
代码语言:javascript复制func addRoute(engine *gin.Engine) {
engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))
engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))
engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))
提供了常见申请全局事务ID,prepare,abort,commit等事务执行动作。
代码语言:javascript复制func prepare(c *gin.Context) interface{} {
return svcPrepare(TransFromContext(c))
}
其中prepare需要获取全局事务的分支事务
代码语言:javascript复制dbt := GetTransGlobal(t.Gid)
代码语言:javascript复制trans := GetStore().FindTransGlobalStore(gid)
默认这些信息存储在boltdb里面dtmsvr/storage/boltdb/boltdb.go,可以看下获取全局存储的过程
代码语言:javascript复制func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
trans = tGetGlobal(t, gid)
bs := t.Bucket(bucketGlobal).Get([]byte(gid))
全局存储的定义如下:
代码语言:javascript复制type TransGlobalStore struct {
dtmutil.ModelBase
Gid string `json:"gid,omitempty"`
TransType string `json:"trans_type,omitempty"`
Steps []map[string]string `json:"steps,omitempty" gorm:"-"`
Payloads []string `json:"payloads,omitempty" gorm:"-"`
BinPayloads [][]byte `json:"-" gorm:"-"`
Status string `json:"status,omitempty"`
QueryPrepared string `json:"query_prepared,omitempty"`
Protocol string `json:"protocol,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
Result string `json:"result,omitempty"`
RollbackReason string `json:"rollback_reason,omitempty"`
Options string `json:"options,omitempty"`
CustomData string `json:"custom_data,omitempty"`
NextCronInterval int64 `json:"next_cron_interval,omitempty"`
NextCronTime *time.Time `json:"next_cron_time,omitempty"`
Owner string `json:"owner,omitempty"`
Ext TransGlobalExt `json:"-" gorm:"-"`
ExtData string `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Options
dtmcli.TransOptions
}
对于json_rpc也类似,就是通过gin的http路由,http协议的内容传输的是json数据,实现位于dtmsvr/api_json_rpc.go
代码语言:javascript复制func addJrpcRouter(engine *gin.Engine) {
engine.POST("/api/json-rpc", func(c *gin.Context) {
return handlers[req.Method](req.Params)
真实的路由是定义在一个map里面的
代码语言:javascript复制handlers := map[string]jrpcFunc{
"newGid": jrpcNewGid,
"prepare": jrpcPrepare,
"submit": jrpcSubmit,
"abort": jrpcAbort,
"registerBranch": jrpcRegisterBranch,
}
比如其中的获取全局事务id最终实现,和http协议是一样的
代码语言:javascript复制func jrpcNewGid(interface{}) interface{} {
return map[string]interface{}{"gid": GenGid()}
}
2, 然后我们看下grpc的实现dtmsvr/api_grpc.go
代码语言:javascript复制func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r := svcAbort(TransFromDtmRequest(ctx, in))
中断事务的执行过程是通过全局事务id获取事务的元数据,修改全局事务的状态为中断,然后通过事务id获取所有的分支事务 ,最后处理分支事务:
代码语言:javascript复制func svcAbort(t *TransGlobal) interface{} {
dbt := GetTransGlobal(t.Gid)
dbt.changeStatus(dtmcli.StatusAborting, withRollbackReason(t.RollbackReason))
branches := GetStore().FindBranches(t.Gid)
return dbt.Process(branches)
处理过程位于dtmsvr/trans_process.go
代码语言:javascript复制func (t *TransGlobal) process(branches []TransBranch) error {
rerr = t.getProcessor().ProcessOnce(branches)
每一种分布式事务模型的处理逻辑都不一样,以saga为例dtmsvr/trans_type_saga.go
代码语言:javascript复制func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
它修改了boltdb里面的状态,dtmsvr/storage/boltdb/boltdb.go
代码语言:javascript复制func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
if finished {
tDelIndex(t, g.NextCronTime.Unix(), g.Gid)
tPutGlobal(t, global)
err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)
如果状态是完成状态,会删除相应的记录。
3,然后,我们看下第二部分,同步分支状态到存储,dtmsvr/svr.go
代码语言:javascript复制k := updateBranch.gid updateBranch.branchID "-" updateBranch.op
rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})
for { // flush branches every 200ms
flushBranchs()
}
对于存储过的状态不用再次操作,所以本地用了一个k来去重,它是通过全局事务id,分支id以及分支的处理动作名字来做的唯一键。以存储介质为mysql,动作为创建分支的操作来说,它的代码实现位于
dtmsvr/storage/sql/sql.go
代码语言:javascript复制db := dbGet().Clauses(clause.OnConflict{
OnConstraint: "gid_branch_uniq",
DoUpdates: clause.AssignmentColumns(updates),
}).Create(branches)
4,第三部分加载topics到内存的操作流程如下: dtmsvr/cron.go
代码语言:javascript复制cronUpdateTopicsMapOnce()
dtmsvr/topics.go
代码语言:javascript复制func updateTopicsMap() {
kvs := GetStore().FindKV(topicsCat, "")
topicsMap[kv.K] = newTopic
代码语言:javascript复制func topic2urls(topic string) []string {
for k, subscriber := range topicsMap[topic].Subscribers {
urls[k] = subscriber.URL
事务模型是一致性消息的时候dtmsvr/trans_type_msg.go
代码语言:javascript复制func (t *transMsgProcessor) GenBranches() []TransBranch {
for i, step := range t.Steps {
urls := dtmimp.If(mayTopic == step[dtmimp.OpAction], []string{mayTopic}, topic2urls(mayTopic)).([]string)
for j, url := range urls {
b := TransBranch{
通过将消息中的url组装成分支事务信息的。
5,最后一步服务发现的注册的实现单独提供了一个包。
代码语言:javascript复制github.com/dtm-labs/dtmdriver@v0.0.6/driver-mgr.go