Preface
This article takes a look at PowerJob's CleanService.
CleanService
@Slf4j
@Service
public class CleanService {

    private final DFsService dFsService;
    private final InstanceInfoRepository instanceInfoRepository;
    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
    private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
    private final LockService lockService;

    private final int instanceInfoRetentionDay;
    private final int localContainerRetentionDay;
    private final int remoteContainerRetentionDay;

    private static final int TEMPORARY_RETENTION_DAY = 3;

    /**
     * Scheduled cleanup at 3 a.m. every day
     */
    private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";
    private static final String HISTORY_DELETE_LOCK = "history_delete_lock";

    public CleanService(DFsService dFsService, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
                        WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService,
                        @Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
                        @Value("${oms.container.retention.local}") int localContainerRetentionDay,
                        @Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
        this.dFsService = dFsService;
        this.instanceInfoRepository = instanceInfoRepository;
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
        this.workflowNodeInfoRepository = workflowNodeInfoRepository;
        this.lockService = lockService;
        this.instanceInfoRetentionDay = instanceInfoRetentionDay;
        this.localContainerRetentionDay = localContainerRetentionDay;
        this.remoteContainerRetentionDay = remoteContainerRetentionDay;
    }

    //......
}
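CleanService exposes two public methods, timingClean and cleanLocal. The three retention periods (in days) are injected from configuration, while the retention for temporary files is fixed at 3 days.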
timingClean
    @Async(PJThreadPool.TIMING_POOL)
    @Scheduled(cron = CLEAN_TIME_EXPRESSION)
    public void timingClean() {

        // Release the local cache
        WorkerClusterManagerService.cleanUp();

        // Free disk space
        cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay);
        cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
        cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);

        // Delete historical data from the database
        cleanByOneServer();
    }
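timingClean runs every day at 03:00 on the TIMING_POOL thread pool. It first calls WorkerClusterManagerService.cleanUp() to release the local cache, then calls cleanLocal for the log, container jar, and temporary directories to free local disk space, and finally calls cleanByOneServer to delete historical data.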
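To make the schedule concrete: the Spring cron expression "0 0 3 * * ?" has six fields (second, minute, hour, day of month, month, day of week), so it fires once a day at 03:00:00. The following is a minimal sketch of that "next daily 03:00" rule using only java.time. The class and method names are hypothetical, and it ignores time zones and daylight saving transitions, which a real cron scheduler has to handle.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

public class DailyCleanTimeSketch {

    /**
     * Returns the next 03:00:00 strictly after the given local time.
     * Illustrative only; this is not how Spring evaluates cron expressions.
     */
    public static LocalDateTime nextRun(LocalDateTime now) {
        LocalDateTime todayAtThree = now.toLocalDate().atTime(LocalTime.of(3, 0));
        // Before 03:00 the job still runs today; at or after 03:00 it runs tomorrow.
        return now.isBefore(todayAtThree) ? todayAtThree : todayAtThree.plusDays(1);
    }
}
```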
cleanLocal
    @VisibleForTesting
    public void cleanLocal(String path, int day) {
        if (day < 0) {
            log.info("[CleanService] won't clean up {} because of offset day <= 0.", path);
            return;
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        File dir = new File(path);
        if (!dir.exists()) {
            return;
        }
        File[] logFiles = dir.listFiles();
        if (logFiles == null || logFiles.length == 0) {
            return;
        }

        // Compute the maximum allowed age in milliseconds
        long maxOffset = day * 24 * 60 * 60 * 1000L;

        for (File f : logFiles) {
            long offset = System.currentTimeMillis() - f.lastModified();
            if (offset >= maxOffset) {
                if (!f.delete()) {
                    log.warn("[CleanService] delete file({}) failed.", f.getName());
                } else {
                    log.info("[CleanService] delete file({}) successfully.", f.getName());
                }
            }
        }
        log.info("[CleanService] clean {} successfully, using {}.", path, stopwatch.stop());
    }
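cleanLocal deletes the entries directly under the given directory whose lastModified is at least `day` days before the current time. A negative `day` disables cleanup for that path. Note that the guard is `day < 0` although the log message says `<= 0`, so a value of 0 passes the guard and matches every file whose modification time is not in the future. The method also returns early when the directory does not exist or is empty.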
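The age check can be tried in isolation with the sketch below. It assumes the same rule as cleanLocal (delete when the age is at least the retention period, skip when the retention is negative) but is not PowerJob code: the class name, the method name, and the explicit `nowMillis` parameter are hypothetical, the latter added so the behavior is easy to test.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RetentionCleanSketch {

    /**
     * Deletes the direct children of dir whose last-modified time is at least
     * retentionDays old relative to nowMillis, and returns the deleted names.
     */
    public static List<String> cleanOlderThan(File dir, int retentionDays, long nowMillis) {
        List<String> deleted = new ArrayList<>();
        if (retentionDays < 0) {
            // A negative retention disables cleanup, mirroring the guard in cleanLocal.
            return deleted;
        }
        File[] files = dir.listFiles();
        if (files == null) {
            // Not a directory, or an I/O error occurred while listing it.
            return deleted;
        }
        long maxAgeMillis = TimeUnit.DAYS.toMillis(retentionDays);
        for (File f : files) {
            long age = nowMillis - f.lastModified();
            // File.delete() returns false for non-empty directories, so those are left alone.
            if (age >= maxAgeMillis && f.delete()) {
                deleted.add(f.getName());
            }
        }
        return deleted;
    }
}
```

Using `TimeUnit.DAYS.toMillis` avoids spelling out the `day * 24 * 60 * 60 * 1000L` arithmetic by hand and keeps the whole computation in `long`.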
cleanByOneServer
    private void cleanByOneServer() {
        // As long as the first server grabs the lock, the other servers return, so a 10-minute lock should be enough
        boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000L);
        if (!lock) {
            log.info("[CleanService] clean job is already running, just return.");
            return;
        }
        try {
            // Delete run records from the database
            cleanInstanceLog();
            cleanWorkflowInstanceLog();
            // Delete useless nodes
            cleanWorkflowNodeInfo();
            // Delete expired GridFS files
            cleanRemote(Constants.LOG_BUCKET, instanceInfoRetentionDay);
            cleanRemote(Constants.CONTAINER_BUCKET, remoteContainerRetentionDay);
        } finally {
            lockService.unlock(HISTORY_DELETE_LOCK);
        }
    }
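cleanByOneServer first tries to acquire the history_delete_lock lock with a 10-minute timeout and returns immediately if another server already holds it. Once it has the lock, it runs cleanInstanceLog, cleanWorkflowInstanceLog, and cleanWorkflowNodeInfo, then calls cleanRemote for the log and container buckets, and finally releases the lock in a finally block.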
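The "try the lock, return if it is taken, release in finally" pattern can be sketched without any PowerJob dependencies. `LeaseLock` below is a hypothetical in-memory stand-in for a lock with an expiry time; PowerJob's real LockService is a separate component whose implementation is not shown in this article, so treat this only as an illustration of the control flow.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SingleRunnerSketch {

    /** Hypothetical in-memory lock whose entries expire after a lease time. */
    public static class LeaseLock {
        private final ConcurrentMap<String, Long> expiryByName = new ConcurrentHashMap<>();

        public boolean tryLock(String name, long leaseMillis) {
            long now = System.currentTimeMillis();
            boolean[] acquired = {false};
            // compute() is atomic per key, so only one caller can claim a free or expired lock.
            expiryByName.compute(name, (key, expiry) -> {
                if (expiry == null || expiry <= now) {
                    acquired[0] = true;
                    return now + leaseMillis;
                }
                return expiry;
            });
            return acquired[0];
        }

        public void unlock(String name) {
            expiryByName.remove(name);
        }
    }

    /** Runs the task only if the lock is free; returns whether the task ran. */
    public static boolean runExclusively(LeaseLock lock, String name, long leaseMillis, Runnable task) {
        if (!lock.tryLock(name, leaseMillis)) {
            return false; // someone else is already cleaning, so just return
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock(name); // always release, even if the task throws
        }
    }
}
```

One design consequence of a lease-based lock is worth keeping in mind: if the task outlives the lease, a second caller can acquire the lock while the first is still running, which is why the lease should comfortably exceed the expected cleanup time.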
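Summary
PowerJob's CleanService provides the timingClean and cleanLocal methods. timingClean first calls WorkerClusterManagerService.cleanUp() to release the local cache, then uses cleanLocal to free local disk space, and finally calls cleanByOneServer, which takes a lock so that only one server deletes the historical data. cleanLocal deletes the files in the given directory whose lastModified is at least the given number of retention days before the current time, and skips the directory entirely when that number is negative.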