Building on the previous post, "Ambari-server source code analysis: the agent AgentResource class" (http://blog.csdn.net/chengyuqiang/article/details/61914712), this post looks at another core class: HeartBeatHandler.
The class lives in the org.apache.ambari.server.agent package.
- The AgentResource class in Ambari-server exposes the REST interface that receives requests from Ambari-agent.
- The HeartBeatHandler class in Ambari-server does the real processing: it handles the heartbeats coming from Ambari-agent, passes the information on to other modules, processes the action queue to build the heartbeat response, and returns that response to AgentResource (see the sketch below).
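To make that hand-off concrete, here is a minimal, illustrative sketch of how a JAX-RS endpoint in the style of AgentResource can delegate a heartbeat POST to HeartBeatHandler. This is not the actual AgentResource source (see the previous post for that); the class name SimplifiedAgentResource and its init method are invented for the example.

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.HeartBeat;
import org.apache.ambari.server.agent.HeartBeatHandler;
import org.apache.ambari.server.agent.HeartBeatResponse;

/**
 * Illustrative stand-in for AgentResource: the REST layer does no real work,
 * it simply forwards the parsed heartbeat to HeartBeatHandler.
 */
@Path("/")
public class SimplifiedAgentResource {
  // In Ambari the handler is supplied at server startup; here it is assumed
  // to be provided through a static setter for brevity.
  private static HeartBeatHandler handler;

  public static void init(HeartBeatHandler hh) {
    handler = hh;
  }

  @Path("heartbeat/{hostName}")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public HeartBeatResponse heartbeat(HeartBeat message,
                                     @PathParam("hostName") String hostName) throws AmbariException {
    // Delegate straight to HeartBeatHandler and return its response to the agent.
    return handler.handleHeartBeat(message);
  }
}

With that context, here is the HeartBeatHandler source, with annotations inline.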
package org.apache.ambari.server.agent;
import ...;
/**
* This class handles the heartbeats coming from the agent, passes on the information
* to other modules and processes the queue to send heartbeat response.
*/
@Singleton
public class HeartBeatHandler {
/**
* Logger.
*/
private static final Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class);
private static final Pattern DOT_PATTERN = Pattern.compile("\\.");
private final Clusters clusterFsm;
private final ActionQueue actionQueue;
private final ActionManager actionManager;
private HeartbeatMonitor heartbeatMonitor;
private HeartbeatProcessor heartbeatProcessor;
@Inject
private Injector injector;
@Inject
private Configuration config;
@Inject
private AmbariMetaInfo ambariMetaInfo;
@Inject
private ConfigHelper configHelper;
@Inject
private AlertDefinitionHash alertDefinitionHash;
@Inject
private RecoveryConfigHelper recoveryConfigHelper;
/**
* KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances
*/
@Inject
private KerberosIdentityDataFileReaderFactory kerberosIdentityDataFileReaderFactory;
private Map<String, Long> hostResponseIds = new ConcurrentHashMap<String, Long>();
private Map<String, HeartBeatResponse> hostResponses = new ConcurrentHashMap<String, HeartBeatResponse>();
// Constructor-based dependency injection
@Inject
public HeartBeatHandler(Clusters fsm, ActionQueue aq, ActionManager am,
Injector injector) {
clusterFsm = fsm;
actionQueue = aq;
actionManager = am;
heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000, injector);
heartbeatProcessor = new HeartbeatProcessor(fsm, am, heartbeatMonitor, injector); //TODO modify to match pattern
injector.injectMembers(this);
}
public void start() {
// Start the heartbeat processor
heartbeatProcessor.startAsync();
// Start the heartbeat monitor
heartbeatMonitor.start();
}
void setHeartbeatMonitor(HeartbeatMonitor heartbeatMonitor) {
this.heartbeatMonitor = heartbeatMonitor;
}
public void setHeartbeatProcessor(HeartbeatProcessor heartbeatProcessor) {
this.heartbeatProcessor = heartbeatProcessor;
}
public HeartbeatProcessor getHeartbeatProcessor() {
return heartbeatProcessor;
}
// Handle a heartbeat reported by an agent
public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
throws AmbariException {
long now = System.currentTimeMillis();
if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
// Record the server timestamp at which this heartbeat is being processed
heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
}
String hostname = heartbeat.getHostname();
Long currentResponseId = hostResponseIds.get(hostname);
HeartBeatResponse response;
if (currentResponseId == null) {
//Server restarted, or unknown host.
LOG.error("CurrentResponseId unknown for " + hostname + " - send register command");
// No responseId is recorded for this host: treat it as a new agent and ask it to register (responseId = 0)
return createRegisterCommand();
}
LOG.debug("Received heartbeat from host"
", hostname=" hostname
", currentResponseId=" currentResponseId
", receivedResponseId=" heartbeat.getResponseId());
// An old responseId was received - the previous response was lost - return the cached response
if (heartbeat.getResponseId() == currentResponseId - 1) {
LOG.warn("Old responseId received - response was lost - returning cached response");
return hostResponses.get(hostname);
} else if (heartbeat.getResponseId() != currentResponseId) {
LOG.error("Error in responseId sequence - sending agent restart command");
// The heartbeat is out of sequence (stale), so ask the agent to restart and re-register; the responseId stays unchanged
return createRestartCommand(currentResponseId);
}
response = new HeartBeatResponse();
// Increment the responseId and return the new value; the agent must echo it back in its next heartbeat
response.setResponseId(++currentResponseId);
Host hostObject;
try {
hostObject = clusterFsm.getHost(hostname);
} catch (HostNotFoundException e) {
LOG.error("Host: {} not found. Agent is still heartbeating.", hostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Host associated with the agent heratbeat might have been " "deleted", e);
}
// For now return empty response with only response id.
return response;
}
// The heartbeat was lost earlier; require the agent to re-register (responseId = 0)
if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
// After losing the heartbeat the agent should re-register
LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
return createRegisterCommand();
}
hostResponseIds.put(hostname, currentResponseId);
hostResponses.put(hostname, response);
// If the host is waiting for component status updates, notify it.
// The host has registered but has not yet reported its component status; it is still waiting for status updates.
if (heartbeat.componentStatus.size() > 0
&& hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
try {
LOG.debug("Got component status updates");
// Advance the host state machine
hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
} catch (InvalidStateTransitionException e) {
LOG.warn("Failed to notify the host about component status updates", e);
}
}
if (heartbeat.getRecoveryReport() != null) {
RecoveryReport rr = heartbeat.getRecoveryReport();
processRecoveryReport(rr, hostname);
}
try {
if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
// Send a healthy-heartbeat event to the host state machine, marking the host as healthy
hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
heartbeat.getAgentEnv(), heartbeat.getMounts()));
} else { // Mark the host as unhealthy
hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
}
} catch (InvalidStateTransitionException ex) {
LOG.warn("Asking agent to re-register due to " ex.getMessage(), ex);
hostObject.setState(HostState.INIT);
return createRegisterCommand();
}
/**
* A host can belong to only one cluster. Though getClustersForHost(hostname)
* returns a set of clusters, it will have only one entry.
*
* TODO: Handle the case when a host is a part of multiple clusters.
*/
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
if (clusters.size() > 0) {
String clusterName = clusters.iterator().next().getClusterName();
if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
response.setRecoveryConfig(rc);
if (response.getRecoveryConfig() != null) {
LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
}
}
}
heartbeatProcessor.addHeartbeat(heartbeat);
// Send commands if node is active
if (hostObject.getState().equals(HostState.HEALTHY)) {
sendCommands(hostname, response);
annotateResponse(hostname, response);
}
return response;
}
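/*
 * A quick recap of the responseId handshake implemented above (an
 * illustrative walk-through of the code, with made-up numbers):
 *
 * - hostResponseIds stores the last responseId issued to each host, and
 *   hostResponses caches the matching HeartBeatResponse.
 * - Suppose the stored value for a host is 7. A heartbeat carrying
 *   responseId 7 is in sequence: the server replies with responseId 8,
 *   stores 8 as the new expected value, and caches the reply.
 * - If that reply is lost, the agent re-sends responseId 7; since
 *   7 == 8 - 1, the cached reply (still carrying 8) is returned again.
 * - Any other responseId is treated as a stale agent and answered with a
 *   restart command, while an unknown host (no stored entry at all) or a
 *   host in HEARTBEAT_LOST state is answered with a register command that
 *   resets the responseId to 0.
 */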
protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
LOG.debug("Received recovery report: " recoveryReport.toString());
Host host = clusterFsm.getHost(hostname);
host.setRecoveryReport(recoveryReport);
}
/**
* Adds commands from action queue to a heartbeat response.
*/
protected void sendCommands(String hostname, HeartBeatResponse response)
throws AmbariException {
List<AgentCommand> cmds = actionQueue.dequeueAll(hostname);
if (cmds != null && !cmds.isEmpty()) {
for (AgentCommand ac : cmds) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending command string = " StageUtils.jaxbToString(ac));
}
} catch (Exception e) {
throw new AmbariException("Could not get jaxb string for command", e);
}
switch (ac.getCommandType()) { // dispatch on the command type
// Background execution command
case BACKGROUND_EXECUTION_COMMAND:
// Execution command
case EXECUTION_COMMAND: {
// Cast the AgentCommand to an ExecutionCommand
ExecutionCommand ec = (ExecutionCommand) ac;
LOG.info("HeartBeatHandler.sendCommands: sending ExecutionCommand for host {}, role {}, roleCommand {}, and command ID {}, task ID {}",
ec.getHostname(), ec.getRole(), ec.getRoleCommand(), ec.getCommandId(), ec.getTaskId());
Map<String, String> hlp = ec.getHostLevelParams();
if (hlp != null) {
String customCommand = hlp.get("custom_command");
if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
LOG.info(String.format("%s called", customCommand));
try {
injectKeytab(ec, customCommand, hostname);
} catch (IOException e) {
throw new AmbariException("Could not inject keytab into command", e);
}
}
}
response.addExecutionCommand((ExecutionCommand) ac);
break;
}
case STATUS_COMMAND: {
response.addStatusCommand((StatusCommand) ac);
break;
}
case CANCEL_COMMAND: {
response.addCancelCommand((CancelCommand) ac);
break;
}
case ALERT_DEFINITION_COMMAND: {
response.addAlertDefinitionCommand((AlertDefinitionCommand) ac);
break;
}
case ALERT_EXECUTION_COMMAND: {
response.addAlertExecutionCommand((AlertExecutionCommand) ac);
break;
}
default:
LOG.error("There is no action for agent command ="
ac.getCommandType().name());
}
}
}
}
public String getOsType(String os, String osRelease) {
String osType = "";
if (os != null) {
osType = os;
}
if (osRelease != null) {
String[] release = DOT_PATTERN.split(osRelease);
if (release.length > 0) {
osType += release[0];
}
}
return osType.toLowerCase();
}
protected HeartBeatResponse createRegisterCommand() {
HeartBeatResponse response = new HeartBeatResponse();
RegistrationCommand regCmd = new RegistrationCommand();
response.setResponseId(0);
response.setRegistrationCommand(regCmd);
return response;
}
protected HeartBeatResponse createRestartCommand(Long currentResponseId) {
HeartBeatResponse response = new HeartBeatResponse();
response.setRestartAgent(true);
response.setResponseId(currentResponseId);
return response;
}
// Handle an agent registration request
public RegistrationResponse handleRegistration(Register register)
throws InvalidStateTransitionException, AmbariException {
String hostname = register.getHostname();
int currentPingPort = register.getCurrentPingPort();
long now = System.currentTimeMillis();
String agentVersion = register.getAgentVersion();
String serverVersion = ambariMetaInfo.getServerVersion();
if (!VersionUtils.areVersionsEqual(serverVersion, agentVersion, true)) {
LOG.warn("Received registration request from host with non compatible"
" agent version"
", hostname=" hostname
", agentVersion=" agentVersion
", serverVersion=" serverVersion);
throw new AmbariException("Cannot register host with non compatible"
" agent version"
", hostname=" hostname
", agentVersion=" agentVersion
", serverVersion=" serverVersion);
}
String agentOsType = getOsType(register.getHardwareProfile().getOS(),
register.getHardwareProfile().getOSRelease());
LOG.info("agentOsType = " agentOsType);
if (!ambariMetaInfo.isOsSupported(agentOsType)) {
LOG.warn("Received registration request from host with not supported"
" os type"
", hostname=" hostname
", serverOsType=" config.getServerOsType()
", agentOsType=" agentOsType);
throw new AmbariException("Cannot register host with not supported"
" os type"
", hostname=" hostname
", serverOsType=" config.getServerOsType()
", agentOsType=" agentOsType);
}
Host hostObject;
try {
hostObject = clusterFsm.getHost(hostname);
} catch (HostNotFoundException ex) {
clusterFsm.addHost(hostname);
hostObject = clusterFsm.getHost(hostname);
}
// Resetting host state
hostObject.setState(HostState.INIT);
// Set ping port for agent
hostObject.setCurrentPingPort(currentPingPort);
// Get status of service components
List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname);
// Add request for component version
for (StatusCommand command : cmds) {
command.getCommandParams().put("request_version", String.valueOf(true));
}
// Save the prefix of the log file paths
hostObject.setPrefix(register.getPrefix());
hostObject.handleEvent(new HostRegistrationRequestEvent(hostname,
null != register.getPublicHostname() ? register.getPublicHostname() : hostname,
new AgentVersion(register.getAgentVersion()), now, register.getHardwareProfile(),
register.getAgentEnv()));
RegistrationResponse response = new RegistrationResponse();
if (cmds.isEmpty()) {
// No status commands are needed; let the FSM know that the status step is done
hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname,
now));
}
response.setStatusCommands(cmds);
response.setResponseStatus(RegistrationStatus.OK);
// force the registering agent host to receive its list of alert definitions
List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
response.setAlertDefinitionCommands(alertDefinitionCommands);
response.setAgentConfig(config.getAgentConfigsMap());
if (response.getAgentConfig() != null) {
LOG.debug("Agent configuration map set to " response.getAgentConfig());
}
/**
* A host can belong to only one cluster. Though getClustersForHost(hostname)
* returns a set of clusters, it will have only one entry.
*
* TODO: Handle the case when a host is a part of multiple clusters.
*/
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
if (clusters.size() > 0) {
String clusterName = clusters.iterator().next().getClusterName();
RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
response.setRecoveryConfig(rc);
if (response.getRecoveryConfig() != null) {
LOG.info("Recovery configuration set to " response.getRecoveryConfig().toString());
}
}
Long requestId = 0L;
hostResponseIds.put(hostname, requestId);
response.setResponseId(requestId);
return response;
}
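/*
 * To summarize handleRegistration as implemented above: the agent and
 * server versions must match and the agent OS type must be supported; the
 * host is added to Clusters if it is not yet known, its state is reset to
 * INIT, and its ping port is recorded; status commands (with
 * request_version=true) are generated for its components; a
 * HostRegistrationRequestEvent is fired; alert definition commands and
 * the recovery configuration are attached to the response; and the
 * responseId is reset to 0 so the next heartbeat starts a fresh sequence.
 */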
/**
* Annotate the response with some housekeeping details.
* hasMappedComponents - indicates if any components are mapped to the host
* hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet)
* clusterSize - indicates the number of hosts that form the cluster
* @param hostname
* @param response
* @throws org.apache.ambari.server.AmbariException
*/
private void annotateResponse(String hostname, HeartBeatResponse response) throws AmbariException {
for (Cluster cl : clusterFsm.getClustersForHost(hostname)) {
response.setClusterSize(cl.getClusterSize());
List<ServiceComponentHost> scHosts = cl.getServiceComponentHosts(hostname);
if (scHosts != null && scHosts.size() > 0) {
response.setHasMappedComponents(true);
break;
}
}
if (actionQueue.hasPendingTask(hostname)) {
LOG.debug("Host " hostname " has pending tasks");
response.setHasPendingTasks(true);
}
}
/**
* Response contains information about HDP Stack in use
* @param clusterName
* @return @ComponentsResponse
* @throws org.apache.ambari.server.AmbariException
*/
public ComponentsResponse handleComponents(String clusterName)
throws AmbariException {
ComponentsResponse response = new ComponentsResponse();
Cluster cluster = clusterFsm.getCluster(clusterName);
StackId stackId = cluster.getCurrentStackVersion();
if (stackId == null) {
throw new AmbariException("Cannot provide stack components map. "
"Stack hasn't been selected yet.");
}
StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(),
stackId.getStackVersion());
response.setClusterName(clusterName);
response.setStackName(stackId.getStackName());
response.setStackVersion(stackId.getStackVersion());
response.setComponents(getComponentsMap(stack));
return response;
}
private Map<String, Map<String, String>> getComponentsMap(StackInfo stack) {
Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
for (ServiceInfo service : stack.getServices()) {
Map<String, String> components = new HashMap<String, String>();
for (ComponentInfo component : service.getComponents()) {
components.put(component.getName(), component.getCategory());
}
result.put(service.getName(), components);
}
return result;
}
/**
* Gets the {@link AlertDefinitionCommand} instances that need to be sent for
* each cluster that the registering host is a member of.
*
* @param hostname
* @return
* @throws AmbariException
*/
private List<AlertDefinitionCommand> getRegistrationAlertDefinitionCommands(
String hostname) throws AmbariException {
Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname);
if (null == hostClusters || hostClusters.size() == 0) {
return null;
}
List<AlertDefinitionCommand> commands = new ArrayList<AlertDefinitionCommand>();
// for every cluster this host is a member of, build the command
for (Cluster cluster : hostClusters) {
String clusterName = cluster.getClusterName();
alertDefinitionHash.invalidate(clusterName, hostname);
List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
clusterName, hostname);
String hash = alertDefinitionHash.getHash(clusterName, hostname);
AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
hostname, hash, definitions);
command.addConfigs(configHelper, cluster);
commands.add(command);
}
return commands;
}
/**
* Insert Kerberos keytab details into the ExecutionCommand for the SET_KEYTAB custom command if
* any keytab details and associated data exists for the target host.
*
* @param ec the ExecutionCommand to update
* @param command a name of the relevant keytab command
* @param targetHost a name of the host the relevant command is destined for
* @throws AmbariException
*/
void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException {
String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY);
if (dataDir != null) {
KerberosIdentityDataFileReader reader = null;
List<Map<String, String>> kcp = ec.getKerberosCommandParams();
try {
reader = kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(new File(dataDir, KerberosIdentityDataFileReader.DATA_FILE_NAME));
for (Map<String, String> record : reader) {
String hostName = record.get(KerberosIdentityDataFileReader.HOSTNAME);
if (targetHost.equalsIgnoreCase(hostName)) {
if ("SET_KEYTAB".equalsIgnoreCase(command)) {
String keytabFilePath = record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
if (keytabFilePath != null) {
String sha1Keytab = DigestUtils.sha1Hex(keytabFilePath);
File keytabFile = new File(dataDir + File.separator + hostName + File.separator + sha1Keytab);
if (keytabFile.canRead()) {
Map<String, String> keytabMap = new HashMap<String, String>();
String principal = record.get(KerberosIdentityDataFileReader.PRINCIPAL);
String isService = record.get(KerberosIdentityDataFileReader.SERVICE);
keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
keytabMap.put(KerberosIdentityDataFileReader.SERVICE, isService);
keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT));
keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME));
keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS));
keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME));
keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS));
BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile));
byte[] keytabContent = null;
try {
keytabContent = IOUtils.toByteArray(bufferedIn);
} finally {
bufferedIn.close();
}
String keytabContentBase64 = Base64.encodeBase64String(keytabContent);
keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64);
kcp.add(keytabMap);
}
}
} else if ("REMOVE_KEYTAB".equalsIgnoreCase(command)) {
Map<String, String> keytabMap = new HashMap<String, String>();
keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
keytabMap.put(KerberosIdentityDataFileReader.SERVICE, record.get(KerberosIdentityDataFileReader.SERVICE));
keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT));
keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, record.get(KerberosIdentityDataFileReader.PRINCIPAL));
keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH));
kcp.add(keytabMap);
}
}
}
} catch (IOException e) {
throw new AmbariException("Could not inject keytabs to enable kerberos");
} finally {
if (reader != null) {
try {
reader.close();
} catch (Throwable t) {
// ignored
}
}
}
ec.setKerberosCommandParams(kcp);
}
}
}
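As a small standalone illustration of the getOsType helper shown above: it concatenates the OS name with the major release number and lowercases the result. The snippet below restates that logic outside the class; the class name OsTypeExample and the input values are invented for the example and are not taken from a real agent registration.

import java.util.regex.Pattern;

// Standalone illustration of HeartBeatHandler.getOsType; example inputs only.
public class OsTypeExample {
  private static final Pattern DOT_PATTERN = Pattern.compile("\\.");

  // Same logic as the getOsType method shown above.
  static String getOsType(String os, String osRelease) {
    String osType = "";
    if (os != null) {
      osType = os;
    }
    if (osRelease != null) {
      String[] release = DOT_PATTERN.split(osRelease);
      if (release.length > 0) {
        osType += release[0]; // append the major release number
      }
    }
    return osType.toLowerCase();
  }

  public static void main(String[] args) {
    System.out.println(getOsType("CentOS", "6.4")); // prints "centos6"
    System.out.println(getOsType("redhat", "7.2")); // prints "redhat7"
  }
}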