MyBatis设计思想(3)——数据源模块

2020-09-03 16:29:29 浏览数 (1)

MyBatis设计思想(3)——数据源模块

一. 工厂方法模式

**工厂方法:定义一个创建产品对象的工厂接口,将产品对象的实际创建工作推迟到具体子工厂类当中。这满足创建型模式中所要求的“创建与使用相分离”的特点。**通过工厂方法,客户端可以不关心产品的具体创建过程,直接从工厂中获取实例即可。

  1. IProduct:抽象产品,定义了需要创建的对象的行为。
  2. ConcreteProduct:具体产品,实际要创建的对象。
  3. AbstractFactory:抽象工厂,定义了工厂的行为。
  4. ConcreteFactory:具体工厂,用于创建具体的对象。

二. MyBatis的数据源模块

数据源的创建过程比较复杂,涉及到数据库驱动的加载、配置的加载、数据库连接的获取和管理等。如果每次都让客户端自己创建数据源,那会大大增加客户端的使用成本,因此使用抽象工厂模式,让客户端直接从工厂中获取数据源。

MyBatis定义了DataSourceFactory工厂接口,并提供了两种实现:

  1. UnpooledDataSourceFactory:非池化工厂,用于创建UnpooledDataSource。
  2. PooledDataSourceFactory:池化工厂,用于创建PooledDataSource。

UnpooledDataSource没什么可说的,就是一个简单的非池化的数据源,它获取连接的方式和手动通过 JDBC 获取连接是一样的。

代码语言:javascript复制
/**
 * @author Clinton Begin
 * @author Eduardo Macarron
 *
 * 非池化的数据源
 */
public class UnpooledDataSource implements DataSource {

  private ClassLoader driverClassLoader;
  private Properties driverProperties;
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();

  private String driver;
  private String url;
  private String username;
  private String password;

  private Boolean autoCommit;
  private Integer defaultTransactionIsolationLevel;
  private Integer defaultNetworkTimeout;

  //注册数据库驱动
  static {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }


  //创建数据库连接
  private Connection doGetConnection(String username, String password) throws SQLException {
    //加载配置
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }

    //创建连接
    return doGetConnection(props);
  }

  private Connection doGetConnection(Properties properties) throws SQLException {
    //初始化数据库驱动
    initializeDriver();

    //创建数据库连接
    Connection connection = DriverManager.getConnection(url, properties);

    //连接配置
    configureConnection(connection);
    return connection;
  }

  private synchronized void initializeDriver() throws SQLException {
    //注册数据库驱动
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          driverType = Resources.classForName(driver);
        }
        // DriverManager requires the driver to be loaded via the system ClassLoader.
        // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
        Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: "   e);
      }
    }
  }

  //连接配置
  private void configureConnection(Connection conn) throws SQLException {
    //设置超时时间
    if (defaultNetworkTimeout != null) {
      conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
    }

    //设置事务自动提交
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }

    //设置事务隔离级别
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }
}

PooledDataSource是MyBatis实现的一个简单的数据库连接池,实现了连接的复用和管理,内部的一些设计还是比较巧妙的。

三. MyBatis连接池

  1. 核心类
  1. PooledConnection:对JDBC Connection的动态代理,主要是拦截了close方法,对连接资源进行回收和状态的修改,并不会真正关闭连接。
代码语言:javascript复制
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  //拦截close方法,对连接进行回收复用,并不会真正地关闭
  String methodName = method.getName();
  if (CLOSE.equals(methodName)) {
    dataSource.pushConnection(this);
    return null;
  }
  try {
    if (!Object.class.equals(method.getDeclaringClass())) {
      // issue #579 toString() should never fail
      // throw an SQLException instead of a Runtime
      checkConnection();
    }
    return method.invoke(realConnection, args);
  } catch (Throwable t) {
    throw ExceptionUtil.unwrapThrowable(t);
  }

}
  1. PoolState:连接池管理器,对池子中的所有连接进行状态的管理。
代码语言:javascript复制
//数据源实例
protected PooledDataSource dataSource;

//空闲连接队列
protected final List<PooledConnection> idleConnections = new ArrayList<>();

//活跃连接队列
protected final List<PooledConnection> activeConnections = new ArrayList<>();

//各种统计信息
protected long requestCount = 0;
protected long accumulatedRequestTime = 0;
protected long accumulatedCheckoutTime = 0;
protected long claimedOverdueConnectionCount = 0;
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
protected long accumulatedWaitTime = 0;
protected long hadToWaitCount = 0;
protected long badConnectionCount = 0;
  1. PooledDataSource:数据库连接池,配合PoolState,进行数据库连接的生命周期管理。
  2. 连接创建过程
代码语言:javascript复制
private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      //同步控制
      synchronized (state) {
        //1. 首先尝试从空闲队列中获取
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection "   conn.getRealHashCode()   " from pool.");
          }
        }

        //2. 如果没有空闲连接,但活跃连接数没有达到上限,则创建新连接
        else {
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection "   conn.getRealHashCode()   ".");
            }
          }

          //3. 如果活跃连接数已达上限,则检查最老的活跃连接是否已超时。如果已超时,则剔除最老的连接,并创建新连接
          else {
            // Cannot create new connection
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection
              state.claimedOverdueConnectionCount  ;
              state.accumulatedCheckoutTimeOfOverdueConnections  = longestCheckoutTime;
              state.accumulatedCheckoutTime  = longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);

              //剔除超时连接前,首先尝试对事务进行回滚
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection "   conn.getRealHashCode()   ".");
              }
            }
            //4. 如果没有超时的活跃连接,阻塞等待
            else {
              // Must wait
              try {
                if (!countedWait) {
                  state.hadToWaitCount  ;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as "   poolTimeToWait   " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime  = System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount  ;
            state.accumulatedRequestTime  = System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection ("   conn.getRealHashCode()   ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount  ;
            localBadConnectionCount  ;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections   poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }
  1. 连接回收过程
代码语言:javascript复制
//回收数据库连接
protected void pushConnection(PooledConnection conn) throws SQLException {
  //同步控制
  synchronized (state) {
    //1. 将连接从活跃队列中移除
    state.activeConnections.remove(conn);

    //2. 如果连接仍然有效,且空闲队列未满,则复用底层的连接,并创建空闲连接,放入空闲队列中
    if (conn.isValid()) {
      if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
        state.accumulatedCheckoutTime  = conn.getCheckoutTime();
        if (!conn.getRealConnection().getAutoCommit()) {
          conn.getRealConnection().rollback();
        }
        PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
        state.idleConnections.add(newConn);
        newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
        newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
        conn.invalidate();
        if (log.isDebugEnabled()) {
          log.debug("Returned connection "   newConn.getRealHashCode()   " to pool.");
        }
        state.notifyAll(); //唤醒等待连接的线程
      }
      //3. 如果空闲队列已满,直接关闭底层连接,并将当前连接失效
      else {
        state.accumulatedCheckoutTime  = conn.getCheckoutTime();
        if (!conn.getRealConnection().getAutoCommit()) {
          conn.getRealConnection().rollback();
        }
        conn.getRealConnection().close();
        if (log.isDebugEnabled()) {
          log.debug("Closed connection "   conn.getRealHashCode()   ".");
        }
        conn.invalidate();
      }
    }
    //4. 如果当前连接已失效,则无需任何处理
    else {
      if (log.isDebugEnabled()) {
        log.debug("A bad connection ("   conn.getRealHashCode()   ") attempted to return to the pool, discarding connection.");
      }
      state.badConnectionCount  ;
    }
  }
}

0 人点赞