Ambari-server源码分析:核心类-心跳处理agent.HeartBeatHandler

2022-05-06 17:46:29 浏览数 (1)

在上一篇《Ambari-server源码分析:agent-AgentResource类》 http://blog.csdn.net/chengyuqiang/article/details/61914712 的基础上,再来看另一个核心类:HeartBeatHandler

该类位于org.apache.ambari.server.agent包下,如下图。

  • Ambari-server 的AgentResource类提供处理Ambari-agent请求的REST接口
  • Ambari-server 的HeartBeatHandler提供真正的处理服务,处理来自Ambari-agent的心跳,将信息传递到其他模块,并处理队列以发送心跳响应,将响应结果返回给AgentResource
代码语言:java
package org.apache.ambari.server.agent;

import ...;

/**
 * This class handles the heartbeats coming from the agent, passes on the information
 * to other modules and processes the queue to send heartbeat response.
 * 此类处理来自代理的心跳,将信息传递到其他模块,并处理队列以发送心跳响应。
 */
@Singleton
public class HeartBeatHandler {
    /**
     * Logger.
     */
    private static final Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class);

    private static final Pattern DOT_PATTERN = Pattern.compile("\.");
    private final Clusters clusterFsm;
    private final ActionQueue actionQueue;
    private final ActionManager actionManager;
    private HeartbeatMonitor heartbeatMonitor;
    private HeartbeatProcessor heartbeatProcessor;

    @Inject
    private Injector injector;

    @Inject
    private Configuration config;

    @Inject
    private AmbariMetaInfo ambariMetaInfo;

    @Inject
    private ConfigHelper configHelper;

    @Inject
    private AlertDefinitionHash alertDefinitionHash;

    @Inject
    private RecoveryConfigHelper recoveryConfigHelper;

    /**
     * KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances
     */
    @Inject
    private KerberosIdentityDataFileReaderFactory kerberosIdentityDataFileReaderFactory;

    private Map<String, Long> hostResponseIds = new ConcurrentHashMap<String, Long>();

    private Map<String, HeartBeatResponse> hostResponses = new ConcurrentHashMap<String, HeartBeatResponse>();

    //构造器依赖注入
    @Inject
    public HeartBeatHandler(Clusters fsm, ActionQueue aq, ActionManager am,
                            Injector injector) {
        clusterFsm = fsm;
        actionQueue = aq;
        actionManager = am;
        heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000, injector);
        heartbeatProcessor = new HeartbeatProcessor(fsm, am, heartbeatMonitor, injector); //TODO modify to match pattern
        injector.injectMembers(this);
    }

    public void start() {
        //启动心跳处理器
        heartbeatProcessor.startAsync();
        //启动心跳监控器
        heartbeatMonitor.start();
    }

    void setHeartbeatMonitor(HeartbeatMonitor heartbeatMonitor) {
        this.heartbeatMonitor = heartbeatMonitor;
    }

    public void setHeartbeatProcessor(HeartbeatProcessor heartbeatProcessor) {
        this.heartbeatProcessor = heartbeatProcessor;
    }

    public HeartbeatProcessor getHeartbeatProcessor() {
        return heartbeatProcessor;
    }

    //处理心跳
    public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
            throws AmbariException {
        long now = System.currentTimeMillis();
        if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
            //报告处理心跳的开始时间
            heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
        }

        String hostname = heartbeat.getHostname();
        Long currentResponseId = hostResponseIds.get(hostname);
        HeartBeatResponse response;

        //服务器重新启动或未知主机
        if (currentResponseId == null) {
            //Server restarted, or unknown host.
            LOG.error("CurrentResponseId unknown for "   hostname   " - send register command");
            // 无responseId, 新请求,就进行注册, responseId =0
            return createRegisterCommand();
        }

        LOG.debug("Received heartbeat from host"
                  ", hostname="   hostname
                  ", currentResponseId="   currentResponseId
                  ", receivedResponseId="   heartbeat.getResponseId());

        //接收到旧响应ID - 响应丢失 - 返回缓存响应
        if (heartbeat.getResponseId() == currentResponseId - 1) {
            LOG.warn("Old responseId received - response was lost - returning cached response");
            return hostResponses.get(hostname);
        } else if (heartbeat.getResponseId() != currentResponseId) {
            LOG.error("Error in responseId sequence - sending agent restart command");
            // 心跳是历史记录,那么就要求其重启,重新注册,responseId 不变
            return createRestartCommand(currentResponseId);
        }

        response = new HeartBeatResponse();
        //responseId 加 1 , 返回一个新的responseId,下次心跳又要把这个responseId带回来。
        response.setResponseId(  currentResponseId);

        Host hostObject;
        try {
            hostObject = clusterFsm.getHost(hostname);
        } catch (HostNotFoundException e) {
            LOG.error("Host: {} not found. Agent is still heartbeating.", hostname);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Host associated with the agent heratbeat might have been "   "deleted", e);
            }
            // For now return empty response with only response id.
            return response;
        }
        //失去心跳,要求重新注册, responseId=0
        if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
            // After loosing heartbeat agent should reregister
            LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
            return createRegisterCommand();
        }

        hostResponseIds.put(hostname, currentResponseId);
        hostResponses.put(hostname, response);

        // If the host is waiting for component status updates, notify it
        //如果主机正在等待组件状态更新,请通知它
        //节点已经进行了注册,但是该节点还没有汇报相关状态信息,等待服务状态更新
        if (heartbeat.componentStatus.size() > 0
                && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
            try {
                LOG.debug("Got component status updates");
                //更新服务状态机
                hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
            } catch (InvalidStateTransitionException e) {
                LOG.warn("Failed to notify the host about component status updates", e);
            }
        }

        if (heartbeat.getRecoveryReport() != null) {
            RecoveryReport rr = heartbeat.getRecoveryReport();
            processRecoveryReport(rr, hostname);
        }

        try {
            if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
                //向状态机发送更新事件,更新节点至正常状态
                hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
                        heartbeat.getAgentEnv(), heartbeat.getMounts()));
            } else { // 把节点列入不健康
                hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
            }
        } catch (InvalidStateTransitionException ex) {
            LOG.warn("Asking agent to re-register due to "   ex.getMessage(), ex);
            hostObject.setState(HostState.INIT);
            return createRegisterCommand();
        }

        /**
         * A host can belong to only one cluster. Though getClustersForHost(hostname)
         * returns a set of clusters, it will have only one entry.
         *主机只能属于一个集群。 通过getClustersForHost(hostname)返回一组集群,它只有一个条目。
         *
         * TODO: Handle the case when a host is a part of multiple clusters.
         * 处理 主机是多个集群的一部分时的 情况。
         */
        Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);

        if (clusters.size() > 0) {
            String clusterName = clusters.iterator().next().getClusterName();

            if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
                RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
                response.setRecoveryConfig(rc);

                if (response.getRecoveryConfig() != null) {
                    LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
                }
            }
        }

        heartbeatProcessor.addHeartbeat(heartbeat);

        // Send commands if node is active
        if (hostObject.getState().equals(HostState.HEALTHY)) {
            sendCommands(hostname, response);
            annotateResponse(hostname, response);
        }

        return response;
    }


    protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
        LOG.debug("Received recovery report: "   recoveryReport.toString());
        Host host = clusterFsm.getHost(hostname);
        host.setRecoveryReport(recoveryReport);
    }

    /**
     * Adds commands from action queue to a heartbeat response.
     * 将操作队列中的命令添加到心跳响应。
     */
    protected void sendCommands(String hostname, HeartBeatResponse response)
            throws AmbariException {
        List<AgentCommand> cmds = actionQueue.dequeueAll(hostname);
        if (cmds != null && !cmds.isEmpty()) {
            for (AgentCommand ac : cmds) {
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sending command string = "   StageUtils.jaxbToString(ac));
                    }
                } catch (Exception e) {
                    throw new AmbariException("Could not get jaxb string for command", e);
                }
                switch (ac.getCommandType()) {//根据命令类型处理
                    //背景执行命令
                    case BACKGROUND_EXECUTION_COMMAND:
                    //执行命令
                    case EXECUTION_COMMAND: {
                        //将AgentCommand强制转换为ExecutionCommand
                        ExecutionCommand ec = (ExecutionCommand) ac;
                        LOG.info("HeartBeatHandler.sendCommands: sending ExecutionCommand for host {}, role {}, roleCommand {}, and command ID {}, task ID {}",
                                ec.getHostname(), ec.getRole(), ec.getRoleCommand(), ec.getCommandId(), ec.getTaskId());
                        Map<String, String> hlp = ec.getHostLevelParams();
                        if (hlp != null) {
                            String customCommand = hlp.get("custom_command");
                            if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
                                LOG.info(String.format("%s called", customCommand));
                                try {
                                    injectKeytab(ec, customCommand, hostname);
                                } catch (IOException e) {
                                    throw new AmbariException("Could not inject keytab into command", e);
                                }
                            }
                        }
                        response.addExecutionCommand((ExecutionCommand) ac);
                        break;
                    }
                    case STATUS_COMMAND: {
                        response.addStatusCommand((StatusCommand) ac);
                        break;
                    }
                    case CANCEL_COMMAND: {
                        response.addCancelCommand((CancelCommand) ac);
                        break;
                    }
                    case ALERT_DEFINITION_COMMAND: {
                        response.addAlertDefinitionCommand((AlertDefinitionCommand) ac);
                        break;
                    }
                    case ALERT_EXECUTION_COMMAND: {
                        response.addAlertExecutionCommand((AlertExecutionCommand) ac);
                        break;
                    }
                    default:
                        LOG.error("There is no action for agent command ="
                                  ac.getCommandType().name());
                }
            }
        }
    }

    public String getOsType(String os, String osRelease) {
        String osType = "";
        if (os != null) {
            osType = os;
        }
        if (osRelease != null) {
            String[] release = DOT_PATTERN.split(osRelease);
            if (release.length > 0) {
                osType  = release[0];
            }
        }
        return osType.toLowerCase();
    }

    protected HeartBeatResponse createRegisterCommand() {
        HeartBeatResponse response = new HeartBeatResponse();
        RegistrationCommand regCmd = new RegistrationCommand();
        response.setResponseId(0);
        response.setRegistrationCommand(regCmd);
        return response;
    }

    protected HeartBeatResponse createRestartCommand(Long currentResponseId) {
        HeartBeatResponse response = new HeartBeatResponse();
        response.setRestartAgent(true);
        response.setResponseId(currentResponseId);
        return response;
    }

    //注册响应
    public RegistrationResponse handleRegistration(Register register)
            throws InvalidStateTransitionException, AmbariException {
        String hostname = register.getHostname();
        int currentPingPort = register.getCurrentPingPort();
        long now = System.currentTimeMillis();

        String agentVersion = register.getAgentVersion();
        String serverVersion = ambariMetaInfo.getServerVersion();
        if (!VersionUtils.areVersionsEqual(serverVersion, agentVersion, true)) {
            LOG.warn("Received registration request from host with non compatible"
                      " agent version"
                      ", hostname="   hostname
                      ", agentVersion="   agentVersion
                      ", serverVersion="   serverVersion);
            throw new AmbariException("Cannot register host with non compatible"
                      " agent version"
                      ", hostname="   hostname
                      ", agentVersion="   agentVersion
                      ", serverVersion="   serverVersion);
        }

        String agentOsType = getOsType(register.getHardwareProfile().getOS(),
                register.getHardwareProfile().getOSRelease());
        LOG.info("agentOsType = "   agentOsType);
        if (!ambariMetaInfo.isOsSupported(agentOsType)) {
            LOG.warn("Received registration request from host with not supported"
                      " os type"
                      ", hostname="   hostname
                      ", serverOsType="   config.getServerOsType()
                      ", agentOsType="   agentOsType);
            throw new AmbariException("Cannot register host with not supported"
                      " os type"
                      ", hostname="   hostname
                      ", serverOsType="   config.getServerOsType()
                      ", agentOsType="   agentOsType);
        }

        Host hostObject;
        try {
            hostObject = clusterFsm.getHost(hostname);
        } catch (HostNotFoundException ex) {
            clusterFsm.addHost(hostname);
            hostObject = clusterFsm.getHost(hostname);
        }

        // Resetting host state
        hostObject.setState(HostState.INIT);

        // Set ping port for agent
        hostObject.setCurrentPingPort(currentPingPort);

        // Get status of service components
        List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname);

        // Add request for component version
        for (StatusCommand command : cmds) {
            command.getCommandParams().put("request_version", String.valueOf(true));
        }

        // Save the prefix of the log file paths
        hostObject.setPrefix(register.getPrefix());

        hostObject.handleEvent(new HostRegistrationRequestEvent(hostname,
                null != register.getPublicHostname() ? register.getPublicHostname() : hostname,
                new AgentVersion(register.getAgentVersion()), now, register.getHardwareProfile(),
                register.getAgentEnv()));

        RegistrationResponse response = new RegistrationResponse();
        if (cmds.isEmpty()) {
            //No status commands needed let the fsm know that status step is done
            hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname,
                    now));
        }

        response.setStatusCommands(cmds);

        response.setResponseStatus(RegistrationStatus.OK);

        // force the registering agent host to receive its list of alert definitions
        List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
        response.setAlertDefinitionCommands(alertDefinitionCommands);

        response.setAgentConfig(config.getAgentConfigsMap());
        if (response.getAgentConfig() != null) {
            LOG.debug("Agent configuration map set to "   response.getAgentConfig());
        }

        /**
         * A host can belong to only one cluster. Though getClustersForHost(hostname)
         * returns a set of clusters, it will have only one entry.
         *
         * TODO: Handle the case when a host is a part of multiple clusters.
         */
        Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);

        if (clusters.size() > 0) {
            String clusterName = clusters.iterator().next().getClusterName();

            RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
            response.setRecoveryConfig(rc);

            if (response.getRecoveryConfig() != null) {
                LOG.info("Recovery configuration set to "   response.getRecoveryConfig().toString());
            }
        }

        Long requestId = 0L;
        hostResponseIds.put(hostname, requestId);
        response.setResponseId(requestId);
        return response;
    }

    /**
     * Annotate the response with some housekeeping details.
     * hasMappedComponents - indicates if any components are mapped to the host
     * hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet)
     * clusterSize - indicates the number of hosts that form the cluster
     * @param hostname
     * @param response
     * @throws org.apache.ambari.server.AmbariException
     */
    private void annotateResponse(String hostname, HeartBeatResponse response) throws AmbariException {
        for (Cluster cl : clusterFsm.getClustersForHost(hostname)) {
            response.setClusterSize(cl.getClusterSize());

            List<ServiceComponentHost> scHosts = cl.getServiceComponentHosts(hostname);
            if (scHosts != null && scHosts.size() > 0) {
                response.setHasMappedComponents(true);
                break;
            }
        }

        if (actionQueue.hasPendingTask(hostname)) {
            LOG.debug("Host "   hostname   " has pending tasks");
            response.setHasPendingTasks(true);
        }
    }

    /**
     * Response contains information about HDP Stack in use
     * @param clusterName
     * @return @ComponentsResponse
     * @throws org.apache.ambari.server.AmbariException
     */
    public ComponentsResponse handleComponents(String clusterName)
            throws AmbariException {
        ComponentsResponse response = new ComponentsResponse();

        Cluster cluster = clusterFsm.getCluster(clusterName);
        StackId stackId = cluster.getCurrentStackVersion();
        if (stackId == null) {
            throw new AmbariException("Cannot provide stack components map. "  
                    "Stack hasn't been selected yet.");
        }
        StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(),
                stackId.getStackVersion());

        response.setClusterName(clusterName);
        response.setStackName(stackId.getStackName());
        response.setStackVersion(stackId.getStackVersion());
        response.setComponents(getComponentsMap(stack));

        return response;
    }

    private Map<String, Map<String, String>> getComponentsMap(StackInfo stack) {
        Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();

        for (ServiceInfo service : stack.getServices()) {
            Map<String, String> components = new HashMap<String, String>();

            for (ComponentInfo component : service.getComponents()) {
                components.put(component.getName(), component.getCategory());
            }

            result.put(service.getName(), components);
        }

        return result;
    }

    /**
     * Gets the {@link AlertDefinitionCommand} instances that need to be sent for
     * each cluster that the registering host is a member of.
     *
     * @param hostname
     * @return
     * @throws AmbariException
     */
    private List<AlertDefinitionCommand> getRegistrationAlertDefinitionCommands(
            String hostname) throws AmbariException {

        Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname);
        if (null == hostClusters || hostClusters.size() == 0) {
            return null;
        }

        List<AlertDefinitionCommand> commands = new ArrayList<AlertDefinitionCommand>();

        // for every cluster this host is a member of, build the command
        for (Cluster cluster : hostClusters) {
            String clusterName = cluster.getClusterName();
            alertDefinitionHash.invalidate(clusterName, hostname);

            List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
                    clusterName, hostname);

            String hash = alertDefinitionHash.getHash(clusterName, hostname);
            AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
                    hostname, hash, definitions);

            command.addConfigs(configHelper, cluster);
            commands.add(command);
        }

        return commands;
    }

    /**
     * Insert Kerberos keytab details into the ExecutionCommand for the SET_KEYTAB custom command if
     * any keytab details and associated data exists for the target host.
     *
     * @param ec the ExecutionCommand to update
     * @param command a name of the relevant keytab command
     * @param targetHost a name of the host the relevant command is destined for
     * @throws AmbariException
     */
    void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException {
        String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY);

        if (dataDir != null) {
            KerberosIdentityDataFileReader reader = null;
            List<Map<String, String>> kcp = ec.getKerberosCommandParams();

            try {
                reader = kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(new File(dataDir, KerberosIdentityDataFileReader.DATA_FILE_NAME));

                for (Map<String, String> record : reader) {
                    String hostName = record.get(KerberosIdentityDataFileReader.HOSTNAME);

                    if (targetHost.equalsIgnoreCase(hostName)) {

                        if ("SET_KEYTAB".equalsIgnoreCase(command)) {
                            String keytabFilePath = record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);

                            if (keytabFilePath != null) {

                                String sha1Keytab = DigestUtils.sha1Hex(keytabFilePath);
                                File keytabFile = new File(dataDir   File.separator   hostName   File.separator   sha1Keytab);

                                if (keytabFile.canRead()) {
                                    Map<String, String> keytabMap = new HashMap<String, String>();
                                    String principal = record.get(KerberosIdentityDataFileReader.PRINCIPAL);
                                    String isService = record.get(KerberosIdentityDataFileReader.SERVICE);

                                    keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
                                    keytabMap.put(KerberosIdentityDataFileReader.SERVICE, isService);
                                    keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT));
                                    keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
                                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
                                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME));
                                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS));
                                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME));
                                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS));

                                    BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile));
                                    byte[] keytabContent = null;
                                    try {
                                        keytabContent = IOUtils.toByteArray(bufferedIn);
                                    } finally {
                                        bufferedIn.close();
                                    }
                                    String keytabContentBase64 = Base64.encodeBase64String(keytabContent);
                                    keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64);

                                    kcp.add(keytabMap);
                                }
                            }
                        } else if ("REMOVE_KEYTAB".equalsIgnoreCase(command)) {
                            Map<String, String> keytabMap = new HashMap<String, String>();

                            keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
                            keytabMap.put(KerberosIdentityDataFileReader.SERVICE, record.get(KerberosIdentityDataFileReader.SERVICE));
                            keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT));
                            keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, record.get(KerberosIdentityDataFileReader.PRINCIPAL));
                            keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH));

                            kcp.add(keytabMap);
                        }
                    }
                }
            } catch (IOException e) {
                throw new AmbariException("Could not inject keytabs to enable kerberos");
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (Throwable t) {
                        // ignored
                    }
                }
            }

            ec.setKerberosCommandParams(kcp);
        }
    }

}

0 人点赞