上一讲mac 上学习k8s系列(53)seata-go介绍了如何在本地部署seata,本文从源码角度分析下seata-go的具体实现:
首先看下at模式下的最基础的例子:sample/at/basic/main.go,它的main函数很简单
代码语言:javascript复制func main() {
client.Init()
initService()
tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx",
Timeout: time.Second * 30,
}, updateData)
<-make(chan struct{})
}
1,客户端的初始化,里面实现了Rm和Tm的初始化,目前是空的实现,使用者可以根据自己的需求来实现初始化。
代码语言:javascript复制client.Init()
initRmClient()
initTmClient()
其中initTmClient函数实现了调用了两个函数
代码语言:javascript复制initConfig()
initRemoting()
后者调用了
代码语言:javascript复制getty.InitRpcClient()
它调用了
代码语言:javascript复制rpcClient.init()
它利用github.com/apache/dubbo-getty 初始化了grpc链接,维持我们的tm和seata server之间的通信。
代码语言:javascript复制type Client interface {
EndPoint
}
代码语言:javascript复制addressList := getAvailServerList()
gettyClient := getty.NewTCPClient(
gxsync.NewTaskPoolSimple(0)
go gettyClient.RunEventLoop(c.newSession)
链接成功后通过协程维持链接心跳的刷新。
2,mysql链接的初始化
代码语言:javascript复制initService()
db, err = sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true")
其中
代码语言:javascript复制SeataATMySQLDriver = "seata-at-mysql"
在初始化的时候注册了at模式下的mysql driver,pkg/datasource/sql/driver.go
代码语言:javascript复制SeataATMySQLDriver = "seata-at-mysql"
SeataXAMySQLDriver = "seata-xa-mysql"
func init()
sql.Register(SeataATMySQLDriver, &seataATDriver{
seataDriver: &seataDriver{
transType: types.ATMode,
sql.Register(SeataXAMySQLDriver, &seataXADriver{
seataDriver: &seataDriver{
transType: types.XAMode,
type seataATDriver struct {
*seataDriver
}
这个driver实现了sql语句的解析,方便记录undo和redolog
代码语言:javascript复制func (d *seataATDriver) OpenConnector(name string) (c driver.Connector, err error)
connector, err := d.seataDriver.OpenConnector(name)
代码语言:javascript复制func (d *seataDriver) Open(name string) (driver.Conn, error) {
}
代码语言:javascript复制func (d *seataDriver) OpenConnector(name string) (c driver.Connector, err error) {
proxy, err := getOpenConnectorProxy(c, dbType, sql.OpenDB(c), name)
类似的还有XA driver的实现
代码语言:javascript复制type seataXADriver struct {
*seataDriver
}
func (d *seataXADriver) OpenConnector(name string) (c driver.Connector, err error)
type seataDriver struct {
transType types.TransactionType
target driver.Driver
}
3,注册全局事务id,执行我们的任务,并根据执行结果实现提交或者回滚。
代码语言:javascript复制tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx",
Timeout: time.Second * 30,
}, updateData)
在全局事务的ctx执行我们的updateData任务
其中配置的默认配置定义在pkg/config/client_config.go
代码语言:javascript复制 func GetClientConfig() *ClientConfig {
GetDefaultGettyConfig(),
pkg/tm/transaction_executor.go中定义了我们最核心的函数:
代码语言:javascript复制func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) (re error) {
seataContextVariable = ContextParam("seataContextVariable")
if IsGlobalTx(ctx) {
ctx = transferTx(ctx)
SetXID(newCtx, GetXID(ctx))
variable.(*ContextVariable).Xid = xid
re = begin(ctx, gc)
defer func() {
var err error
// no need to do second phase if propagation is some type e.g. NotSupported.
if IsGlobalTx(ctx) {
// business maybe to throw panic, so need to recover it here.
if err = commitOrRollback(ctx, recover() == nil && re == nil); err != nil
}
}
re = business(ctx)
}
可以看到这就是核心逻辑:
A,在context中获取全局事务id,如果没有获取到,到searver上去获取
B,执行我们的本地事务
C,根据本地事务返回的成功失败,确定是回滚还是提交。
代码语言:javascript复制func begin(ctx context.Context, gc *GtxConfig) error {
return beginNewGtx(ctx, gc)
}
代码语言:javascript复制func beginNewGtx(ctx context.Context, gc *GtxConfig) error {
GetGlobalTransactionManager().Begin(ctx, timeout)
}
代码语言:javascript复制func (g *GlobalTransactionManager) Begin(ctx context.Context, timeout time.Duration) error {
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}
代码语言:javascript复制type GtxConfig struct {
Timeout time.Duration
Name string
Propagation Propagation
LockRetryInternal time.Duration
LockRetryTimes int16
}
提交事务或者回滚事务的流程如下,根据当前角色确认下一步该如何操作:
代码语言:javascript复制func commitOrRollback(ctx context.Context, isSuccess bool) (re error) {
case Launcher:
if tx := GetTx(ctx); isSuccess {
GetGlobalTransactionManager().Commit(ctx, tx);
re = GetGlobalTransactionManager().Rollback(ctx, tx);
case Participant:
case UnKnow:
提交
代码语言:javascript复制func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error {
req := message.GlobalCommitRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
}
for bf.Ongoing() {
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req);
bf.Wait()
回滚
代码语言:javascript复制func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTransaction) error {
req := message.GlobalRollbackRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
}
for bf.Ongoing() {
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err ==
bf.Wait()
其中全局事务的定义在pkg/tm/context.go
代码语言:javascript复制type GlobalTransaction struct {
Xid string
XidCopy string
TxName string
// TxStatus Identify a global transaction in a certain status
TxStatus message.GlobalStatus
// TxRole Roles in the transaction propagation behavior
TxRole GlobalTransactionRole
}
事务的相关状态定义pkg/tm/constant.go
代码语言:javascript复制type Propagation int8
Required
RequiresNew
NotSupported
Supports
Never
Mandatory
当前协程所担任的角色
代码语言:javascript复制const (
UnKnow = GlobalTransactionRole(0)
Launcher = GlobalTransactionRole(1)
Participant = GlobalTransactionRole(2)
)
全局事务的状态
代码语言:javascript复制GlobalStatus
GlobalStatusUnKnown
GlobalStatusBegin
GlobalStatusCommitting
GlobalStatusCommitRetrying
GlobalStatusRollbacking
GlobalStatusRollbackRetrying
GlobalStatusTimeoutRollbacking
GlobalStatusTimeoutRollbackRetrying
GlobalStatusAsyncCommitting
GlobalStatusCommitted
GlobalStatusCommitFailed
GlobalStatusRollbacked
GlobalStatusRollbackFailed
GlobalStatusTimeoutRollbacked
GlobalStatusTimeoutRollbackFailed
GlobalStatusFinished
可以看到at模式其实本质上是通过一个全局锁在事务分支的分支锁实现事务的隔离的,下一讲我们通过源码分析下tcc模式。