MySQL 直到 5.7.7 版本才修复了 XA 实现中的缺陷(在此之前,处于 PREPARED 状态的 XA 事务会在客户端断开连接或服务正常关闭时被回滚,也无法正确写入二进制日志),使其符合 Open Group 的《Distributed Transaction Processing: The XA Specification》标准。MySQL 中存在两种 XA 事务:一种是内部 XA 事务,主要用来协调存储引擎和二进制日志;另一种是外部 XA 事务,可以参与到外部的分布式事务中(比如由多个数据库实例共同完成的分布式事务)。XA 的语法如下:
XA {START|BEGIN} xid [JOIN|RESUME]   //开启一个 XA 事务分支
XA END xid [SUSPEND [FOR MIGRATE]]   //结束该分支上的工作,事务进入 IDLE 状态
XA PREPARE xid                       //准备提交,事务进入 PREPARED 状态(两阶段提交的第一阶段)
XA COMMIT xid [ONE PHASE]            //提交(两阶段提交的第二阶段;ONE PHASE 表示一阶段提交)
XA ROLLBACK xid                      //回滚
XA RECOVER [CONVERT XID]             //列出处于 PREPARED 状态、尚未提交或回滚的 XA 事务
XA 是一种牺牲可用性来换取强一致性的方案:在两阶段提交完成之前,各个参与者都会一直持有锁;而且 MySQL 官方文档要求分布式 XA 事务使用 SERIALIZABLE 隔离级别才能保证 ACID。下面我们来实践一下。首先启动两个 MySQL 实例,端口分别是 3306 和 3307:
version: "3.1"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'
    volumes:
      - /learn/mysql:/docker-entrypoint-initdb.d
      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3306:3306"
    extra_hosts:
      - host.docker.internal:host-gateway
  mysql2:
    image: mysql:5.7
    container_name: mysql2
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'
    volumes:
      - /learn/mysql:/docker-entrypoint-initdb.d
      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3307:3306"
    extra_hosts:
      - host.docker.internal:host-gateway
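分别连接两个 MySQL 实例,确认是否支持 XA,并把事务隔离级别修改为 SERIALIZABLE。注意:SET GLOBAL 只对之后新建立的连接生效;innodb_support_xa 从 MySQL 5.7.10 起已被废弃且始终为 ON;tx_isolation 从 5.7.20 起被废弃(下面出现的 1 个 warning 即来自于此),在 MySQL 8.0 中请改用 transaction_isolation: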
% mysql -uroot -p12345678 -h127.0.0.1
% mysql -uroot -p12345678 -h127.0.0.1 -P3307
mysql> show variables like 'innodb_support_xa';
+-------------------+-------+
| Variable_name     | Value |
+-------------------+-------+
| innodb_support_xa | ON    |
+-------------------+-------+
1 row in set (0.01 sec)
mysql> set global tx_isolation=serializable;
Query OK, 0 rows affected, 1 warning (0.01 sec)
mysql> show variables like '%tx_isolation%';
+---------------+--------------+
| Variable_name | Value        |
+---------------+--------------+
| tx_isolation  | SERIALIZABLE |
+---------------+--------------+
1 row in set (0.00 sec)
然后创建数据库和表。
在 3306 实例中,我们创建一个 user 表:
代码语言:javascript复制 create database orders;
use orders;
create table user (
id int,
name varchar(10),
score int
);
insert into user values(1, "foo", 10);
在 3307 实例中,同样需要先执行 create database orders; 创建数据库,然后创建 wallet 表:
代码语言:javascript复制use orders;
create table wallet (
id int,
money float
);
insert into wallet values(1, 10.1);
然后就可以用 Go 通过 XA 协议在两个实例之间实现分布式事务。注意 XA 事务是绑定在会话(连接)上的:XA START、XA END、XA PREPARE 以及其间的 DML 必须在同一个连接上执行。
代码语言:javascript复制package main
import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"strconv"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/pkg/errors"
)
func main() {
var err error
// db1的连接
db1, err := sql.Open("mysql", "root:12345678@tcp(127.0.0.1:3306)/orders")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的连接
db2, err := sql.Open("mysql", "root:12345678@tcp(127.0.0.1:3307)/orders")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 开始前显示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" xid " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("% vn", err)
fmt.Println("=== call rollback ====")
// db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
// db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 启动
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score 2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money 1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.WithStack(err))
/*
xa recover;
Empty set (0.01 sec)
*/
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db1 prepare error"))
/*
db1
xa recover;
---------- -------------- -------------- ------------
| formatID | gtrid_length | bqual_length | data |
---------- -------------- -------------- ------------
| 1 | 10 | 0 | 1671287200 |
---------- -------------- -------------- ------------
1 row in set (0.00 sec)
db2
xa recover;
Empty set (0.00 sec)
//新事务会死锁
=== xid:1671287283 ====
=== call start ====
Error 1205: Lock wait timeout exceeded; try restarting transaction
main.main
mysql> xa commit '1671287200';
Query OK, 0 rows affected (0.00 sec)
*/
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
/*
db1
mysql> xa recover;
---------- -------------- -------------- ------------
| formatID | gtrid_length | bqual_length | data |
---------- -------------- -------------- ------------
| 1 | 10 | 0 | 1671287434 |
---------- -------------- -------------- ------------
1 row in set (0.00 sec)
db2
xa recover;
---------- -------------- -------------- ------------
| formatID | gtrid_length | bqual_length | data |
---------- -------------- -------------- ------------
| 1 | 10 | 0 | 1671287434 |
---------- -------------- -------------- ------------
1 row in set (0.00 sec)
*/
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
// TODO: 尝试重新提交COMMIT
// TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交
log.Println("xid:" xid)
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
log.Println("xid:" xid)
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
为了充分理解分布式事务的行为,我们分别在以下几个特殊阶段主动 panic 来中断事务(对应代码中被注释掉的几处 panic):
1. 在两个 db 实例 prepare 之前
2. 在 db1 prepare 之后、db2 prepare 之前
3. 在两个 db 都 prepare 之后、db1 commit 之前
4. 在 db1 commit 之后、db2 commit 之前
情形 1:两个分支都还没有 prepare,对全局不可见。异常中断后连接断开,两个实例上的本地事务分支都会被自动回滚,XA RECOVER 在两个实例上都为空,数据是一致的:
代码语言:javascript复制 xa recover;
Empty set (0.00 sec)
情形 2:db1 已经 prepare,该分支会被持久化记录下来,即使程序中断、连接断开也不会被自动回滚;db2 还没有 prepare,连接断开时其本地事务会被自动回滚:
db1
xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671287200 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)
db2
xa recover;
Empty set (0.00 sec)
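此时 db1 上处于 PREPARED 状态的分支仍然持有 user 表中相关行的锁。如果再运行一次程序,新事务会一直等待这把锁,直到锁等待超时(这是锁等待超时,并不是死锁)。需要人工对旧的 xid 执行 XA COMMIT 或 XA ROLLBACK 才能释放这把锁: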
代码语言:javascript复制 === xid:1671287283 ====
=== call start ====
Error 1205: Lock wait timeout exceeded; try restarting transaction
main.main
情形 3:两个分支都处于 PREPARED 状态,等待协调者(或人工)决定提交还是回滚:
db1
mysql> xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671287434 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)
db2
xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671287434 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)
情形 4:db1 已经提交,XA RECOVER 的结果为空;db2 仍处于待提交(PREPARED)状态:
db1
mysql> xa recover;
Empty set (0.00 sec)
db2
xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671289143 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)
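这个时候 db1 已经提交,无法再回滚(对其执行 XA ROLLBACK 会报 Unknown XID);db2 则仍可以提交或回滚。既然 db1 已经提交,说明全局决议是提交,正确的恢复方式是对 db2 执行 XA COMMIT。如果像下面这样回滚 db2,两个实例的数据就会不一致: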
代码语言:javascript复制db1
xa rollback '1671289143';
ERROR 1397 (XAE04): XAER_NOTA: Unknown XID
db2
xa rollback '1671289143';
Query OK, 0 rows affected (0.01 sec)
所以在 XA 下也可能出现不一致:如果协调者在两阶段提交的过程中崩溃,就会留下悬挂的 PREPARED 事务。需要通过 XA RECOVER 找出它们,再由恢复程序或人工介入进行提交或回滚,才能保证最终一致性。
总结一下:没有银弹。XA 虽然尽最大努力保证了一致性,但如果出现部分提交,仍然需要人工介入处理,才能保证最终一致性。