一、概述
Executor 是一个接口,包含更新,查询,事务等一系列方法。在前面分析SqlSession创建过程的时候,我们知道每个SqlSession对象都会有一个Executor对象,SqlSession的操作都会交由Executor执行器执行。
我们先看看Executor类的继承图:
Executor接口有两个实现,
- 一个是BaseExecutor抽象类,BaseExecutor又有四个子类:
- SimpleExecutor:简单类型的执行器,也是默认的执行器,每次执行update或者select操作,都会创建一个Statement对象,执行结束后关闭Statement对象;
- ReuseExecutor:可重用的执行器,重用的是Statement对象,第一次执行一条sql,会将这条sql的Statement对象缓存在key-value结构的map缓存中。下一次执行,就可以从缓存中取出Statement对象,减少了重复编译的次数,从而提高了性能。每个SqlSession对象都有一个Executor对象,因此这个缓存是SqlSession级别的,所以当SqlSession销毁时,缓存也会销毁;
- BatchExecutor:批量执行器,默认情况是每次执行一条sql,MyBatis都会发送一条sql。而批量执行器的操作是,每次执行一条sql,不会立马发送到数据库,而是批量一次性发送多条sql;
- ClosedExecutor: ResultLoaderMap的内部类,用来进行处理懒加载相关功能;
- 另外一个是CachingExecutor实现类,在缓存的时候用到,使用到了装饰者模式对executor进行二次包装,动态增强了executor的功能;
Executor 接口采用了模版方法的设计模式,定义了一些模版方法,交给子类去实现。定义的方法如下:
代码语言:javascript复制public interface Executor {
ResultHandler NO_RESULT_HANDLER = null;
// 更新
int update(MappedStatement ms, Object parameter) throws SQLException;
// 先查询缓存,在查询数据库
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
// 查询
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
// 返回游标对象
<E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
// 释放Statement
List<BatchResult> flushStatements() throws SQLException;
// 事务提交
void commit(boolean required) throws SQLException;
// 事务回滚
void rollback(boolean required) throws SQLException;
// 创建缓存的键值对
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
// 缓存是否存在
boolean isCached(MappedStatement ms, CacheKey key);
// 清除一级缓存
void clearLocalCache();
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
// 获取事务对象
Transaction getTransaction();
void close(boolean forceRollback);
boolean isClosed();
void setExecutorWrapper(Executor executor);
}
二、Executor执行器的创建过程
在前面分析SqlSession的获取过程的时候,我们当时暂且先跳过了executor执行器的创建过程这一部分的分析。很显然,executor的创建就是在获取SqlSession的时候。
代码语言:javascript复制//org.apache.ibatis.session.defaults.DefaultSqlSessionFactory#openSessionFromDataSource
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
//事务对象
Transaction tx = null;
try {
//从configuration对象中获取到我们之前解析的environment环境信息
final Environment environment = configuration.getEnvironment();
//事务工厂,这里是JbdcTransactionFactory工厂类
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
//通过事务工厂创建JbdcTransaction事务,传入数据源等信息
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
//创建Executor执行器
final Executor executor = configuration.newExecutor(tx, execType);
//创建DefaultSqlSession会话,传入Configuration、Executor对象
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " e, e);
} finally {
ErrorContext.instance().reset();
}
}
- final Executor executor = configuration.newExecutor(tx, execType);
通过configuration对象创建executor执行器,传入前面创建好的事务tx,以及指定的执行器类型。
继续追踪一下newExecutor()的源码:
代码语言:javascript复制public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
//没有配置的话,默认创建SimpleExecutor
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
//第一步:根据ExecutorType来创建不同类型的执行器
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
//第二步:如果开启了一级缓存,使用装饰者模式对executor二次包装成CachingExecutor
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
//这里使用了责任链设计模式,在插件篇幅里面会详细介绍
//第三步:加载插件
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
我们可以看到,newExecutor()方法根据ExecutorType来创建不同类型的执行器,默认创建的是SimpleExecutor简单类型执行器。
接下来我们看一下各种执行器的构造方法:
代码语言:javascript复制public SimpleExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
public ReuseExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
public BatchExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
我们看到了super(xxx)方法,也就是说三种类型的执行器其实都是调用的父类BaseExecutor的构造方法来创建:
代码语言:javascript复制protected BaseExecutor(Configuration configuration, Transaction transaction) {
//事务
this.transaction = transaction;
//延迟加载
this.deferredLoads = new ConcurrentLinkedQueue<>();
//一级缓存
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
//全局配置对象
this.configuration = configuration;
this.wrapper = this;
}
代码语言:javascript复制final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
到这里,executor创建就完成了,同时每个SqlSession也就有了自己唯一的Executor对象,如上代码executor作为DefaultSqlSession的成员属性。至于executor是在哪里调用的,相信看过前几篇文章的小伙伴都应该很清楚了,就是在SqlSession执行query方法的时候,具体是交给executor去执行查询的,具体过程可以参照前面的文章,这里不过多赘述了。
三、SimpleExecutor
SimpleExecutor 是默认的执行器,也是最简单的执行器。我们来看看里面几个关键方法:
代码语言:javascript复制public class SimpleExecutor extends BaseExecutor {
public SimpleExecutor(Configuration configuration, Transaction transaction) {
//默认调用父类BaseExecutor的构造方法
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
//第一步:从MappedStatement中获取到configuration全局配置对象
Configuration configuration = ms.getConfiguration();
//第二步:通过configuration创建StatementHandler
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
//第三步:创建Statement对象
stmt = prepareStatement(handler, ms.getStatementLog());
//第四步:执行SQL查询
return handler.update(stmt);
} finally {
//第五步:关闭statement
closeStatement(stmt);
}
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
//获取到connection连接
Connection connection = getConnection(statementLog);
//创建Statement
stmt = handler.prepare(connection, transaction.getTimeout());
//将sql语句中的占位符替换成最终查询参数
//最底层实现:typeHandler.setParameter(ps, i 1, value, jdbcType);
handler.parameterize(stmt);
return stmt;
}
}
总结一下SimpleExecutor的操作步骤:
- 第一步:从MappedStatement中获取到configuration全局配置对象;
- 第二步:通过configuration创建StatementHandler;
- 第三步:创建Statement对象;
- 第四步:执行SQL查询;
四、ReuseExecutor
ReuseExecutor,重用类型的执行器,它的作用是重复利用statement对象,避免频繁的创建。如果在一个SqlSession中多次执行一条sql,如果每次都去生成Statement对象,会造成资源浪费。因此ReuseExecutor在SimpleExecutor的基础上,对prepareStatement()方法进行了改进,将Statement对象缓存在内存中,并且免去了关闭Statement对象这一步。
代码语言:javascript复制public class ReuseExecutor extends BaseExecutor {
//map结构,key是执行的SQL语句,value就是对应的statement对象
//可以看到,在一个会话中,相同的sql语句对应的statement可以重复利用
private final Map<String, Statement> statementMap = new HashMap<>();
public ReuseExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
//第一步:获取configuration对象
Configuration configuration = ms.getConfiguration();
//第二步:创建StatementHandler
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
//第三步:创建Statement
Statement stmt = prepareStatement(handler, ms.getStatementLog());
//第四步:执行SQL查询
return handler.query(stmt, resultHandler);
}
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.queryCursor(stmt);
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
BoundSql boundSql = handler.getBoundSql();
//获取到执行的SQL语句
String sql = boundSql.getSql();
if (hasStatementFor(sql)) {
//如果缓存中存在该条sql对应的statement,则直接从缓存中取,不重新创建statement
stmt = getStatement(sql);
applyTransactionTimeout(stmt);
} else {
//如果缓存中不存在该条sql对应的statement
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
//将创建好的statement放入缓存中,方便下次重复利用
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
//判断缓存map中是否存在sql的Statement对象
private boolean hasStatementFor(String sql) {
try {
return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
} catch (SQLException e) {
return false;
}
}
private Statement getStatement(String s) {
return statementMap.get(s);
}
//以sql为key, Statement为value放到缓存中
private void putStatement(String sql, Statement stmt) {
statementMap.put(sql, stmt);
}
}
从上面可以看到,ReuseExecutor内存维护了一个map结构的缓存statementMap,以sql为key, Statement为value放到缓存中,保证在同一个会话中,如果重复执行两个相同的SQL,第一次创建完的statement,可以在第二次查询的时候重复利用,节省了一些资源。注意:因为每个SqlSession都有自己唯一的对应的Executor对象,因此这个statementMap缓存是SqlSession级别的,如果SqlSession销毁了,statementMap缓存也会将销毁。
五、BatchExecutor
BatchExecutor是批处理的执行器,批量的发送sql到数据库,而不是一个个的发送。
代码语言:javascript复制public class BatchExecutor extends BaseExecutor {
// 批量更新处理的固定返回值,不是返回受影响的行数
public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE 1002;
// Statement集合
private final List<Statement> statementList = new ArrayList<>();
//批量结果的集合
private final List<BatchResult> batchResultList = new ArrayList<>();
//当前Sql语句
private String currentSql;
//当前的MappedStatement对象
private MappedStatement currentStatement;
public BatchExecutor(Configuration configuration, Transaction transaction) {
//默认调用父类BaseExecutor的构造方法
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
//第一步:获取configuration对象
final Configuration configuration = ms.getConfiguration();
//第二步:创建StatementHandler
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
//第三步:获取到BoundSql,再拿到具体的sql
final BoundSql boundSql = handler.getBoundSql();
final String sql = boundSql.getSql();
final Statement stmt;
//如果当前执行的sql跟拿到的sql一致,并且MappedStatement也是同一个的话
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
int last = statementList.size() - 1;
stmt = statementList.get(last);
applyTransactionTimeout(stmt);
handler.parameterize(stmt);//fix Issues 322
BatchResult batchResult = batchResultList.get(last);
batchResult.addParameterObject(parameterObject);
} else {
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt); //fix Issues 322
//设置currentSql和currentStatement为当前sql、当前MappedStatement
currentSql = sql;
currentStatement = ms;
//将statement添加到集合中
statementList.add(stmt);
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
//调用JDBC的addBatch()方法,添加到批处理中
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<>();
// 回滚则直接返回空集合
if (isRollback) {
return Collections.emptyList();
}
//遍历statementList集合中的Statement,一条条执行,并将结果加入到结果集合中
for (int i = 0, n = statementList.size(); i < n; i ) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
//执行sql
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
// 关闭statement
closeStatement(stmt);
} catch (BatchUpdateException e) {
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
//将结果加载到结果集合中
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
}
BatchExecutor总结:
- doUpdate()返回的值是固定的【Integer.MIN_VALUE 1002】,不是影响的行数;
- 如果连续提交相同的sql,则只会执行一次;
- 提交sql不会立马执行,而是等到commit时候才统一执行;
- 底层使用的是JDBC的批处理操作,addBatch()和executeBatch()操作;
六、CachingExecutor
在前面总结executor创建的时候,我们看见过CachingExecutor,它的作用其实就是,如果项目中开启了一级缓存的话,它会将executor使用装饰者模式包装成CachingExecutor,来增强executor的功能。
代码语言:javascript复制if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
下面来看看CachingExecutor在Mybatis中是如何实现的。
代码语言:javascript复制public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameterObject);
//创建缓存key
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
//org.apache.ibatis.executor.CachingExecutor#query(org.apache.ibatis.mapping.MappedStatement, java.lang.Object, org.apache.ibatis.session.RowBounds, org.apache.ibatis.session.ResultHandler, org.apache.ibatis.cache.CacheKey, org.apache.ibatis.mapping.BoundSql)
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
//拿到二级缓存,判断二级缓存是否存在此数据
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
//如果二级缓存为空,再从一级缓存中取查找,还找不到,则查询数据库,然后再讲结果放到缓存中
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
七、总结
本篇文章主要总结了Mybatis里面其中一个重要的组件--Executor执行器,并介绍了它的继承体系以及分别介绍了它的常见的几个实现类,在执行查询或者缓存方面都是如何工作的,相信大家对Executor没有那么陌生了吧。
鉴于笔者水平有限,如果文章有什么错误或者需要补充的,希望小伙伴们指出来,希望这篇文章对大家有帮助。