大家好,又见面了,我是你们的朋友全栈君。
Java极客 | 作者 / 铿然一叶
这是Java极客的第 54 篇原创文章
1.资源池的动态伸缩
1.为了提升资源池的性能,需要设置最小闲置资源数量,在资源池初始化时完成初始化;而当使用的资源超过最小闲置资源数,消费者释放回池中超过一定时间后要收缩到最小闲置资源数。
2.为了避免无限申请资源导致超出负载,需要设置最大资源数,池中资源不能超出最大资源数。
2.动态伸缩相关类结构
职责说明:
类
职责
HouseKeeper
实现了Runnable接口,负责池资源的动态伸缩。
ScheduledExecutorService
负责调度HouseKeeper。
ScheduledFuture
由ScheduledExecutorService调度返回,可以终止HouseKeeper的执行。
HikariPool
负责在构造器中初始化和调用以上类。
3.源码
3.1.HikariPool
// 初始化ScheduledExecutorService
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
// 调度HouseKeeper, 延迟并周期性调度
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
复制代码
3.2.HouseKeeper
private final class HouseKeeper implements Runnable{
private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);
@Override
public void run(){
try {
// refresh values in case they changed via MBean
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
logger.warn(“{} – Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.”,
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
return;
}
else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
logger.warn(“{} – Thread starvation or clock leap detected (housekeeper delta={}).”, poolName, elapsedDisplayString(previous, now));
}
previous = now;
String afterPrefix = “Pool “;
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState(“Before cleanup “);
afterPrefix = “After cleanup “;
// 获取未使用的资源
final List notInUse = connectionBag.values(STATE_NOT_IN_USE);
// 未使用资源大于配置的最小闲置资源则关闭多余资源
int toRemove = notInUse.size() – config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, “(connection has passed idleTimeout)”);
toRemove–;
}
}
}
logPoolState(afterPrefix);
// 除了动态减少资源,这里动态扩容资源
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
logger.error(“Unexpected exception in housekeeping task”, e);
}
}
}
复制代码
动态扩容资源:
//HikariPool.java
private synchronized void fillPool(){
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() – getTotalConnections(), config.getMinimumIdle() – getIdleConnections())
– addConnectionQueue.size();
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd – 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
复制代码
3.3.ConcurrentBag
在ConcurrentBag中释放资源时之会修改资源的状态,不会去改变资源池可用资源数量。
public void requite(final T bagEntry){
// 这里只是修改资源状态,并不减少资源池中可用资源数量。
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) { // 0xff 是255, 每隔256进去一次
parkNanos(MICROSECONDS.toNanos(10));
}
else {
yield();
}
}
final List threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
复制代码
4.总结
资源池在初始化时初始化最小资源数。
资源的动态伸缩可以通过JUC工具ScheduledExecutorService调度线程完成,而不需要额外使用第三方定时器。
消费者释放资源时并不会立即减少资源池可用资源数量,因为很可能其他的消费者又会申请资源,为了避免减少无谓的创建资源操作,释放的资源应该在超过一定时间后才真正关闭。
end.
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152357.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...