一、Seata的实现思路
如果要你实现一个简单的Seata的AT模式,你会如何实现?
1.需要对数据源扩展,为什么?
因为数据源是事务提交和回滚的关键,只有对它进行扩展,才有后面的增强。 2.如何增强?
一个简单的做法,也是Spring常用的手法,采用Aop,进行前后逻辑的增强操作,同时进行手动提交事务。
3.增强的过程中,如何对数据进行回滚和提交的操作?
因为我们的做法需要在事务提交或者回滚前,实现对各个分支事务的提交或者回滚,因此不可避免,这个操作,必须可逆,因此需要保留好事务提交或者回滚前的日志操作,类似mysql的undo/redo log惯用的手法。
4.如何执行统一的提交和回滚?
进行提交情况的上报,进行最终的提交和回滚,完成上面的操作,也即所有操作都成功或者失败。
5.如果提交成功,需要删除相关日志。如果失败,则需要对相关分支执行回滚操作。
6.如何保证上面的效率更加高效性?
为了seata的交互的高效性,所有的交互操作都基于Netty的事件驱动完成。
下面的操作都是基于Seata的官方demo的debug和Seata官网学习完成。
二、Seata中数据源的增强
Seata的AT模式的理解可以从Seata的demo中找到答案。seata经过SeataAutoDataSourceProxyCreator是实现Aop数据源增强的关键步骤。
可以看到经过wrapIfNecessary会进行代理的构建,也即
代码语言:javascript复制 SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
创建新的数据源代理,从构造函数中,可以看到分为XA和AT模式。由于我们关注的是AT模式,下面我们来看AT模式:
代码语言:javascript复制 public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
if (targetDataSource instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
}
this.targetDataSource = targetDataSource;
// 初始化操作
init(targetDataSource, resourceGroupId);
}
从上面的代码,我们可以看到这里会先判断数据源是否为Seata数据源代理,如果不是,则执行数据源代理的转换。同时初始化数据源,注册数据源RM信息到Netty中,同时设置分支类型AT模式。
三、一阶段操作
代码语言:javascript复制 private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
// 执行提交操作
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
执行本地事务提交会判断是否处在全局事务中,如果是,则执行处理全局事务提交,如果是在全局锁中,则执行处理本地提交带全局锁。否则执行不增强的提交。由于前期,我们属于分支事务,因此我们会执行分支事务提交操作processGlobalTransactionCommit。
而在这个过程中,我们可以看到分支事务提交的几个阶段:
注册分支事务、刷新undo log日志、上报分支执行情况
代码语言:javascript复制 private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// 此时完成本地事务的提交
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
// 进行本地事务提交情况的上报
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
而我们关心的前后镜像生成又是在哪里呢?而在生成前后镜像前,我们需要判断当前执行的sql是哪种类型的执行器,才能放心生成前后镜像执行业务系统的提交操作。
我们会先经过执行模板
代码语言:javascript复制 switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case DELETE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case SELECT_FOR_UPDATE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType) {
case JdbcConstants.MYSQL:
executor =
new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.MARIADB:
executor =
new MariadbInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.POLARDBX:
executor = new PolarDBXInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
case UPDATE_JOIN:
switch (dbType) {
case JdbcConstants.MYSQL:
executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.MARIADB:
executor = new MariadbUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case JdbcConstants.POLARDBX:
executor = new PolarDBXUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType " not support to " SQLType.UPDATE_JOIN.name());
}
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
生成对应的数据执行器,然后执行生成镜像的操作。
代码语言:javascript复制 protected T executeAutoCommitFalse(Object[] args) throws Exception {
try {
// 这里会生成前镜像和后镜像,在执行业务操作之前
TableRecords beforeImage = beforeImage();
// 执行业务操作
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
// 准备undolog信息
prepareUndoLog(beforeImage, afterImage);
return result;
} catch (TableMetaException e) {
LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
e.getTableName(), e.getColumnName());
statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
throw e;
}
}
完成这个操作之后之后,会执行最终分支事务的完成操作,此时操作的是mysql的原生操作。
四、二阶段操作
完成提交/回滚操作之后,进行上报动作。完成上报之后,执行二阶段提交/回滚操作。也即branchCommit或者branchRollback操作。也即二阶段的关键在branchCommit或者branchRollback上。
二阶段提交操作:思路是快速响应成功,然后异步删除日志。
二阶段回滚操作:则根据对应的日志,进行恢复sql执行操作。
我理解:生成sql和回滚的这个思路和mysql的操作是类似的。
参考资料:
https://github.com/apache/incubator-seata
https://seata.apache.org