序
本文主要研究一下druid的borrow行为
getConnection
com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 public DruidPooledConnection getConnection() throws SQLException {
return getConnection(maxWait);
}
/**
 * Borrows a pooled connection after calling {@code init()}.
 *
 * <p>When at least one filter is registered the request is routed through a
 * newly created {@code FilterChainImpl}; otherwise the pool is asked directly.
 *
 * @param maxWaitMillis wait limit forwarded unchanged to the chosen path
 * @return the borrowed connection
 * @throws SQLException propagated from initialisation or from the borrow itself
 */
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();

    // No filters registered: borrow straight from the pool.
    if (filters.size() <= 0) {
        return getConnectionDirect(maxWaitMillis);
    }

    // Otherwise let a fresh filter chain drive the borrow.
    FilterChainImpl chain = new FilterChainImpl(this);
    return chain.dataSource_connect(this, maxWaitMillis);
}
DruidDataSource的getConnection方法内部调用的是getConnectionDirect(maxWaitMillis)
getConnectionDirect
com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ; ) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt ;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder); // 传入null,避免重复关闭
continue;
}
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
}
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
public boolean isFull() {
lock.lock();
try {
return this.poolingCount this.activeCount >= this.maxActive;
} finally {
lock.unlock();
}
}
getConnectionDirect在一个for循环里头进行获取连接,首先执行getConnectionInternal(maxWaitMillis),若出现GetConnectionTimeoutException异常,则在notFull且notFullTimeoutRetryCnt小于等于this.notFullTimeoutRetryCount时会递增notFullTimeoutRetryCnt,然后continue继续循环,否则直接抛出GetConnectionTimeoutException跳出循环
获取到连接之后,判断是否是testOnBorrow,如果是则执行testConnectionInternal,若校验不成功则执行discardConnection,然后继续循环;若非testOnBorrow则判断conn是否closed,若是则执行discardConnection,然后继续循环,若非closed则进入testWhileIdle的逻辑(druid直接在getConnection的时候执行testWhileIdle有点令人匪夷所思);最后是removeAbandoned,维护connectedTimeNano,将当前连接放到activeConnections中
getConnectionInternal
com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " new Date(closeTimeMillis));
}
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false; ; ) {
if (createDirect) {
createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {
LOG.debug("conn-direct_create ");
}
boolean discard;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount ;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " maxWaitThreadCount ", current wait Thread count "
lock.getQueueLength());
}
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}
if (lastFatalErrorSql != null) {
errorMsg.append(", sql n")
.append(lastFatalErrorSql);
}
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
connectCount ;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
activeCount ;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
if (holder == null) {
long waitNanos = waitNanosLocal.get();
final long activeCount;
final long maxActive;
final long creatingCount;
final long createStartNanos;
final long createErrorCount;
final Throwable createError;
try {
lock.lock();
activeCount = this.activeCount;
maxActive = this.maxActive;
creatingCount = this.creatingCount;
createStartNanos = this.createStartNanos;
createErrorCount = this.createErrorCount;
createError = this.createError;
} finally {
lock.unlock();
}
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")
.append(waitNanos / (1000 * 1000))
.append(", active ").append(activeCount)
.append(", maxActive ").append(maxActive)
.append(", creating ").append(creatingCount);
if (creatingCount > 0 && createStartNanos > 0) {
long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {
buf.append(", createElapseMillis ").append(createElapseMillis);
}
}
if (createErrorCount > 0) {
buf.append(", createErrorCount ").append(createErrorCount);
}
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); i) {
if (i != 0) {
buf.append('n');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
getConnectionInternal方法先判断是否closed,如果是则抛出DataSourceClosedException,接着判断是否enable,如果不是则抛出DataSourceDisableException,紧接着for循环,它主要根据createDirect来执行不同逻辑,第一次默认createDirect为false;
createDirect为false,对于notEmptyWaitThreadCount大于等于maxWaitThreadCount则抛出SQLException,对于poolingCount为0且activeCount小于maxActive,createScheduler的queue大小大于0的,则设置createDirect为true;否则对于maxWait大于0的,执行pollLast(nanos),否则执行takeLast()
createDirect为true,会通过DruidDataSource.this.createPhysicalConnection()创建物理连接,对于activeCount小于maxActive的,则维护activeCount跳出循环,否则标记discard为true,通过JdbcUtils.close(pyConnInfo.getPhysicalConnection())关闭连接
pollLast
代码语言:javascript复制 private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (; ; ) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
if (estimate <= 0) {
waitNanosLocal.set(nanos - estimate);
return null;
}
notEmptyWaitThreadCount ;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
notEmptyWaitCount ;
notEmptyWaitNanos = (startEstimate - estimate);
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount ;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
if (poolingCount == 0) {
if (estimate > 0) {
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
pollLast方法在poolingCount为0时执行emptySignal,另外主要是处理notEmpty这个condition,然后取connections[poolingCount]
takeLast
代码语言:javascript复制 DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
notEmptyWaitThreadCount ;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount ;
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount ;
throw ie;
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
takeLast方法在poolingCount为0的时候执行emptySignal,然后通过notEmpty.await()进行阻塞等待,最后返回connections[poolingCount]
emptySignal
代码语言:javascript复制 private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {
return;
}
if (activeCount poolingCount createTaskCount >= maxActive) {
return;
}
submitCreateTask(false);
}
emptySignal方法,对于createScheduler为null的执行empty.signal()后直接返回;否则先进行task数量及maxActive的判断,最后执行submitCreateTask(false)
submitCreateTask
代码语言:javascript复制 private void submitCreateTask(boolean initTask) {
createTaskCount ;
CreateConnectionTask task = new CreateConnectionTask(initTask);
if (createTasks == null) {
createTasks = new long[8];
}
boolean putted = false;
for (int i = 0; i < createTasks.length; i) {
if (createTasks[i] == 0) {
createTasks[i] = task.taskId;
putted = true;
break;
}
}
if (!putted) {
long[] array = new long[createTasks.length * 3 / 2];
System.arraycopy(createTasks, 0, array, 0, createTasks.length);
array[createTasks.length] = task.taskId;
createTasks = array;
}
this.createSchedulerFuture = createScheduler.submit(task);
}
submitCreateTask会创建CreateConnectionTask,然后提交到createScheduler执行
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 public class CreateConnectionTask implements Runnable {
private int errorCount;
private boolean initTask;
private final long taskId;
public CreateConnectionTask() {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
}
public CreateConnectionTask(boolean initTask) {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
this.initTask = initTask;
}
@Override
public void run() {
runInternal();
}
private void runInternal() {
for (; ; ) {
// addLast
lock.lock();
try {
if (closed || closing) {
clearCreateTask(taskId);
return;
}
boolean emptyWait = true;
if (createError != null && poolingCount == 0) {
emptyWait = false;
}
if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount poolingCount < minIdle)) // 在keepAlive场景不能放弃创建
&& (!initTask) // 线程池初始化时的任务不能放弃创建
&& !isFailContinuous() // failContinuous时不能放弃创建,否则会无法创建线程
&& !isOnFatalError() // onFatalError时不能放弃创建,否则会无法创建线程
) {
clearCreateTask(taskId);
return;
}
// 防止创建超过maxActive数量的连接
if (activeCount poolingCount >= maxActive) {
clearCreateTask(taskId);
return;
}
}
} finally {
lock.unlock();
}
PhysicalConnectionInfo physicalConnection = null;
try {
physicalConnection = createPhysicalConnection();
} catch (OutOfMemoryError e) {
LOG.error("create connection OutOfMemoryError, out memory. ", e);
errorCount ;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " jdbcUrl, e);
errorCount ;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
// unknow fatal exception
setFailContinuous(true);
continue;
} catch (Error e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection Error", e);
// unknow fatal exception
setFailContinuous(true);
break;
} catch (Throwable e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection unexecpted error.", e);
break;
}
if (physicalConnection == null) {
continue;
}
physicalConnection.createTaskId = taskId;
boolean result = put(physicalConnection);
if (!result) {
JdbcUtils.close(physicalConnection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
break;
}
}
}
CreateConnectionTask通过for循环,然后加锁处理minIdle及maxActive,最后通过createPhysicalConnection创建物理连接
createPhysicalConnection
com/alibaba/druid/pool/DruidAbstractDataSource.java
代码语言:javascript复制 public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
String url = this.getUrl();
Properties connectProperties = getConnectProperties();
String user;
if (getUserCallback() != null) {
user = getUserCallback().getName();
} else {
user = getUsername();
}
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if (passwordCallback != null) {
if (passwordCallback instanceof DruidPasswordCallback) {
DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
}
char[] chars = passwordCallback.getPassword();
if (chars != null) {
password = new String(chars);
}
}
Properties physicalConnectProperties = new Properties();
if (connectProperties != null) {
physicalConnectProperties.putAll(connectProperties);
}
if (user != null && user.length() != 0) {
physicalConnectProperties.put("user", user);
}
if (password != null && password.length() != 0) {
physicalConnectProperties.put("password", password);
}
Connection conn = null;
long connectStartNanos = System.nanoTime();
long connectedNanos, initedNanos, validatedNanos;
Map<String, Object> variables = initVariants
? new HashMap<String, Object>()
: null;
Map<String, Object> globalVariables = initGlobalVariants
? new HashMap<String, Object>()
: null;
createStartNanosUpdater.set(this, connectStartNanos);
creatingCountUpdater.incrementAndGet(this);
try {
conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null) {
throw new SQLException("connect error, url " url ", driverClass " this.driverClass);
}
initPhysicalConnection(conn, variables, globalVariables);
initedNanos = System.nanoTime();
validateConnection(conn);
validatedNanos = System.nanoTime();
setFailContinuous(false);
setCreateError(null);
} catch (SQLException ex) {
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (RuntimeException ex) {
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (Error ex) {
createErrorCountUpdater.incrementAndGet(this);
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} finally {
long nano = System.nanoTime() - connectStartNanos;
createTimespan = nano;
creatingCountUpdater.decrementAndGet(this);
}
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
}
createPhysicalConnection通过try catch去创建物理连接,若有异常则会通过JdbcUtils.close(conn)去关闭连接
testConnectionInternal
代码语言:javascript复制 protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {
String sqlFile = JdbcSqlStat.getContextSqlFile();
String sqlName = JdbcSqlStat.getContextSqlName();
if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(null);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(null);
}
try {
if (validConnectionChecker != null) {
boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
long currentTimeMillis = System.currentTimeMillis();
if (holder != null) {
holder.lastValidTimeMillis = currentTimeMillis;
holder.lastExecTimeMillis = currentTimeMillis;
}
if (valid && isMySql) { // unexcepted branch
long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
if (lastPacketReceivedTimeMs > 0) {
long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
if (lastPacketReceivedTimeMs > 0 //
&& mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
discardConnection(holder);
String errorMsg = "discard long time none received connection. "
", jdbcUrl : " jdbcUrl
", version : " VERSION.getVersionNumber()
", lastPacketReceivedIdleMillis : " mysqlIdleMillis;
LOG.warn(errorMsg);
return false;
}
}
}
if (valid && onFatalError) {
lock.lock();
try {
if (onFatalError) {
onFatalError = false;
}
} finally {
lock.unlock();
}
}
return valid;
}
if (conn.isClosed()) {
return false;
}
if (null == validationQuery) {
return true;
}
Statement stmt = null;
ResultSet rset = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rset = stmt.executeQuery(validationQuery);
if (!rset.next()) {
return false;
}
} finally {
JdbcUtils.close(rset);
JdbcUtils.close(stmt);
}
if (onFatalError) {
lock.lock();
try {
if (onFatalError) {
onFatalError = false;
}
} finally {
lock.unlock();
}
}
return true;
} catch (Throwable ex) {
// skip
return false;
} finally {
if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(sqlFile);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(sqlName);
}
}
}
testConnectionInternal主要通过validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout)来校验连接,如果validConnectionChecker为null则通过jdbc执行validationQuery进行校验
discardConnection
代码语言:javascript复制 public void discardConnection(DruidConnectionHolder holder) {
if (holder == null) {
return;
}
Connection conn = holder.getConnection();
if (conn != null) {
JdbcUtils.close(conn);
}
lock.lock();
try {
if (holder.discard) {
return;
}
if (holder.active) {
activeCount--;
holder.active = false;
}
discardCount ;
holder.discard = true;
if (activeCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
discardConnection方法主要是关闭connection,之后加锁处理一些统计标记
小结
DruidDataSource的getConnection方法内部调用的是getConnectionDirect(maxWaitMillis)
getConnectionDirect在一个for循环里头进行获取连接,首先执行getConnectionInternal(maxWaitMillis),若出现GetConnectionTimeoutException异常,则在notFull且notFullTimeoutRetryCnt小于等于this.notFullTimeoutRetryCount时会递增notFullTimeoutRetryCnt,然后continue继续循环,否则直接抛出GetConnectionTimeoutException跳出循环
获取到连接之后,判断是否是testOnBorrow,如果是则执行testConnectionInternal,若校验不成功则执行discardConnection,然后继续循环;若非testOnBorrow则判断conn是否closed,若是则执行discardConnection,然后继续循环,若非closed则进入testWhileIdle的逻辑
最后是removeAbandoned,维护connectedTimeNano,将当前连接放到activeConnections中
整体代码看下来感觉跟commons-pool相比,druid代码的实现感觉有点粗糙,抽象层级不够高,代码充斥大量统计标记、状态位的处理,维护起来得很小心,另外druid直接在getConnection的时候执行testWhileIdle有点令人匪夷所思