MyBatis设计思想(3)——数据源模块
一. 工厂方法模式
**工厂方法:定义一个创建产品对象的工厂接口,将产品对象的实际创建工作推迟到具体子工厂类当中。这满足创建型模式中所要求的“创建与使用相分离”的特点。**通过工厂方法,客户端可以不关心产品的具体创建过程,直接从工厂中获取实例即可。
- IProduct:抽象产品,定义了需要创建的对象的行为。
- ConcreteProduct:具体产品,实际要创建的对象。
- AbstractFactory:抽象工厂,定义了工厂的行为。
- ConcreteFactory:具体工厂,用于创建具体的对象。
二. MyBatis的数据源模块
数据源的创建过程比较复杂,涉及到数据库驱动的加载、配置的加载、数据库连接的获取和管理等。如果每次都让客户端自己创建数据源,那会大大增加客户端的使用成本,因此使用抽象工厂模式,让客户端直接从工厂中获取数据源。
MyBatis定义了DataSourceFactory工厂接口,并提供了两种实现:
- UnpooledDataSourceFactory:非池化工厂,用于创建UnpooledDataSource。
- PooledDataSourceFactory:池化工厂,用于创建PooledDataSource。
UnpooledDataSource没什么可说的,就是一个简单的非池化的数据源,它获取连接的方式和手动通过 JDBC 获取连接是一样的。
代码语言:javascript复制/**
* @author Clinton Begin
* @author Eduardo Macarron
*
* 非池化的数据源
*/
public class UnpooledDataSource implements DataSource {
private ClassLoader driverClassLoader;
private Properties driverProperties;
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
private String driver;
private String url;
private String username;
private String password;
private Boolean autoCommit;
private Integer defaultTransactionIsolationLevel;
private Integer defaultNetworkTimeout;
//注册数据库驱动
static {
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
//创建数据库连接
private Connection doGetConnection(String username, String password) throws SQLException {
//加载配置
Properties props = new Properties();
if (driverProperties != null) {
props.putAll(driverProperties);
}
if (username != null) {
props.setProperty("user", username);
}
if (password != null) {
props.setProperty("password", password);
}
//创建连接
return doGetConnection(props);
}
private Connection doGetConnection(Properties properties) throws SQLException {
//初始化数据库驱动
initializeDriver();
//创建数据库连接
Connection connection = DriverManager.getConnection(url, properties);
//连接配置
configureConnection(connection);
return connection;
}
private synchronized void initializeDriver() throws SQLException {
//注册数据库驱动
if (!registeredDrivers.containsKey(driver)) {
Class<?> driverType;
try {
if (driverClassLoader != null) {
driverType = Class.forName(driver, true, driverClassLoader);
} else {
driverType = Resources.classForName(driver);
}
// DriverManager requires the driver to be loaded via the system ClassLoader.
// http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
DriverManager.registerDriver(new DriverProxy(driverInstance));
registeredDrivers.put(driver, driverInstance);
} catch (Exception e) {
throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " e);
}
}
}
//连接配置
private void configureConnection(Connection conn) throws SQLException {
//设置超时时间
if (defaultNetworkTimeout != null) {
conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
}
//设置事务自动提交
if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
conn.setAutoCommit(autoCommit);
}
//设置事务隔离级别
if (defaultTransactionIsolationLevel != null) {
conn.setTransactionIsolation(defaultTransactionIsolationLevel);
}
}
}
PooledDataSource是MyBatis实现的一个简单的数据库连接池,实现了连接的复用和管理,内部的一些设计还是比较巧妙的。
三. MyBatis连接池
- 核心类
- PooledConnection:对JDBC Connection的动态代理,主要是拦截了close方法,对连接资源进行回收和状态的修改,并不会真正关闭连接。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//拦截close方法,对连接进行回收复用,并不会真正地关闭
String methodName = method.getName();
if (CLOSE.equals(methodName)) {
dataSource.pushConnection(this);
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
- PoolState:连接池管理器,对池子中的所有连接进行状态的管理。
//数据源实例
protected PooledDataSource dataSource;
//空闲连接队列
protected final List<PooledConnection> idleConnections = new ArrayList<>();
//活跃连接队列
protected final List<PooledConnection> activeConnections = new ArrayList<>();
//各种统计信息
protected long requestCount = 0;
protected long accumulatedRequestTime = 0;
protected long accumulatedCheckoutTime = 0;
protected long claimedOverdueConnectionCount = 0;
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
protected long accumulatedWaitTime = 0;
protected long hadToWaitCount = 0;
protected long badConnectionCount = 0;
- PooledDataSource:数据库连接池,配合PoolState,进行数据库连接的生命周期管理。
- 连接创建过程
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
//同步控制
synchronized (state) {
//1. 首先尝试从空闲队列中获取
if (!state.idleConnections.isEmpty()) {
// Pool has available connection
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("Checked out connection " conn.getRealHashCode() " from pool.");
}
}
//2. 如果没有空闲连接,但活跃连接数没有达到上限,则创建新连接
else {
// Pool does not have available connection
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// Can create new connection
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " conn.getRealHashCode() ".");
}
}
//3. 如果活跃连接数已达上限,则检查最老的活跃连接是否已超时。如果已超时,则剔除最老的连接,并创建新连接
else {
// Cannot create new connection
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// Can claim overdue connection
state.claimedOverdueConnectionCount ;
state.accumulatedCheckoutTimeOfOverdueConnections = longestCheckoutTime;
state.accumulatedCheckoutTime = longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
//剔除超时连接前,首先尝试对事务进行回滚
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happened.
Wrap the bad connection with a new PooledConnection, this will help
to not interrupt current executing thread and give current thread a
chance to join the next competition for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " conn.getRealHashCode() ".");
}
}
//4. 如果没有超时的活跃连接,阻塞等待
else {
// Must wait
try {
if (!countedWait) {
state.hadToWaitCount ;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " poolTimeToWait " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait);
state.accumulatedWaitTime = System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
// ping to server and check the connection is valid or not
if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount ;
state.accumulatedRequestTime = System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" conn.getRealHashCode() ") was returned from the pool, getting another connection.");
}
state.badConnectionCount ;
localBadConnectionCount ;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections poolMaximumLocalBadConnectionTolerance)) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
- 连接回收过程
//回收数据库连接
protected void pushConnection(PooledConnection conn) throws SQLException {
//同步控制
synchronized (state) {
//1. 将连接从活跃队列中移除
state.activeConnections.remove(conn);
//2. 如果连接仍然有效,且空闲队列未满,则复用底层的连接,并创建空闲连接,放入空闲队列中
if (conn.isValid()) {
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime = conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " newConn.getRealHashCode() " to pool.");
}
state.notifyAll(); //唤醒等待连接的线程
}
//3. 如果空闲队列已满,直接关闭底层连接,并将当前连接失效
else {
state.accumulatedCheckoutTime = conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " conn.getRealHashCode() ".");
}
conn.invalidate();
}
}
//4. 如果当前连接已失效,则无需任何处理
else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" conn.getRealHashCode() ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount ;
}
}
}