golang源码分析:seata-go (1)at模式

2022-12-17 16:35:09 浏览数 (1)

上一讲mac 上学习k8s系列(53)seata-go介绍了如何在本地部署seata,本文从源码角度分析下seata-go的具体实现:

首先看下at模式下的最基础的例子:sample/at/basic/main.go,它的main函数很简单

代码语言:javascript复制
func main() {
  client.Init()
  initService()
  tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
    Name:    "ATSampleLocalGlobalTx",
    Timeout: time.Second * 30,
  }, updateData)
  <-make(chan struct{})
}

1,客户端的初始化,里面实现了Rm和Tm的初始化,目前是空的实现,使用者可以根据自己的需求来实现初始化。

代码语言:javascript复制
client.Init()
  initRmClient()
  initTmClient()

其中initTmClient函数实现了调用了两个函数

代码语言:javascript复制
initConfig()
initRemoting()

后者调用了

代码语言:javascript复制
getty.InitRpcClient()

它调用了

代码语言:javascript复制
rpcClient.init()

它利用github.com/apache/dubbo-getty 初始化了grpc链接,维持我们的tm和seata server之间的通信。

代码语言:javascript复制
type Client interface {
  EndPoint
}
代码语言:javascript复制
addressList := getAvailServerList()
gettyClient := getty.NewTCPClient(
  gxsync.NewTaskPoolSimple(0)
go gettyClient.RunEventLoop(c.newSession)

链接成功后通过协程维持链接心跳的刷新。

2,mysql链接的初始化

代码语言:javascript复制
initService()
  db, err = sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true")

其中

代码语言:javascript复制
SeataATMySQLDriver = "seata-at-mysql"

在初始化的时候注册了at模式下的mysql driver,pkg/datasource/sql/driver.go

代码语言:javascript复制
SeataATMySQLDriver = "seata-at-mysql"
SeataXAMySQLDriver = "seata-xa-mysql"

func init() 
     sql.Register(SeataATMySQLDriver, &seataATDriver{
    seataDriver: &seataDriver{
      transType: types.ATMode,
        sql.Register(SeataXAMySQLDriver, &seataXADriver{
    seataDriver: &seataDriver{
      transType: types.XAMode,
    type seataATDriver struct {
  *seataDriver
}

这个driver实现了sql语句的解析,方便记录undo和redolog

代码语言:javascript复制
func (d *seataATDriver) OpenConnector(name string) (c driver.Connector, err error) 
        connector, err := d.seataDriver.OpenConnector(name)
代码语言:javascript复制
func (d *seataDriver) Open(name string) (driver.Conn, error) {
}
代码语言:javascript复制
func (d *seataDriver) OpenConnector(name string) (c driver.Connector, err error) {
        proxy, err := getOpenConnectorProxy(c, dbType, sql.OpenDB(c), name)

类似的还有XA driver的实现

代码语言:javascript复制
type seataXADriver struct {
  *seataDriver
}

func (d *seataXADriver) OpenConnector(name string) (c driver.Connector, err error) 
    type seataDriver struct {
  transType types.TransactionType
  target    driver.Driver
}

3,注册全局事务id,执行我们的任务,并根据执行结果实现提交或者回滚。

代码语言:javascript复制
tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
    Name:    "ATSampleLocalGlobalTx",
    Timeout: time.Second * 30,
  }, updateData)

在全局事务的ctx执行我们的updateData任务

其中配置的默认配置定义在pkg/config/client_config.go

代码语言:javascript复制
  func GetClientConfig() *ClientConfig {
        GetDefaultGettyConfig(),

pkg/tm/transaction_executor.go中定义了我们最核心的函数:

代码语言:javascript复制
func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) (re error) {
      seataContextVariable = ContextParam("seataContextVariable")
        if IsGlobalTx(ctx) {
             ctx = transferTx(ctx)
             SetXID(newCtx, GetXID(ctx))
             variable.(*ContextVariable).Xid = xid
       
       re = begin(ctx, gc)
       
    defer func() {
      var err error
       // no need to do second phase if propagation is some type e.g. NotSupported.
       if IsGlobalTx(ctx) {
         // business maybe to throw panic, so need to recover it here.
          if err = commitOrRollback(ctx, recover() == nil && re == nil); err != nil 
          }
     }
  re = business(ctx)
 }

可以看到这就是核心逻辑:

A,在context中获取全局事务id,如果没有获取到,到searver上去获取

B,执行我们的本地事务

C,根据本地事务返回的成功失败,确定是回滚还是提交。

代码语言:javascript复制
func begin(ctx context.Context, gc *GtxConfig) error {
    return beginNewGtx(ctx, gc)
}
代码语言:javascript复制
func beginNewGtx(ctx context.Context, gc *GtxConfig) error {
  GetGlobalTransactionManager().Begin(ctx, timeout)
}
代码语言:javascript复制
func (g *GlobalTransactionManager) Begin(ctx context.Context, timeout time.Duration) error {
              res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
      return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}
代码语言:javascript复制
type GtxConfig struct {
  Timeout           time.Duration
  Name              string
  Propagation       Propagation
  LockRetryInternal time.Duration
  LockRetryTimes    int16
}

提交事务或者回滚事务的流程如下,根据当前角色确认下一步该如何操作:

代码语言:javascript复制
func commitOrRollback(ctx context.Context, isSuccess bool) (re error) {
      case Launcher:
       if tx := GetTx(ctx); isSuccess {
        GetGlobalTransactionManager().Commit(ctx, tx);
        re = GetGlobalTransactionManager().Rollback(ctx, tx);
      case Participant:
      case UnKnow:

提交

代码语言:javascript复制
func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error {
        req := message.GlobalCommitRequest{
    AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
  }
   for bf.Ongoing() {
    if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req);
      bf.Wait()

回滚

代码语言:javascript复制
func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTransaction) error {
        req := message.GlobalRollbackRequest{
    AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
  }
  for bf.Ongoing() {
    if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == 
      bf.Wait()

其中全局事务的定义在pkg/tm/context.go

代码语言:javascript复制
type GlobalTransaction struct {
  Xid     string
  XidCopy string
  TxName  string
  // TxStatus Identify a global transaction in a certain status
  TxStatus message.GlobalStatus
  // TxRole Roles in the transaction propagation behavior
  TxRole GlobalTransactionRole
}

事务的相关状态定义pkg/tm/constant.go

代码语言:javascript复制
type Propagation int8
      Required
      RequiresNew
      NotSupported
      Supports
      Never
      Mandatory

当前协程所担任的角色

代码语言:javascript复制
const (
  UnKnow      = GlobalTransactionRole(0)
  Launcher    = GlobalTransactionRole(1)
  Participant = GlobalTransactionRole(2)
)

全局事务的状态

代码语言:javascript复制
GlobalStatus
      GlobalStatusUnKnown
      GlobalStatusBegin
      GlobalStatusCommitting
      GlobalStatusCommitRetrying
      GlobalStatusRollbacking
      GlobalStatusRollbackRetrying
      GlobalStatusTimeoutRollbacking
      GlobalStatusTimeoutRollbackRetrying
      GlobalStatusAsyncCommitting
      GlobalStatusCommitted
      GlobalStatusCommitFailed
      GlobalStatusRollbacked
      GlobalStatusRollbackFailed
      GlobalStatusTimeoutRollbacked
      GlobalStatusTimeoutRollbackFailed
      GlobalStatusFinished

可以看到at模式其实本质上是通过一个全局锁在事务分支的分支锁实现事务的隔离的,下一讲我们通过源码分析下tcc模式。

0 人点赞