MyBatis源码阅读(八) --- Executor执行器

2024-01-30 09:00:30 浏览数 (1)

一、概述

Executor 是一个接口,包含更新,查询,事务等一系列方法。在前面分析SqlSession创建过程的时候,我们知道每个SqlSession对象都会有一个Executor对象,SqlSession的操作都会交由Executor执行器执行。

我们先看看Executor类的继承图:

Executor接口有两个实现,

  • 一个是BaseExecutor抽象类,BaseExecutor又有四个子类:
    • SimpleExecutor:简单类型的执行器,也是默认的执行器,每次执行update或者select操作,都会创建一个Statement对象,执行结束后关闭Statement对象;
    • ReuseExecutor:可重用的执行器,重用的是Statement对象,第一次执行一条sql,会将这条sql的Statement对象缓存在key-value结构的map缓存中。下一次执行,就可以从缓存中取出Statement对象,减少了重复编译的次数,从而提高了性能。每个SqlSession对象都有一个Executor对象,因此这个缓存是SqlSession级别的,所以当SqlSession销毁时,缓存也会销毁;
    • BatchExecutor:批量执行器,默认情况是每次执行一条sql,MyBatis都会发送一条sql。而批量执行器的操作是,每次执行一条sql,不会立马发送到数据库,而是批量一次性发送多条sql;
    • ClosedExecutor: ResultLoaderMap的内部类,用来进行处理懒加载相关功能;
  • 另外一个是CachingExecutor实现类,在缓存的时候用到,使用到了装饰者模式对executor进行二次包装,动态增强了executor的功能;

Executor 接口采用了模版方法的设计模式,定义了一些模版方法,交给子类去实现。定义的方法如下:

代码语言:javascript复制
public interface Executor {
 
  ResultHandler NO_RESULT_HANDLER = null;
  // 更新
  int update(MappedStatement ms, Object parameter) throws SQLException;
 
  // 先查询缓存,在查询数据库
  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
 
  // 查询
  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
 
  // 返回游标对象
  <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
 
  // 释放Statement
  List<BatchResult> flushStatements() throws SQLException;
 
  // 事务提交
  void commit(boolean required) throws SQLException;
 
  // 事务回滚
  void rollback(boolean required) throws SQLException;
 
  // 创建缓存的键值对
  CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
 
  // 缓存是否存在
  boolean isCached(MappedStatement ms, CacheKey key);
 
  // 清除一级缓存
  void clearLocalCache();
 
  void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
 
  // 获取事务对象
  Transaction getTransaction();
 
  void close(boolean forceRollback);
 
  boolean isClosed();
 
  void setExecutorWrapper(Executor executor);
 
}
二、Executor执行器的创建过程

在前面分析SqlSession的获取过程的时候,我们当时暂且先跳过了executor执行器的创建过程这一部分的分析。很显然,executor的创建就是在获取SqlSession的时候。

代码语言:javascript复制
//org.apache.ibatis.session.defaults.DefaultSqlSessionFactory#openSessionFromDataSource
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
  //事务对象
  Transaction tx = null;
  try {
    //从configuration对象中获取到我们之前解析的environment环境信息
    final Environment environment = configuration.getEnvironment();
    //事务工厂,这里是JbdcTransactionFactory工厂类
    final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
    //通过事务工厂创建JbdcTransaction事务,传入数据源等信息
    tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
    //创建Executor执行器
    final Executor executor = configuration.newExecutor(tx, execType);
    //创建DefaultSqlSession会话,传入Configuration、Executor对象
    return new DefaultSqlSession(configuration, executor, autoCommit);
  } catch (Exception e) {
    closeTransaction(tx); // may have fetched a connection so lets call close()
    throw ExceptionFactory.wrapException("Error opening session.  Cause: "   e, e);
  } finally {
    ErrorContext.instance().reset();
  }
}
  • final Executor executor = configuration.newExecutor(tx, execType);

通过configuration对象创建executor执行器,传入前面创建好的事务tx,以及指定的执行器类型。

继续追踪一下newExecutor()的源码:

代码语言:javascript复制
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
  executorType = executorType == null ? defaultExecutorType : executorType;
  //没有配置的话,默认创建SimpleExecutor
  executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
  Executor executor;
  //第一步:根据ExecutorType来创建不同类型的执行器
  if (ExecutorType.BATCH == executorType) {
    executor = new BatchExecutor(this, transaction);
  } else if (ExecutorType.REUSE == executorType) {
    executor = new ReuseExecutor(this, transaction);
  } else {
    executor = new SimpleExecutor(this, transaction);
  }
  //第二步:如果开启了一级缓存,使用装饰者模式对executor二次包装成CachingExecutor
  if (cacheEnabled) {
    executor = new CachingExecutor(executor);
  }
  //这里使用了责任链设计模式,在插件篇幅里面会详细介绍
  //第三步:加载插件
  executor = (Executor) interceptorChain.pluginAll(executor);
  return executor;
}

我们可以看到,newExecutor()方法根据ExecutorType来创建不同类型的执行器,默认创建的是SimpleExecutor简单类型执行器。

