分布式系统模式11-HeartBeat

2021-01-05 15:06:24 浏览数 (1)

作者: Unmesh Joshi

译者: java达人

来源: https://martinfowler.com/articles/patterns-of-distributed-systems/

通过定期向所有其他服务器发送消息表明服务器可用

问题

当多个服务器组成一个集群时,服务器负责根据所使用的分区和复制方案存储部分数据。及时检测服务器故障,对于确保让其他服务器负责处理故障服务器上的数据请求,以采取纠正措施非常重要。

解决方案

定期向所有其他服务器发送请求,以表明发送服务器的活跃状态。选择的请求间隔要大于服务器之间的网络往返时间。所有服务器都等待至超时间隔,该间隔是用于检查心跳的请求间隔的数倍。一般来说, 超时间隔>请求间隔>服务器之间的网络往返时间。

例如,如果服务器之间的网络往返时间是20ms,心跳可以每100ms发送一次,服务器在1秒后进行检查,给予发送多个心跳足够的时间,而不会获得假消息。如果在此间隔内没有接收到心跳,则将发送服务器视为故障。

在决定心跳间隔和超时值时,了解数据中心内部和数据中心之间的网络往返时间非常有用。[numbers-every-programmer-should-know] 是一个很好的参考。http://highscalability.com/numbers-everyone-should-know

发送心跳的服务器和接收心跳的服务器都有如下定义的调度程序。给调度程序一个方法,以固定的时间间隔执行。当启动时,该任务将被调度以执行给定的方法

