序
本文主要研究一下PowerJob的InstanceController
InstanceController
tech/powerjob/server/web/controller/InstanceController.java
代码语言:java
@Slf4j
@RestController
@RequestMapping("/instance")
public class InstanceController {
@Resource
private InstanceService instanceService;
@Resource
private InstanceLogService instanceLogService;
@Resource
private CacheService cacheService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@GetMapping("/stop")
public ResultDTO<Void> stopInstance(Long appId,Long instanceId) {
instanceService.stopInstance(appId,instanceId);
return ResultDTO.success(null);
}
@GetMapping("/retry")
public ResultDTO<Void> retryInstance(String appId, Long instanceId) {
instanceService.retryInstance(Long.valueOf(appId), instanceId);
return ResultDTO.success(null);
}
@GetMapping("/detail")
public ResultDTO<InstanceDetailVO> getInstanceDetail(Long instanceId) {
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(instanceId)));
}
@GetMapping("/log")
public ResultDTO<StringPage> getInstanceLog(Long appId, Long instanceId, Long index) {
return ResultDTO.success(instanceLogService.fetchInstanceLog(appId, instanceId, index));
}
@GetMapping("/downloadLogUrl")
public ResultDTO<String> getDownloadUrl(Long appId, Long instanceId) {
return ResultDTO.success(instanceLogService.fetchDownloadUrl(appId, instanceId));
}
@GetMapping("/downloadLog")
public void downloadLogFile(Long instanceId , HttpServletResponse response) throws Exception {
File file = instanceLogService.downloadInstanceLog(instanceId);
OmsFileUtils.file2HttpResponse(file, response);
}
@GetMapping("/downloadLog4Console")
@SneakyThrows
public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) {
// 获取内部下载链接
String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId);
// 先下载到本机
String logFilePath = OmsFileUtils.genTemporaryWorkPath() String.format("powerjob-%s-%s.log", appId, instanceId);
File logFile = new File(logFilePath);
try {
FileUtils.copyURLToFile(new URL(downloadUrl), logFile);
// 再推送到浏览器
OmsFileUtils.file2HttpResponse(logFile, response);
} finally {
FileUtils.forceDelete(logFile);
}
}
@PostMapping("/list")
public ResultDTO<PageResult<InstanceInfoVO>> list(@RequestBody QueryInstanceRequest request) {
Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort);
InstanceInfoDO queryEntity = new InstanceInfoDO();
BeanUtils.copyProperties(request, queryEntity);
queryEntity.setType(request.getType().getV());
if (!StringUtils.isEmpty(request.getStatus())) {
queryEntity.setStatus(InstanceStatus.valueOf(request.getStatus()).getV());
}
Page<InstanceInfoDO> pageResult = instanceInfoRepository.findAll(Example.of(queryEntity), pageable);
return ResultDTO.success(convertPage(pageResult));
}
private PageResult<InstanceInfoVO> convertPage(Page<InstanceInfoDO> page) {
List<InstanceInfoVO> content = page.getContent().stream()
.map(x -> InstanceInfoVO.from(x, cacheService.getJobName(x.getJobId()))).collect(Collectors.toList());
PageResult<InstanceInfoVO> pageResult = new PageResult<>(page);
pageResult.setData(content);
return pageResult;
}
}
InstanceController提供了stop、retry、detail、log、downloadLogUrl、downloadLog、downloadLog4Console、list方法;其中stop、retry、detail委托给了instanceService;log、downloadLogUrl、downloadLog、downloadLog4Console委托给了instanceLogService;list委托给了instanceInfoRepository
InstanceService
tech/powerjob/server/core/instance/InstanceService.java
代码语言:java
@Slf4j
// Service holding the instance operations (stopInstance, retryInstance, cancelInstance).
// Only the injected collaborators are shown here; the method bodies are elided.
@Service
@RequiredArgsConstructor
public class InstanceService {
// Used by stopInstance to send the stop request to the worker.
private final TransportService transportService;
// Used by retryInstance to dispatch the instance again.
private final DispatchService dispatchService;
// Not referenced by the methods shown in this excerpt.
private final IdGenerateService idGenerateService;
// Used by stopInstance to run the finished-instance processing.
private final InstanceManager instanceManager;
// Used by retryInstance to load the job definition by jobId.
private final JobInfoRepository jobInfoRepository;
// Persists instance state changes (saveAndFlush) in stop/retry/cancel.
private final InstanceInfoRepository instanceInfoRepository;
// Used by stopInstance to look up the worker by its TaskTracker address.
private final WorkerClusterQueryService workerClusterQueryService;
//......
}
InstanceService提供了stopInstance、retryInstance、cancelInstance等方法
stopInstance
代码语言:java
@DesignateServer
/*
 * Stops a running instance: marks it STOPPED in the database, runs the finished-instance
 * processing, then notifies the TaskTracker worker on a best-effort basis.
 *
 * appId      - application id (only used for logging here)
 * instanceId - id of the instance to stop
 * Throws IllegalArgumentException when the instance is not in a running status; any other
 * exception is logged and rethrown.
 */
public void stopInstance(Long appId, Long instanceId) {
    log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId);
    try {
        final InstanceInfoDO target = fetchInstanceInfo(instanceId);

        // Only instances that are still running may be stopped.
        final boolean stillRunning = InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(target.getStatus());
        if (!stillRunning) {
            throw new IllegalArgumentException("can't stop finished instance!");
        }

        // Persist the STOPPED state.
        target.setStatus(STOPPED.getV());
        target.setGmtModified(new Date());
        target.setFinishedTime(System.currentTimeMillis());
        target.setResult(SystemInstanceResult.STOPPED_BY_USER);
        instanceInfoRepository.saveAndFlush(target);

        instanceManager.processFinishedInstance(instanceId, target.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);

        /*
            Best-effort (unreliable) stop notification to the TaskTracker.
            If the TaskTracker is not shut down successfully it will reportStatus again and,
            following the normal flow, the instance will be updated back to RUNNING, so the
            developer can stop it manually once more.
         */
        final Optional<WorkerInfo> trackerOpt = workerClusterQueryService.getWorkerInfoByAddress(target.getAppId(), target.getTaskTrackerAddress());
        if (!trackerOpt.isPresent()) {
            log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId);
            return;
        }

        final ServerStopInstanceReq stopReq = new ServerStopInstanceReq(instanceId);
        final WorkerInfo tracker = trackerOpt.get();
        transportService.tell(tracker.getProtocol(), ServerURLFactory.stopInstance2Worker(tracker.getAddress()), stopReq);
        log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId);
    } catch (IllegalArgumentException ie) {
        // Validation failures are passed through without error logging.
        throw ie;
    } catch (Exception e) {
        log.error("[Instance-{}] stopInstance failed.", instanceId, e);
        throw e;
    }
}
stopInstance先通过fetchInstanceInfo获取instanceInfo,然后判断状态是否是运行中,是则更新status为STOPPED,然后通过instanceManager.processFinishedInstance完成收尾工作,对于能找到WorkerInfo的,发起ServerStopInstanceReq请求
retryInstance
代码语言:java
@DesignateServer
public void retryInstance(Long appId, Long instanceId) {
log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
if (!InstanceStatus.FINISHED_STATUS.contains(instanceInfo.getStatus())) {
throw new PowerJobException("Only stopped instance can be retry!");
}
// 暂时不支持工作流任务的重试
if (instanceInfo.getWfInstanceId() != null) {
throw new PowerJobException("Workflow's instance do not support retry!");
}
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
instanceInfo.setExpectedTriggerTime(System.currentTimeMillis());
instanceInfo.setFinishedTime(null);
instanceInfo.setActualTriggerTime(null);
instanceInfo.setTaskTrackerAddress(null);
instanceInfo.setResult(null);
instanceInfoRepository.saveAndFlush(instanceInfo);
// 派发任务
Long jobId = instanceInfo.getJobId();
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " jobId));
dispatchService.dispatch(jobInfo, instanceId,Optional.of(instanceInfo),Optional.empty());
}
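retryInstance先拉取instanceInfo,判断状态是不是FINISHED_STATUS,不是则抛出PowerJobException;工作流实例(wfInstanceId不为null)暂不支持重试,同样抛出PowerJobException;校验通过后更新status为WAITING_DISPATCH并重置触发时间、完成时间、taskTrackerAddress、result等字段,保存后通过dispatchService.dispatch进行派发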
cancelInstance
代码语言:java
@DesignateServer
/*
 * Cancels an instance that has not started running yet.
 *
 * appId      - application id (only used for logging here)
 * instanceId - id of the instance to cancel
 * Throws PowerJobException when the instance could not be cancelled; every exception is
 * logged and rethrown.
 */
public void cancelInstance(Long appId, Long instanceId) {
    log.info("[Instance-{}] try to cancel the instance with appId {}.", instanceId, appId);
    try {
        final InstanceInfoDO record = fetchInstanceInfo(instanceId);
        final TimerFuture pending = InstanceTimeWheelService.fetchTimerFuture(instanceId);

        // If the task sits in this server's time wheel, cancelling the timer decides the outcome.
        // Otherwise fall back to the stored status. When this API is called close to the expected
        // trigger time, the dispatch on the other side may finish before the CANCELED status is
        // written, and the status is then overwritten. Fixing that would be very costly
        // (distributed lock), so it is deliberately left unsolved.
        // Usage condition: call this API with some margin before the expected execution time,
        // otherwise reliability is not guaranteed.
        final boolean cancelled = (pending != null)
                ? pending.cancel()
                : InstanceStatus.WAITING_DISPATCH.getV() == record.getStatus();

        if (!cancelled) {
            log.warn("[Instance-{}] cancel the instance failed.", instanceId);
            throw new PowerJobException("instance already up and running");
        }

        record.setStatus(InstanceStatus.CANCELED.getV());
        record.setResult(SystemInstanceResult.CANCELED_BY_USER);
        // If the DB write fails an exception is thrown and the API reports the cancel as failed;
        // the task is then rescheduled by the HA mechanism, so nothing needs to be handled here.
        instanceInfoRepository.saveAndFlush(record);
        log.info("[Instance-{}] cancel the instance successfully.", instanceId);
    } catch (Exception e) {
        log.error("[Instance-{}] cancelInstance failed.", instanceId, e);
        throw e;
    }
}
cancelInstance通过InstanceTimeWheelService.fetchTimerFuture获取timerFuture,对于timerFuture不为null的直接cancel,为null的则判断instanceInfo的status是否为WAITING_DISPATCH;取消成功则更新status为CANCELED并保存,否则抛出PowerJobException
InstanceLogService
tech/powerjob/server/core/instance/InstanceLogService.java
代码语言:java
@Slf4j
// Service behind the instance log endpoints (fetchInstanceLog, fetchDownloadUrl,
// downloadInstanceLog). Only fields and constants are shown here; methods are elided.
@Service
public class InstanceLogService {
// HTTP port of this server, injected from the "server.port" property; used when building
// the log download URL.
@Value("${server.port}")
private int port;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
private GridFsManager gridFsManager;
/**
 * Bean for operations on the local database.
 */
@Resource(name = "localTransactionTemplate")
private TransactionTemplate localTransactionTemplate;
@Resource
private LocalInstanceLogRepository localInstanceLogRepository;
/**
 * IDs of the task instances whose online logs are maintained locally.
 * Presumably maps instanceId to the time of its last log report - confirm against the writers.
 */
private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();
// Background pool on which log-file preparation is submitted.
@Resource(name = PJThreadPool.BACKGROUND_POOL)
private AsyncTaskExecutor powerJobBackgroundPool;
/**
 * Segment lock.
 */
private final SegmentLock segmentLock = new SegmentLock(8);
/**
 * Timestamp formatter.
 */
private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
/**
 * Number of log lines shown per page.
 */
private static final int MAX_LINE_COUNT = 100;
/**
 * Expiration interval (milliseconds, per the constant's name).
 */
private static final long EXPIRE_INTERVAL_MS = 60000;
//......
}
InstanceLogService提供了fetchInstanceLog、fetchDownloadUrl、downloadInstanceLog等方法
fetchInstanceLog
代码语言:java
@DesignateServer
public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
try {
Future<File> fileFuture = prepareLogFile(instanceId);
// 超时并不会打断正在执行的任务
File logFile = fileFuture.get(5, TimeUnit.SECONDS);
// 分页展示数据
long lines = 0;
StringBuilder sb = new StringBuilder();
String lineStr;
long left = index * MAX_LINE_COUNT;
long right = left MAX_LINE_COUNT;
try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
while ((lineStr = lr.readLine()) != null) {
// 指定范围内,读出
if (lines >= left && lines < right) {
sb.append(lineStr).append(System.lineSeparator());
}
lines;
}
}catch (Exception e) {
log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
return StringPage.simple("oms-server execution exception, caused by " ExceptionUtils.getRootCauseMessage(e));
}
double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
return new StringPage(index, (long) totalPage, sb.toString());
}catch (TimeoutException te) {
return StringPage.simple("log file is being prepared, please try again later.");
}catch (Exception e) {
log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
return StringPage.simple("oms-server execution exception, caused by " ExceptionUtils.getRootCauseMessage(e));
}
}
/**
 * Submits log-file preparation for the instance to the background pool.
 *
 * @param instanceId id of the instance whose log file is needed
 * @return a future that yields the prepared log file
 */
private Future<File> prepareLogFile(long instanceId) {
    return powerJobBackgroundPool.submit(() -> {
        // An online log is still being updated, so the data in the local database must be used;
        // otherwise the stable log file is generated.
        final boolean stillReporting = instanceId2LastReportTime.containsKey(instanceId);
        return stillReporting ? genTemporaryLogFile(instanceId) : genStableLogFile(instanceId);
    });
}
fetchInstanceLog先通过prepareLogFile准备日志文件(最多等待5秒,超时则提示稍后重试),对于还在更新的则执行genTemporaryLogFile,否则执行genStableLogFile;本地数据库存在的则直接下载,否则判断gridFsManager是否可用,可用则从gridFsManager取;拿到文件后按每页MAX_LINE_COUNT(100)行分页读取并返回
fetchDownloadUrl
代码语言:java
@DesignateServer
public String fetchDownloadUrl(Long appId, Long instanceId) {
String url = "http://" NetUtils.getLocalHost() ":" port "/instance/downloadLog?instanceId=" instanceId;
log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
return url;
}
fetchDownloadUrl则返回指向本机 /instance/downloadLog 接口的下载url
downloadInstanceLog
代码语言:javascript复制 public File downloadInstanceLog(long instanceId) throws Exception {
Future<File> fileFuture = prepareLogFile(instanceId);
return fileFuture.get(1, TimeUnit.MINUTES);
}
downloadInstanceLog则是通过prepareLogFile准备文件,然后最多等待1分钟获取结果,超时则抛出异常
小结
InstanceController提供了stop、retry、detail、log、downloadLogUrl、downloadLog、downloadLog4Console、list方法;其中stop、retry、detail委托给了instanceService;log、downloadLogUrl、downloadLog、downloadLog4Console委托给了instanceLogService;list委托给了instanceInfoRepository。