接下来我们看一下各种执行器的构造方法:

代码语言:javascript复制
public SimpleExecutor(Configuration configuration, Transaction transaction) {
  super(configuration, transaction);
}
public ReuseExecutor(Configuration configuration, Transaction transaction) {
  super(configuration, transaction);
}
public BatchExecutor(Configuration configuration, Transaction transaction) {
  super(configuration, transaction);
}

我们看到了super(xxx)方法,也就是说三种类型的执行器其实都是调用的父类BaseExecutor的构造方法来创建:

代码语言:javascript复制
protected BaseExecutor(Configuration configuration, Transaction transaction) {
  //事务
  this.transaction = transaction;
  //延迟加载
  this.deferredLoads = new ConcurrentLinkedQueue<>();
  //一级缓存
  this.localCache = new PerpetualCache("LocalCache");
  this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
  this.closed = false;
  //全局配置对象
  this.configuration = configuration;
  this.wrapper = this;
}
代码语言:javascript复制
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);

到这里,executor创建就完成了,同时每个SqlSession也就有了自己唯一的Executor对象,如上代码executor作为DefaultSqlSession的成员属性。至于executor是在哪里调用的,相信看过前几篇文章的小伙伴都应该很清楚了,就是在SqlSession执行query方法的时候,具体是交给executor去执行查询的,具体过程可以参照前面的文章,这里不过多赘述了。

三、SimpleExecutor

SimpleExecutor 是默认的执行器,也是最简单的执行器。我们来看看里面几个关键方法:

代码语言:javascript复制
public class SimpleExecutor extends BaseExecutor {
 
  public SimpleExecutor(Configuration configuration, Transaction transaction) {
    //默认调用父类BaseExecutor的构造方法
    super(configuration, transaction);
  }
 
  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement stmt = null;
    try {
      //第一步:从MappedStatement中获取到configuration全局配置对象
      Configuration configuration = ms.getConfiguration();
      //第二步:通过configuration创建StatementHandler
      StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
      //第三步:创建Statement对象
      stmt = prepareStatement(handler, ms.getStatementLog());
      //第四步:执行SQL查询
      return handler.update(stmt);
    } finally {
      //第五步:关闭statement
      closeStatement(stmt);
    }
  }
 
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }
 
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    //获取到connection连接
    Connection connection = getConnection(statementLog);
    //创建Statement 
    stmt = handler.prepare(connection, transaction.getTimeout());
    //将sql语句中的占位符替换成最终查询参数
    //最底层实现:typeHandler.setParameter(ps, i   1, value, jdbcType);
    handler.parameterize(stmt);
    return stmt;
  }
 
}

总结一下SimpleExecutor的操作步骤:

  1. 第一步:从MappedStatement中获取到configuration全局配置对象;
  2. 第二步:通过configuration创建StatementHandler;
  3. 第三步:创建Statement对象;
  4. 第四步:执行SQL查询;
四、ReuseExecutor

ReuseExecutor,重用类型的执行器,它的作用是重复利用statement对象,避免频繁的创建。如果在一个SqlSession中多次执行一条sql,如果每次都去生成Statement对象,会造成资源浪费。因此ReuseExecutor在SimpleExecutor的基础上,对prepareStatement()方法进行了改进,将Statement对象缓存在内存中,并且免去了关闭Statement对象这一步。

代码语言:javascript复制
public class ReuseExecutor extends BaseExecutor {
    
  //map结构,key是执行的SQL语句,value就是对应的statement对象
  //可以看到,在一个会话中,相同的sql语句对应的statement可以重复利用  
  private final Map<String, Statement> statementMap = new HashMap<>();
 
  public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }
 
  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.update(stmt);
  }
 
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    //第一步:获取configuration对象
    Configuration configuration = ms.getConfiguration();
    //第二步:创建StatementHandler
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    //第三步:创建Statement
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    //第四步:执行SQL查询
    return handler.query(stmt, resultHandler);
  }
 
  @Override
  protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.queryCursor(stmt);
  }
    
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    BoundSql boundSql = handler.getBoundSql();
    //获取到执行的SQL语句
    String sql = boundSql.getSql();
    if (hasStatementFor(sql)) {
      //如果缓存中存在该条sql对应的statement,则直接从缓存中取,不重新创建statement
      stmt = getStatement(sql);
      applyTransactionTimeout(stmt);
    } else {
      //如果缓存中不存在该条sql对应的statement
      Connection connection = getConnection(statementLog);
      stmt = handler.prepare(connection, transaction.getTimeout());
      //将创建好的statement放入缓存中,方便下次重复利用
      putStatement(sql, stmt);
    }
    handler.parameterize(stmt);
    return stmt;
  }
  
  //判断缓存map中是否存在sql的Statement对象
  private boolean hasStatementFor(String sql) {
    try {
      return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
    } catch (SQLException e) {
      return false;
    }
  }
 
  private Statement getStatement(String s) {
    return statementMap.get(s);
  }
   
  //以sql为key, Statement为value放到缓存中   
  private void putStatement(String sql, Statement stmt) {
    statementMap.put(sql, stmt);
  }
 
}