代码语言:javascript复制
class HeartBeatScheduler…  
public class HeartBeatScheduler implements Logging {
      private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
      private Runnable action;
      private Long heartBeatInterval;
      public HeartBeatScheduler(Runnable action, Long heartBeatIntervalMs) {
          this.action = action;
          this.heartBeatInterval = heartBeatIntervalMs;
      }
      private ScheduledFuture<?> scheduledTask;
      public void start() {
          scheduledTask = executor.scheduleWithFixedDelay(new HeartBeatTask(action), heartBeatInterval, heartBeatInterval, TimeUnit.MILLISECONDS);
      }

在发送服务器端,调度器执行一个方法来发送心跳消息。

代码语言:javascript复制

class SendingServer…  
private void sendHeartbeat() throws IOException {
      socketChannel.blockingSend(newHeartbeatRequest(serverId));
  }

在接收服务器上,故障检测机制启动了一个类似的调度程序。它定期检查心跳是否被接收到。

代码语言:javascript复制

class AbstractFailureDetector…  
private HeartBeatScheduler heartbeatScheduler = new HeartBeatScheduler(this::heartBeatCheck, 100l);
  abstract void heartBeatCheck();
  abstract void heartBeatReceived(T serverId);

故障检测器需要有两种方法:

• 当接收服务器接收到心跳信号时调用的方法,以告诉故障检测器接收到心跳信号

代码语言:javascript复制

class ReceivingServer…  
private void handleRequest(Message<RequestOrResponse> request) {
      RequestOrResponse clientRequest = request.getRequest();
      if (isHeartbeatRequest(clientRequest)) {
          HeartbeatRequest heartbeatRequest = JsonSerDes.deserialize(clientRequest.getMessageBodyJson(), HeartbeatRequest.class);
          failureDetector.heartBeatReceived(heartbeatRequest.getServerId());
          sendResponse(request);
      } else {
          //processes other requests
      }
  }

• 一种定期检查心跳状态和检测可能故障的方法。

何时将服务器标记为失败取决于各种标准。有不同的权衡。一般来说,心跳间隔越小,故障检测到的速度就越快,但是故障检测错误的概率就越高。因此心跳间隔和对心跳丢失的解释是根据集群的要求实现的。一般有以下两大类。

小型集群——例如基于共识的系统,如RAFT, Zookeeper

在所有的consensus实现中,心跳都从leader服务器发送到所有follower服务器。每次接收到心跳时,记录心跳到达的时间戳

代码语言:javascript复制

class TimeoutBasedFailureDetector…  
@Override
  void heartBeatReceived(T serverId) {
      Long currentTime = System.nanoTime();
      heartbeatReceivedTimes.put(serverId, currentTime);
      markUp(serverId);
  }

如果在固定时间窗口内没有接收到心跳,则认为leader崩溃,并选择一个新的服务器作为leader。由于缓慢的进程或网络,有可能出现错误的故障检测。因此需要使用Generation Clock 来检测过时的leader。这提供了更好的系统可用性,因为可以在更短的时间内检测到崩溃。这适用于较小的集群,通常是3到5个节点的设置,这在大多数一致实现中是可以观察到的,比如Zookeeper或Raft。

代码语言:javascript复制

class TimeoutBasedFailureDetector…  
@Override
  void heartBeatCheck() {
      Long now = System.nanoTime();
      Set<T> serverIds = heartbeatReceivedTimes.keySet();
      for (T serverId : serverIds) {
          Long lastHeartbeatReceivedTime = heartbeatReceivedTimes.get(serverId);
          Long timeSinceLastHeartbeat = now - lastHeartbeatReceivedTime;
          if (timeSinceLastHeartbeat >= timeoutNanos) {
              markDown(serverId);
          }
      }
  }

技术考虑

当使用单个Single Socket Channel在服务器之间进行通信时,必须注意确保[队头阻塞]不会阻塞heartbeat消息的处理。否则,它可能会导致长时间的延迟,从而错误地检测到发送服务器宕机,甚至当它在定期发送心跳时也是如此。Request Pipeline 可以用来确保服务器在发送心跳之前不会等待前一个请求的响应。有时,当使用Singular Update Queue时,一些任务,如写入磁盘,可能会造成延迟,这可能会延迟处理定时中断和延迟发送心跳。

这可以通过使用单独的线程异步发送心跳来解决。像[consul]和[akka]这样的框架异步发送心跳。这也可能是接收服务器上的问题。一个正在进行磁盘写入的接收服务器,只能在写入完成后才检查心跳,从而导致错误的故障检测。因此,使用Singular Update Queue的接收服务器可以重新设置心跳检查机制,以包含这些延迟。参考[raft], [log-cabin]的实现。

有时,一些特定运行时的事件(如垃圾回收)导致的[本地暂停]会延迟心跳的处理。需要一种机制来检查处理是否在可能的本地暂停之后发生。一个简单的机制,用来检查处理是否在一个足够长的时间窗口后发生,例如5秒。在这种情况下,在时间窗口内,没有任何东西被标记为失败,而且它会被延迟到下一个周期。Cassandra中的实现就是一个很好的例子。

大型集群. Gossip基础协议

上一节中描述的heartbeat不能扩展到跨广域网的拥有几百到几千台服务器的更大集群。在大型集群中,需要考虑两件事:

• 限定的每台服务器生成的消息数量• 心跳消息所消耗的总带宽。它不应该消耗大量的网络带宽。应该有一个几百kb的上限,以确保过多的心跳消息不会影响集群中实际的数据传输。

由于这些原因,所有心跳机制都被避免。在这些情况下,通常会使用故障检测器以及跨集群传播故障信息的Gossip协议。这些集群通常采取诸如在出现故障时跨节点转移数据之类的操作,因此倾向于正确的检测并容忍更多的延迟(尽管有限制)。主要的挑战是不能因为网络延迟或缓慢的进程而将节点错误地检测为错误。这时使用的一种常见机制是为每个进程分配一个suspicion号,如果在有限的时间内没有包含该进程的gossip ,该suspicion号就会增加。它是根据过去的统计数据计算的,只有在这个suspicion数量达到配置的上限之后,它才被标记为失败。

有两个主流实现:1)Phi Accrual故障检测器(在Akka, Cassandra中使用)2)SWIM with Lifeguard enhancement(在Hashicorp editor, memberlist中使用)这些实现可以扩展到拥有数千台机器的广域网。Akka已知已在2400台服务器上试用。Hashicorp Consul 通常在一个组中部署数千个Consul 服务器。拥有一个可靠的故障检测器(它可以有效地用于大型集群部署,同时提供一些一致性保证)仍然是一个需要积极开发的领域。一些最新开发的框架,比如Rapid,看起来很有希望。

例子

• 像ZAB或RAFT这样的统一实现,它们使用3到5个节点的小型集群,实现了基于固定时间窗口的故障检测。• Akka Actors和Cassandra使用 Phi Accrual故障检测器。• Hashicorp consul 使用基于gossip的故障检测,SWIM。

java达人

ID:drjava

(长按或扫码识别)

0 人点赞