Seata的AT模式深入理解

2024-06-17 15:43:55 浏览数 (1)

一、Seata的实现思路

如果要你实现一个简单的Seata的AT模式,你会如何实现?

1.需要对数据源扩展,为什么?

因为数据源是事务提交和回滚的关键,只有对它进行扩展,才有后面的增强。 2.如何增强?

一个简单的做法,也是Spring常用的手法,采用Aop,进行前后逻辑的增强操作,同时进行手动提交事务。

3.增强的过程中,如何对数据进行回滚和提交的操作?

因为我们的做法需要在事务提交或者回滚前,实现对各个分支事务的提交或者回滚,因此不可避免,这个操作,必须可逆,因此需要保留好事务提交或者回滚前的日志操作,类似mysql的undo/redo log惯用的手法。

4.如何执行统一的提交和回滚?

进行提交情况的上报,进行最终的提交和回滚,完成上面的操作,也即所有操作都成功或者失败。

5.如果提交成功,需要删除相关日志。如果失败,则需要对相关分支执行回滚操作。

6.如何保证上面的效率更加高效性?

为了seata的交互的高效性,所有的交互操作都基于Netty的事件驱动完成。

下面的操作都是基于Seata的官方demo的debug和Seata官网学习完成。

二、Seata中数据源的增强

Seata的AT模式的理解可以从Seata的demo中找到答案。seata经过SeataAutoDataSourceProxyCreator是实现Aop数据源增强的关键步骤。

可以看到经过wrapIfNecessary会进行代理的构建,也即

代码语言:javascript复制
 SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);

创建新的数据源代理,从构造函数中,可以看到分为XA和AT模式。由于我们关注的是AT模式,下面我们来看AT模式:

代码语言:javascript复制
 public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        // 初始化操作
        init(targetDataSource, resourceGroupId);
    }

从上面的代码,我们可以看到这里会先判断数据源是否为Seata数据源代理,如果不是,则执行数据源代理的转换。同时初始化数据源,注册数据源RM信息到Netty中,同时设置分支类型AT模式。

三、一阶段操作

代码语言:javascript复制
 private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            // 执行提交操作
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

执行本地事务提交会判断是否处在全局事务中,如果是,则执行处理全局事务提交,如果是在全局锁中,则执行处理本地提交带全局锁。否则执行不增强的提交。由于前期,我们属于分支事务,因此我们会执行分支事务提交操作processGlobalTransactionCommit。

而在这个过程中,我们可以看到分支事务提交的几个阶段:

注册分支事务、刷新undo log日志、上报分支执行情况

代码语言:javascript复制
 private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 此时完成本地事务的提交
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        // 进行本地事务提交情况的上报
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }

而我们关心的前后镜像生成又是在哪里呢?而在生成前后镜像前,我们需要判断当前执行的sql是哪种类型的执行器,才能放心生成前后镜像执行业务系统的提交操作。

我们会先经过执行模板

代码语言:javascript复制
   switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case DELETE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case SELECT_FOR_UPDATE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor =
                                        new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.MARIADB:
                                executor =
                                        new MariadbInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.POLARDBX:
                                executor = new PolarDBXInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType   " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    case UPDATE_JOIN:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.MARIADB:
                                executor = new MariadbUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.POLARDBX:
                                executor = new PolarDBXUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType   " not support to "   SQLType.UPDATE_JOIN.name());
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;

生成对应的数据执行器,然后执行生成镜像的操作。

代码语言:javascript复制
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        try {
            // 这里会生成前镜像和后镜像,在执行业务操作之前
            TableRecords beforeImage = beforeImage();
            // 执行业务操作
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            TableRecords afterImage = afterImage(beforeImage);
            // 准备undolog信息
            prepareUndoLog(beforeImage, afterImage);
            return result;
        } catch (TableMetaException e) {
            LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
                e.getTableName(), e.getColumnName());
            statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
            throw e;
        }
    }

完成这个操作之后之后,会执行最终分支事务的完成操作,此时操作的是mysql的原生操作。

四、二阶段操作

完成提交/回滚操作之后,进行上报动作。完成上报之后,执行二阶段提交/回滚操作。也即branchCommit或者branchRollback操作。也即二阶段的关键在branchCommit或者branchRollback上。

二阶段提交操作:思路是快速响应成功,然后异步删除日志。

二阶段回滚操作:则根据对应的日志,进行恢复sql执行操作。

我理解:生成sql和回滚的这个思路和mysql的操作是类似的。

参考资料:

https://github.com/apache/incubator-seata

https://seata.apache.org

0 人点赞