从上面可以看到,ReuseExecutor内存维护了一个map结构的缓存statementMap,以sql为key, Statement为value放到缓存中,保证在同一个会话中,如果重复执行两个相同的SQL,第一次创建完的statement,可以在第二次查询的时候重复利用,节省了一些资源。注意:因为每个SqlSession都有自己唯一的对应的Executor对象,因此这个statementMap缓存是SqlSession级别的,如果SqlSession销毁了,statementMap缓存也会将销毁。

五、BatchExecutor

BatchExecutor是批处理的执行器,批量的发送sql到数据库,而不是一个个的发送。

代码语言:javascript复制
public class BatchExecutor extends BaseExecutor {
  // 批量更新处理的固定返回值,不是返回受影响的行数
  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE   1002;
  // Statement集合 
  private final List<Statement> statementList = new ArrayList<>();
  //批量结果的集合
  private final List<BatchResult> batchResultList = new ArrayList<>();
  //当前Sql语句
  private String currentSql;
 //当前的MappedStatement对象
 private MappedStatement currentStatement;
 
  public BatchExecutor(Configuration configuration, Transaction transaction) {
    //默认调用父类BaseExecutor的构造方法
    super(configuration, transaction);
  }
 
  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    //第一步:获取configuration对象
    final Configuration configuration = ms.getConfiguration();
    //第二步:创建StatementHandler
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    //第三步:获取到BoundSql,再拿到具体的sql
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    //如果当前执行的sql跟拿到的sql一致,并且MappedStatement也是同一个的话
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      applyTransactionTimeout(stmt);
      handler.parameterize(stmt);//fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);    //fix Issues 322
      //设置currentSql和currentStatement为当前sql、当前MappedStatement
      currentSql = sql;
      currentStatement = ms;
      //将statement添加到集合中
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    //调用JDBC的addBatch()方法,添加到批处理中
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }
 
  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      List<BatchResult> results = new ArrayList<>();
      // 回滚则直接返回空集合
      if (isRollback) {
        return Collections.emptyList();
      }
      //遍历statementList集合中的Statement,一条条执行,并将结果加入到结果集合中
      for (int i = 0, n = statementList.size(); i < n; i  ) {
        Statement stmt = statementList.get(i);
        applyTransactionTimeout(stmt);
        BatchResult batchResult = batchResultList.get(i);
        try {
          //执行sql
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
          // 关闭statement
          closeStatement(stmt);
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId())
              .append(" (batch index #")
              .append(i   1)
              .append(")")
              .append(" failed.");
          if (i > 0) {
            message.append(" ")
                .append(i)
                .append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        //将结果加载到结果集合中
        results.add(batchResult);
      }
      return results;
    } finally {
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }
 
}

BatchExecutor总结:

  • doUpdate()返回的值是固定的【Integer.MIN_VALUE 1002】,不是影响的行数;
  • 如果连续提交相同的sql,则只会执行一次;
  • 提交sql不会立马执行,而是等到commit时候才统一执行;
  • 底层使用的是JDBC的批处理操作,addBatch()和executeBatch()操作;
六、CachingExecutor

在前面总结executor创建的时候,我们看见过CachingExecutor,它的作用其实就是,如果项目中开启了一级缓存的话,它会将executor使用装饰者模式包装成CachingExecutor,来增强executor的功能。

代码语言:javascript复制
if (cacheEnabled) {
  executor = new CachingExecutor(executor);
}

下面来看看CachingExecutor在Mybatis中是如何实现的。

代码语言:javascript复制
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
  BoundSql boundSql = ms.getBoundSql(parameterObject);
  //创建缓存key
  CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
  return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
 
//org.apache.ibatis.executor.CachingExecutor#query(org.apache.ibatis.mapping.MappedStatement, java.lang.Object, org.apache.ibatis.session.RowBounds, org.apache.ibatis.session.ResultHandler, org.apache.ibatis.cache.CacheKey, org.apache.ibatis.mapping.BoundSql)
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
    throws SQLException {
  //拿到二级缓存,判断二级缓存是否存在此数据
  Cache cache = ms.getCache();
  if (cache != null) {
    flushCacheIfRequired(ms);
    if (ms.isUseCache() && resultHandler == null) {
      ensureNoOutParams(ms, boundSql);
      @SuppressWarnings("unchecked")
      List<E> list = (List<E>) tcm.getObject(cache, key);
      if (list == null) {
        //如果二级缓存为空,再从一级缓存中取查找,还找不到,则查询数据库,然后再讲结果放到缓存中  
        list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
        tcm.putObject(cache, key, list); // issue #578 and #116
      }
      return list;
    }
  }
  return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
七、总结

本篇文章主要总结了Mybatis里面其中一个重要的组件--Executor执行器,并介绍了它的继承体系以及分别介绍了它的常见的几个实现类,在执行查询或者缓存方面都是如何工作的,相信大家对Executor没有那么陌生了吧。

鉴于笔者水平有限,如果文章有什么错误或者需要补充的,希望小伙伴们指出来,希望这篇文章对大家有帮助。

0 人点赞