golang源码分析:golang使用mysql XA事务

2023-03-01 16:15:15 浏览数 (1)

在MySQL 5.7.7版本中,才将xa的bug修复,符合Open Group 的<<Distributed Transaction Processing:The XA Specification>> 标准。Mysql中存在两种XA事务,一种是内部XA事务主要用来协调存储引擎和二进制日志,一种是外部事务可以参与到外部分布式事务中(比如多个数据库实现的分布式事务)。xa的语法如下:

代码语言:javascript复制
XA {START|BEGIN} xid [JOIN|RESUME] //开启本地事务
XA END xid //结束本地事务
XA PREPARE xid  //全局事务进入预备状态
XA COMMIT xid[ONE PHASE] //提交
XA ROLLBACK xid  //回滚
XA RECOVER[CONVERT XID ] //恢复没有提交的事务,继续执行

XA是牺牲可用性保证强一致性的事务,因为需要mysql的事务隔离级别为串行化。下面我们来实践一把。首先启动两个mysql实例,端口分别是3306和3307:

代码语言:javascript复制
version: "3.1"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone=' 08:00'
    volumes:
      - /learn/mysql:/docker-entrypoint-initdb.d
      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3306:3306"
    extra_hosts:
     - host.docker.internal:host-gateway
  mysql2:
    image: mysql:5.7
    container_name: mysql2
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone=' 08:00'
    volumes:
      - /learn/mysql:/docker-entrypoint-initdb.d
      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3307:3306"
    extra_hosts:
     - host.docker.internal:host-gateway

分别链接mysql,确认是否支持xa协议,修改事务隔离级别

代码语言:javascript复制
 % mysql -uroot -p12345678 -h127.0.0.1
 % mysql -uroot -p12345678 -h127.0.0.1 -P3307
 
mysql> show variables like 'innodb_support_xa';
 ------------------- ------- 
| Variable_name     | Value |
 ------------------- ------- 
| innodb_support_xa | ON    |
 ------------------- ------- 
1 row in set (0.01 sec)
mysql> set global tx_isolation=serializable;
Query OK, 0 rows affected, 1 warning (0.01 sec)
mysql> show variables like '%tx_isolation%';
 --------------- -------------- 
| Variable_name | Value        |
 --------------- -------------- 
| tx_isolation  | SERIALIZABLE |
 --------------- -------------- 
1 row in set (0.00 sec)

然后创建数据库和表

mysql3306中 我们有一个user表

代码语言:javascript复制
 create database orders;
 use orders;
create table user (
    id int,
    name varchar(10),
    score int
);
insert into user values(1, "foo", 10);

在mysql3307中,我们有一个wallet表。

代码语言:javascript复制
use orders;
create table wallet (
    id int,
    money float 
);
insert into wallet values(1, 10.1);

然后就可以通过xa协议实现分布式事务

代码语言:javascript复制
package main

import (
  "database/sql"
  "fmt"
  "log"
  "strconv"
  "time"

  _ "github.com/go-sql-driver/mysql"
  "github.com/pkg/errors"
)

func main() {
  var err error
  // db1的连接
  db1, err := sql.Open("mysql", "root:12345678@tcp(127.0.0.1:3306)/orders")
  if err != nil {
    panic(err.Error())
  }
  defer db1.Close()
  // db2的连接
  db2, err := sql.Open("mysql", "root:12345678@tcp(127.0.0.1:3307)/orders")
  if err != nil {
    panic(err.Error())
  }
  defer db2.Close()
  // 开始前显示
  var score int
  db1.QueryRow("select score from user where id = 1").Scan(&score)
  fmt.Println("user1 score:", score)
  var money float64
  db2.QueryRow("select money from wallet where id = 1").Scan(&money)
  fmt.Println("wallet1 money:", money)
  // 生成xid
  xid := strconv.FormatInt(time.Now().Unix(), 10)
  fmt.Println("=== xid:"   xid   " ====")
  defer func() {
    if err := recover(); err != nil {
      fmt.Printf("% vn", err)
      fmt.Println("=== call rollback ====")
      // db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
      // db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
    }
    db1.QueryRow("select score from user where id = 1").Scan(&score)
    fmt.Println("user1 score:", score)
    db2.QueryRow("select money from wallet where id = 1").Scan(&money)
    fmt.Println("wallet1 money:", money)
  }()
  // XA 启动
  fmt.Println("=== call start ====")
  if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
    panic(errors.WithStack(err))
  }
  if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
    panic(errors.WithStack(err))
  }
  // DML操作
  if _, err = db1.Exec("update user set score=score 2 where id =1"); err != nil {
    panic(errors.WithStack(err))
  }
  if _, err = db2.Exec("update wallet set money=money 1.2 where id=1"); err != nil {
    panic(errors.WithStack(err))
  }
  // panic(errors.WithStack(err)) 
  /*
   xa recover;
   Empty set (0.01 sec)
  */
  // XA end
  fmt.Println("=== call end ====")
  if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
    panic(errors.WithStack(err))
  }
  if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
    panic(errors.WithStack(err))
  }
  // prepare
  fmt.Println("=== call prepare ====")
  if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
    panic(errors.WithStack(err))
  }
  // panic(errors.New("db1 prepare error"))
  /*
    db1
    xa recover;
     ---------- -------------- -------------- ------------ 
    | formatID | gtrid_length | bqual_length | data       |
     ---------- -------------- -------------- ------------ 
    |        1 |           10 |            0 | 1671287200 |
     ---------- -------------- -------------- ------------ 
    1 row in set (0.00 sec)
    db2
    xa recover;
    Empty set (0.00 sec)
    //新事务会死锁
    === xid:1671287283 ====
    === call start ====
    Error 1205: Lock wait timeout exceeded; try restarting transaction
    main.main

    mysql> xa commit '1671287200';
    Query OK, 0 rows affected (0.00 sec)
  */
  if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
    panic(errors.WithStack(err))
  }
  // panic(errors.New("db2 prepare error"))
  /*
      db1
      mysql> xa recover;
     ---------- -------------- -------------- ------------ 
    | formatID | gtrid_length | bqual_length | data       |
     ---------- -------------- -------------- ------------ 
    |        1 |           10 |            0 | 1671287434 |
     ---------- -------------- -------------- ------------ 
    1 row in set (0.00 sec)
        db2
      xa recover;
     ---------- -------------- -------------- ------------ 
    | formatID | gtrid_length | bqual_length | data       |
     ---------- -------------- -------------- ------------ 
    |        1 |           10 |            0 | 1671287434 |
     ---------- -------------- -------------- ------------ 
    1 row in set (0.00 sec)
  */
  // commit
  fmt.Println("=== call commit ====")
  if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
    // TODO: 尝试重新提交COMMIT
    // TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交
    log.Println("xid:"   xid)
  }
  // panic(errors.New("db2 commit error"))
  if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
    log.Println("xid:"   xid)
  }
  db1.QueryRow("select score from user where id = 1").Scan(&score)
  fmt.Println("user1 score:", score)
  db2.QueryRow("select money from wallet where id = 1").Scan(&money)
  fmt.Println("wallet1 money:", money)
}

