Seata学习整理二

2023-02-28 13:30:11 浏览数 (1)

前面说过,seata在做二阶段提交前会生成前镜像、执行sql、生成后镜像。那么首先需要做的是,有数据源进行连接,然后需要对表的元数据信息进行抽取。这样才可以进行前镜像以及后镜像的操作。

一、初始化数据源元数据信息

可以看到io.seata.rm.datasource.DataSourceProxy中的构造函数会执行初始化方法

代码语言:javascript复制
  public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        //执行初始化
        init(targetDataSource, resourceGroupId);
    }

执行初始化方法会提取相关信息:

代码语言:javascript复制
    //执行初始化
    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        //获取相关数据源信息
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        //注册数据源
        DefaultResourceManager.get().registerResource(this);
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                //获取数据远连接
                try (Connection connection = dataSource.getConnection()) {
                    //执行刷新表元数据缓存
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        //设置默认分支类型AT到root上下文中
        RootContext.setDefaultBranchType(this.getBranchType());
    }

可以看到 mysql 获取schema

代码语言:javascript复制
 // mysql 获取schema
    @Override
    protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
        // 获取其中的一条,执行sql查询,然后设置元数据信息到schema中
        String sql = "SELECT * FROM "   ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL)   " LIMIT 1";
        try (Statement stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery(sql)) {
            //将结果集元数据设置到schema中
            return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
        } catch (SQLException sqlEx) {
            throw sqlEx;
        } catch (Exception e) {
            throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
        }
    }

设置的结果集元数据中可以看到:schemaName、catalogName、tableName、TableMeta、ColumnMeta、IndexMeta。

同时将表信息放入到缓存中:

代码语言:javascript复制
Cache<String, TableMeta> TABLE_META_CACHE = Cache<String, TableMeta> TABLE_META_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE)
    .expireAfterWrite(EXPIRE_TIME, TimeUnit.MILLISECONDS).softValues().build();
TABLE_META_CACHE.put(entry.getKey(), tableMeta);    

二、sql识别器

可以看到sql识别器会根据对应sql类型执行sql操作:

代码语言:javascript复制
  switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }

三、一阶段sql执行前后操作

可以看到在io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse中会执行几个重要的操作

生成前镜像、执行sql、生成后镜像、准备undo log日志数据

代码语言:javascript复制
  /**
     * Execute auto commit false t.
     *
     * @param args the args
     * @return the t
     * @throws Exception the exception
     */
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        LOGGER.info("----执行自动提交 false------");
        LOGGER.info("----生成前镜像------");
        TableRecords beforeImage = beforeImage();
        LOGGER.info("----执行sql操作------");
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        LOGGER.info("----生成后镜像------");
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

之后执行二阶段处理提交

四、二阶段提交

提交sql,如果没有发生异常,则删除undo log日志。否则,执行回滚操作,执行undo log日志,也即通过镜像sql执行复原数据操作。

0 人点赞