序
本文主要研究一下druid的DestroyConnectionThread
createAndStartDestroyThread
druid-1.2.11-sources.jar!/com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 protected void createAndStartDestroyThread() {
destroyTask = new DestroyTask();
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {
period = 1000;
}
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
String threadName = "Druid-ConnectionPool-Destroy-" System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
}
DruidDataSource的init方法会执行createAndStartDestroyThread;createAndStartDestroyThread在destroyScheduler不为null的时候,会按timeBetweenEvictionRunsMillis(不大于0时取1000毫秒)的间隔定时调度destroyTask,然后直接返回;不过destroyScheduler默认为null,此时它会创建并启动DestroyConnectionThread
DestroyConnectionThread
druid-1.2.11-sources.jar!/com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 public class DestroyConnectionThread extends Thread {
// Thread that repeatedly sleeps and then runs destroyTask until the data
// source is closed/closing or the thread is interrupted.

// Creates the thread with the given name and marks it as a daemon thread.
public DestroyConnectionThread(String name) {
super(name);
this.setDaemon(true);
}
// Counts down initedLatch once, then loops: sleep, check for interruption,
// run destroyTask.
public void run() {
initedLatch.countDown();
for (; ; ) {
// delete starting from the front
try {
// Leave the loop once the data source is closed or closing.
if (closed || closing) {
break;
}
// Sleep for the configured interval, or 1000 ms when it is not positive.
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); // fallback pause when no positive interval is set
}
// Thread.interrupted() also clears the interrupt flag; an interrupt
// ends the loop.
if (Thread.interrupted()) {
break;
}
destroyTask.run();
} catch (InterruptedException e) {
// Interrupted while sleeping: end the loop.
break;
}
}
}
}
DestroyConnectionThread就是每隔timeBetweenEvictionRunsMillis(不大于0时为1000毫秒)执行一下destroyTask,直到数据源closed/closing或者线程被中断才退出循环
DestroyTask
druid-1.2.11-sources.jar!/com/alibaba/druid/pool/DruidDataSource.java
代码语言:javascript复制 public class DestroyTask implements Runnable {
// Runnable that shrinks the pool and, when enabled, removes abandoned
// connections.
public DestroyTask() {
}
@Override
public void run() {
// Always shrink with checkTime = true; keepAlive is read from the
// enclosing instance.
shrink(true, keepAlive);
// removeAbandoned() only runs when isRemoveAbandoned() returns true.
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
DestroyTask的run方法主要是执行shrink(true, keepAlive);如果isRemoveAbandoned()为true,还会执行removeAbandoned()
shrink
代码语言:javascript复制 public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; i) {
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount ] = connection;
continue;
}
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount ] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount ] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount ] = connection;
continue;
}
}
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount ] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount ] = connection;
} else {
break;
}
}
}
int removeCount = evictCount keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount = keepAliveCount;
if (keepAlive && poolingCount activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
for (int i = 0; i < evictCount; i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
discardCount ;
if (activeCount poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
if (needFill) {
lock.lock();
try {
int fillCount = minIdle - (activeCount poolingCount createTaskCount);
for (int i = 0; i < fillCount; i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
shrink方法先计算checkCount(poolingCount - minIdle),之后进入checkTime的逻辑:如果idleMillis大于等于minEvictableIdleTimeMillis,则会判断当前下标i是否小于checkCount,小于才会纳入evict范围,否则需要idleMillis大于maxEvictableIdleTimeMillis才能纳入evict范围
小结
DruidDataSource的init方法会执行createAndStartDestroyThread;而createAndStartDestroyThread在destroyScheduler为null(默认)时启动了DestroyConnectionThread,DestroyConnectionThread就是每隔timeBetweenEvictionRunsMillis执行一下destroyTask,DestroyTask的run方法主要是执行shrink(true, keepAlive)。这个里头如果minIdle配置的跟maxActive一样的话,由于poolingCount不会超过maxActive,checkCount(poolingCount - minIdle)不会大于0,i < checkCount永远不成立,会导致minEvictableIdleTimeMillis实际上是没有生效的,需要使用maxEvictableIdleTimeMillis才能生效。