为了实验充分理解分布式事务,我们分别在几个特殊阶段点来panic,中断事务。

1,在两个db实例prepare之前

2,在db1 prepare之后,db2 prepare之前

3,在两个db prepare之后,db1 commit之前

4,在db1 commit之后, db2 commit之前

情形1下:两个事务都没有prepare,全局不可见,异常中断后,本地事务回滚掉了,在两个实例上恢复的时候,都是一致的

代码语言:javascript复制
 xa recover;
Empty set (0.00 sec)

情形2下:db1,已经prepare了,全局事务会记录下来,本地事务不会自动回滚掉,db2 没有prepare,本地事务会自动回滚掉

代码语言:javascript复制
db1
xa recover;
 ---------- -------------- -------------- ------------ 
| formatID | gtrid_length | bqual_length | data       |
 ---------- -------------- -------------- ------------ 
|        1 |           10 |            0 | 1671287200 |
 ---------- -------------- -------------- ------------ 
1 row in set (0.00 sec)
db2
xa recover;
Empty set (0.00 sec)

这个时候如果新开一个事务,会等待上一个没有结束的事务释放锁,而超时

代码语言:javascript复制
    === xid:1671287283 ====
    === call start ====
    Error 1205: Lock wait timeout exceeded; try restarting transaction
    main.main

情形3下:两个事务都处于prepare状态,等待处理

代码语言:javascript复制
  db1
      mysql> xa recover;
     ---------- -------------- -------------- ------------ 
    | formatID | gtrid_length | bqual_length | data       |
     ---------- -------------- -------------- ------------ 
    |        1 |           10 |            0 | 1671287434 |
     ---------- -------------- -------------- ------------ 
    1 row in set (0.00 sec)
  db2
      xa recover;
     ---------- -------------- -------------- ------------ 
    | formatID | gtrid_length | bqual_length | data       |
     ---------- -------------- -------------- ------------ 
    |        1 |           10 |            0 | 1671287434 |
     ---------- -------------- -------------- ------------ 
    1 row in set (0.00 sec)

情形4下:db1已经提交,recover后是空,db2是待提交状态。

代码语言:javascript复制
db1
      mysql> xa recover;
      Empty set (0.00 sec)
db2
      xa recover;
       ---------- -------------- -------------- ------------ 
      | formatID | gtrid_length | bqual_length | data       |
       ---------- -------------- -------------- ------------ 
      |        1 |           10 |            0 | 1671289143 |
       ---------- -------------- -------------- ------------ 
      1 row in set (0.00 sec)

这个时候可以操作db2的回滚,或者提交,db1已经提交,没法回滚了

代码语言:javascript复制
db1
xa rollback '1671289143';
ERROR 1397 (XAE04): XAER_NOTA: Unknown XID
db2
 xa rollback '1671289143';
Query OK, 0 rows affected (0.01 sec)

所以在xa下也会出现不一致,需要人工介入进行回滚或者提交,保证最终的一致性。

总结下:没有银弹,xa虽然尽最大努力保证了一致性,但是如果出现部分提交,还是需要人工介入处理,保证最终的一致性。

0 人点赞