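Preface
This article takes a look at PowerJob's ServerDiscoveryService, the worker-side component that finds out which server a worker should talk to.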
ServerDiscoveryService
tech/powerjob/worker/background/ServerDiscoveryService.java
@Slf4j
public class ServerDiscoveryService {

    private final Long appId;
    private final PowerJobWorkerConfig config;
    private String currentServerAddress;
    private final Map<String, String> ip2Address = Maps.newHashMap();

    /**
     * Service discovery URL template
     */
    private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
    /**
     * Number of failures so far
     */
    private static int FAILED_COUNT = 0;
    /**
     * Maximum number of failures
     */
    private static final int MAX_FAILED_COUNT = 3;

    public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
        this.appId = appId;
        this.config = config;
    }
    //......
}
ServerDiscoveryService holds currentServerAddress, the ip2Address map, the service discovery URL template, the failure counter, and the maximum failure count.
start
    public void start(ScheduledExecutorService timingPool) {
        this.currentServerAddress = discovery();
        if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
            throw new PowerJobException("can't find any available server, this worker has been quarantined.");
        }
        // Success must be guaranteed here
        timingPool.scheduleAtFixedRate(() -> {
            try {
                this.currentServerAddress = discovery();
            } catch (Exception e) {
                log.error("[PowerDiscovery] fail to discovery server!", e);
            }
        }, 10, 10, TimeUnit.SECONDS);
    }
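The start method first obtains currentServerAddress through discovery(). If nothing is found and test mode is not enabled, it throws a PowerJobException. It then registers a scheduled task that refreshes currentServerAddress every 10 seconds.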
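The "discover once, then refresh periodically" pattern can be sketched in isolation. The following is a minimal illustration, not PowerJob's code: the class name RefreshSketch, the Supplier standing in for discovery(), and the use of IllegalStateException are all assumptions made for the example, and the test-mode check is left out. It also shows why the periodic task wraps its body in try/catch: according to the ScheduledExecutorService documentation, scheduleAtFixedRate suppresses all subsequent executions once one run throws.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the start() pattern; names are illustrative only.
class RefreshSketch {
    // volatile because the scheduler thread writes and other threads read
    private volatile String currentServerAddress;
    // Stand-in for discovery(): returns a server address, or null if none was found
    private final Supplier<String> discovery;

    RefreshSketch(Supplier<String> discovery) {
        this.discovery = discovery;
    }

    void start(ScheduledExecutorService pool, long periodSeconds) {
        // The first lookup is synchronous and must succeed
        currentServerAddress = discovery.get();
        if (currentServerAddress == null || currentServerAddress.isEmpty()) {
            throw new IllegalStateException("can't find any available server");
        }
        // Later lookups run in the background at a fixed rate
        pool.scheduleAtFixedRate(this::refreshOnce, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void refreshOnce() {
        try {
            currentServerAddress = discovery.get();
        } catch (Exception e) {
            // Swallow the exception: an uncaught one would cancel every future run
            System.err.println("discovery failed: " + e);
        }
    }

    String current() {
        return currentServerAddress;
    }
}
```

A caller would pass something like Executors.newSingleThreadScheduledExecutor() and a period of 10 to mirror the interval used above.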
discovery
    private String discovery() {
        if (ip2Address.isEmpty()) {
            config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
        // Send the request to the current server first
        String currentServer = currentServerAddress;
        if (!StringUtils.isEmpty(currentServer)) {
            String ip = currentServer.split(":")[0];
            // Calling the current server's HTTP service directly saves one network hop and reduces server load
            String firstServerAddress = ip2Address.get(ip);
            if (firstServerAddress != null) {
                result = acquire(firstServerAddress);
            }
        }
        for (String httpServerAddress : config.getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            } else {
                break;
            }
        }
        if (StringUtils.isEmpty(result)) {
            log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
            // Given a highly available server cluster, repeated consecutive failures mean this node has lost contact with the outside world.
            // The server has already moved second-level (frequent) jobs to other workers, so the local ones must be killed.
            if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
                log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
                List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
                if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                    frequentInstanceIds.forEach(instanceId -> {
                        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                        taskTracker.destroy();
                        log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                    });
                }
                FAILED_COUNT = 0;
            }
            return null;
        } else {
            // Reset the failure count
            FAILED_COUNT = 0;
            log.debug("[PowerDiscovery] current server is {}.", result);
            return result;
        }
    }
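The discovery method first fills ip2Address (IP to configured address) from config.getServerAddress() if the map is still empty. If currentServerAddress has a value, it calls acquire on the configured address with the same IP. If that yields no result, or there is no current server, it iterates over config.getServerAddress() and calls acquire on each address until one returns a result. If there is still no result, it checks whether FAILED_COUNT exceeds MAX_FAILED_COUNT; if so, it iterates over HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys(), removes and destroys each task tracker, and resets the counter. On success it resets FAILED_COUNT and returns the address.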
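The lookup order and the failure counter are easier to follow without the HTTP and task-tracker details. The sketch below is a simplified model under stated assumptions, not the real class: DiscoverySketch, the Function standing in for acquire, and the killCount field (which replaces destroying frequent task trackers) are all hypothetical, and unlike the original it stores the result in currentServer itself instead of leaving that to the caller. The counter uses a post-increment comparison, so the reaction only fires once the count read before the increment is already above the maximum.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of the discovery order; not PowerJob's class.
class DiscoverySketch {
    private final List<String> configuredServers;
    private final Map<String, String> ip2Address = new HashMap<>();
    // Stand-in for the HTTP call: returns the elected address, or null on failure
    private final Function<String, String> acquire;
    private final int maxFailedCount;
    private String currentServer;
    private int failedCount = 0;
    int killCount = 0; // counts how often the "kill frequent jobs" branch fired

    DiscoverySketch(List<String> configuredServers, Function<String, String> acquire, int maxFailedCount) {
        this.configuredServers = configuredServers;
        this.acquire = acquire;
        this.maxFailedCount = maxFailedCount;
        configuredServers.forEach(x -> ip2Address.put(x.split(":")[0], x));
    }

    String discover() {
        String result = null;
        // 1. Prefer the configured address that shares the current server's IP
        if (currentServer != null && !currentServer.isEmpty()) {
            String first = ip2Address.get(currentServer.split(":")[0]);
            if (first != null) {
                result = acquire.apply(first);
            }
        }
        // 2. Otherwise walk the configured list until one server answers
        for (String address : configuredServers) {
            if (result != null && !result.isEmpty()) {
                break;
            }
            result = acquire.apply(address);
        }
        if (result == null || result.isEmpty()) {
            // 3. Count the failure; react and reset once the threshold is passed
            if (failedCount++ > maxFailedCount) {
                killCount++;
                failedCount = 0;
            }
            currentServer = null;
            return null;
        }
        failedCount = 0;
        currentServer = result;
        return result;
    }
}
```

With maxFailedCount set to 3 and a server that never answers, this model reaches the kill branch on the fifth consecutive failed call, because the comparison reads the counter before incrementing it.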
acquire
    private String acquire(String httpServerAddress) {
        String result = null;
        String url = buildServerDiscoveryUrl(httpServerAddress);
        try {
            result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
        } catch (Exception ignore) {
        }
        if (!StringUtils.isEmpty(result)) {
            try {
                ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    return resultDTO.getData().toString();
                }
            } catch (Exception ignore) {
            }
        }
        return null;
    }

    private String buildServerDiscoveryUrl(String address) {
        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                .setProtocol(config.getProtocol().name().toUpperCase());
        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
        return String.format(DISCOVERY_URL, address, query);
    }
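The acquire method builds the URL through buildServerDiscoveryUrl and then calls HttpUtils.get(url), wrapped in CommonUtils.executeWithRetry0. It parses the response as a ResultDTO and returns its data as the server address when the call succeeded. Any exception or unsuccessful response results in null.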
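To make the resulting request concrete, here is a small sketch of the URL construction using only the JDK instead of Guava's Joiner. It is an illustration under assumptions: the class DiscoveryUrlSketch is hypothetical, the parameter names appId and protocol are guessed from the setters above rather than taken from ServerDiscoveryRequest.toMap(), the separators are assumed to be "&" and "=", and skipping null values is a choice made for this example. No URL encoding is applied.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical JDK-only sketch of building the discovery URL.
class DiscoveryUrlSketch {
    // Same template as DISCOVERY_URL in the article
    private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";

    static String buildUrl(String address, Map<String, Object> params) {
        // Join key=value pairs with '&'; entries with null values are skipped (assumption)
        String query = params.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return String.format(DISCOVERY_URL, address, query);
    }
}
```

Passing a LinkedHashMap keeps the parameter order stable, which makes the generated URL predictable in logs and tests.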
ServerController
tech/powerjob/server/web/controller/ServerController.java
@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {

    private ServerInfo serverInfo;
    private final TransportService transportService;
    private final ServerElectionService serverElectionService;
    private final AppInfoRepository appInfoRepository;
    private final WorkerClusterQueryService workerClusterQueryService;
    //......
    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
        return ResultDTO.success(serverElectionService.elect(request));
    }
    //......
}
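On the server side, the acquireServer method handles GET /server/acquire by calling serverElectionService.elect(request) and returning the resulting server address wrapped in a ResultDTO.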
elect
tech/powerjob/server/remote/server/election/ServerElectionService.java
    public String elect(ServerDiscoveryRequest request) {
        if (!accurate()) {
            final String currentServer = request.getCurrentServer();
            // If the worker's current server is this machine, skip the costly database lookup and return success directly
            Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
            if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
                return currentServer;
            }
        }
        return getServer0(request);
    }
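When accurate() returns false, the elect method checks whether the currentServer reported by the worker equals this server's own address for the requested protocol. If it does, that address is returned directly without touching the database. In all other cases it delegates to getServer0.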
getServer0
    private String getServer0(ServerDiscoveryRequest discoveryRequest) {
        final Long appId = discoveryRequest.getAppId();
        final String protocol = discoveryRequest.getProtocol();
        Set<String> downServerCache = Sets.newHashSet();
        for (int i = 0; i < RETRY_TIMES; i++) {
            // Read the current server from the database without locking
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if (!appInfoOpt.isPresent()) {
                throw new PowerJobException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            String activeAddress = activeAddress(originServer, downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }
            // No available server: run a new server election, which requires a lock
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.tryLock(lockName, 30000);
            if (!lockStatus) {
                try {
                    Thread.sleep(500);
                } catch (Exception ignore) {
                }
                continue;
            }
            try {
                // Another machine may have finished the election already, so check again
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }
                // Take over: if this machine supports the protocol, it becomes the server that schedules this worker
                final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
                if (targetProtocolInfo != null) {
                    // Note: AppInfoDO#currentServer always stores the default protocol's address; only the returned value is the protocol-specific address
                    appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                    appInfo.setGmtModified(new Date());
                    appInfoRepository.saveAndFlush(appInfo);
                    log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                    return targetProtocolInfo.getAddress();
                }
            } catch (Exception e) {
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new PowerJobException("server elect failed for app " + appId);
    }
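The getServer0 method retries up to RETRY_TIMES times. In each round it reads the app's originServer from appInfoRepository and returns immediately if activeAddress reports that server as alive. Otherwise it tries to take the election lock, sleeping 500 ms and starting the next round if the lock is held by someone else. Once it holds the lock, it re-checks the stored server in case another machine has already completed the election. If not, and this server supports the requested protocol, it writes transportService.defaultProtocol().getAddress() into the app's currentServer and returns the protocol-specific address. If all rounds fail, it throws a PowerJobException.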
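The core of this method is a check, lock, re-check, take-over sequence. The sketch below models it in memory to show the control flow only. Everything in it is an assumption for illustration: ElectionSketch is a hypothetical class, a ConcurrentHashMap replaces the database, a single ReentrantLock replaces the per-app lock from lockService, a Predicate replaces activeAddress, and RETRY_TIMES is set to an arbitrary 3.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical in-memory model of the election flow; not PowerJob's implementation.
class ElectionSketch {
    private static final int RETRY_TIMES = 3; // arbitrary value for the sketch

    private final Map<Long, String> currentServerByApp = new ConcurrentHashMap<>();
    private final ReentrantLock electLock = new ReentrantLock();
    private final String localAddress;
    // Stand-in for the liveness check performed by activeAddress
    private final Predicate<String> isAlive;

    ElectionSketch(String localAddress, Predicate<String> isAlive) {
        this.localAddress = localAddress;
        this.isAlive = isAlive;
    }

    void register(long appId, String server) {
        currentServerByApp.put(appId, server);
    }

    String currentServer(long appId) {
        return currentServerByApp.get(appId);
    }

    String elect(long appId) {
        for (int i = 0; i < RETRY_TIMES; i++) {
            // Lock-free read: the common case is that the stored server is alive
            String origin = currentServerByApp.get(appId);
            if (origin == null) {
                throw new IllegalArgumentException(appId + " is not registered!");
            }
            if (isAlive.test(origin)) {
                return origin;
            }
            // Election needed: only one participant may write at a time
            if (!electLock.tryLock()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                continue;
            }
            try {
                // Re-check under the lock: someone else may have just won
                String latest = currentServerByApp.get(appId);
                if (latest != null && isAlive.test(latest)) {
                    return latest;
                }
                // Take over as the server for this app
                currentServerByApp.put(appId, localAddress);
                return localAddress;
            } finally {
                electLock.unlock();
            }
        }
        throw new IllegalStateException("server elect failed for app " + appId);
    }
}
```

The second read under the lock is what keeps two servers that detect the same dead server at the same time from both overwriting the record.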
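Summary
PowerJob's ServerDiscoveryService defines a start method that first obtains currentServerAddress through discovery() and then registers a scheduled task to refresh it every 10 seconds. The discovery method asks the current server first and otherwise iterates over config.getServerAddress(), calling acquire on each address. The acquire method builds the URL through buildServerDiscoveryUrl and calls HttpUtils.get(url) to obtain the server address for the given appId. On the server side, that request ends up in ServerElectionService.elect, which either confirms the current server or elects a new one.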