The Gossip Protocol and Its Implementation in Consul

2022-10-27 09:57:10

The gossip protocol, also called the epidemic protocol, is a protocol for exchanging information between nodes or processes, modeled on how epidemics spread. It is widely used in distributed systems, for example to ensure that every node in a network ends up holding the same data. The protocol spreads information through the network in a randomized fashion, and within a bounded amount of time all nodes in the system converge on the same data. Gossip is essentially a decentralized distributed protocol: it solves two problems, propagating state through a cluster and guaranteeing that this state eventually becomes consistent.

Characteristics

1. Scalability (Scalable): gossip is scalable; roughly O(log N) rounds are enough to spread a piece of information to every node, where N is the number of nodes. Each node sends only a fixed number of messages and does not wait for an ack when transmitting, so a failed delivery does no harm: some other node will eventually pass the message on to the node that missed it. The scheme scales comfortably to millions of processes.

2. Fault tolerance (Fault-tolerance): the restart or crash of any node does not affect the operation of the gossip protocol.

3. Robustness (Robust): gossip is a decentralized protocol, so all nodes in the cluster are equal peers with no special nodes; a failure on any node does not prevent the others from continuing to send messages, and any node can join or leave at any time without affecting the overall quality of service (QoS).

4. Eventual consistency (Convergent consistency): gossip spreads information exponentially fast, so new information quickly reaches every node in the network, and within a bounded amount of time all nodes end up holding the latest data.

Drawbacks

1. Message latency: in gossip, a node only sends messages to a few randomly chosen peers, and a message reaches the whole network only after several rounds of dissemination, so gossip introduces unavoidable latency. It is not suitable for scenarios with strict real-time requirements.

2. Message redundancy: the protocol has nodes periodically pick random neighbors and send them messages, and the receivers repeat the same step, so the same message is inevitably delivered to the same node more than once. This redundancy adds processing load on the receiving nodes, and because sending is periodic, a node keeps receiving duplicates of messages it already holds, making the redundancy worse.

Execution Process

A gossip round is initiated by a seed node. When a seed node has state to propagate to the other nodes in the network, it randomly selects a few neighboring nodes and spreads the message to them; each receiver repeats the process until eventually every node in the network has received it. This takes some time, and there is no guarantee that all nodes have the message at any given moment, but in theory all nodes eventually will, which is why gossip is an eventually consistent protocol. A minimal simulation of this spreading process follows.
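
The simulation below is a standalone sketch (not Consul or memberlist code): each node that already holds the update pushes it to a fixed number of random peers per round, and the number of rounds until everyone has it grows roughly logarithmically with the cluster size.

package main

import (
  "fmt"
  "math/rand"
)

// simulateGossip returns the number of rounds needed until all n nodes hold
// the update, when every informed node pushes it to `fanout` random peers
// per round.
func simulateGossip(n, fanout int) int {
  informed := make([]bool, n)
  informed[0] = true // the seed node
  count := 1

  rounds := 0
  for count < n {
    rounds++
    next := make([]bool, n)
    copy(next, informed)
    for i := 0; i < n; i++ {
      if !informed[i] {
        continue
      }
      for j := 0; j < fanout; j++ {
        peer := rand.Intn(n)
        if !next[peer] {
          next[peer] = true
          count++
        }
      }
    }
    informed = next
  }
  return rounds
}

func main() {
  for _, n := range []int{100, 1000, 10000} {
    fmt.Printf("n=%d rounds=%d\n", n, simulateGossip(n, 3))
  }
}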

Communication Modes

Under the gossip protocol, two nodes in the network can communicate in three ways:

  • Push: node A pushes its data (key, value, version) to node B, and B keeps whichever of A's entries are newer than its own.
  • Pull: A sends only (key, version) to B; B replies with the entries (key, value, version) that are newer than A's, and A updates its local copy.
  • Push/Pull: like Pull, with one extra step: A then pushes back to B whatever it holds that is newer than B's, and B updates its local copy.

If one full synchronization between two nodes counts as one cycle, Push needs 1 message per cycle, Pull needs 2, and Push/Pull needs 3. Push/Pull sends the most messages, but it is the most effective: in theory a single cycle makes the two nodes fully consistent, and intuitively it also converges the fastest. A toy sketch of a Push/Pull exchange follows.
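
The sketch below uses hypothetical types (a tiny versioned key/value store, not memberlist code) to show one Push/Pull cycle, after which both sides hold the newer version of every key.

package main

import "fmt"

// entry is a versioned value; a higher Version is considered newer.
type entry struct {
  Value   string
  Version int
}

// store is a toy key/value store keyed by string.
type store map[string]entry

// digest returns only (key, version), which is what a pull request carries.
func (s store) digest() map[string]int {
  d := make(map[string]int, len(s))
  for k, e := range s {
    d[k] = e.Version
  }
  return d
}

// pushPull performs one A<->B cycle:
//  1. A sends its digest (key, version) to B.
//  2. B replies with every entry newer than A's, and A applies it.
//  3. A pushes back every entry newer than B's, and B applies it
//     (the extra step that distinguishes Push/Pull from plain Pull).
func pushPull(a, b store) {
  ad := a.digest()
  for k, be := range b {
    if av, ok := ad[k]; !ok || be.Version > av {
      a[k] = be
    }
  }
  for k, ae := range a {
    if be, ok := b[k]; !ok || ae.Version > be.Version {
      b[k] = ae
    }
  }
}

func main() {
  a := store{"x": {"old-x", 1}, "y": {"new-y", 5}}
  b := store{"x": {"new-x", 2}, "z": {"only-z", 1}}
  pushPull(a, b)
  fmt.Println(a) // both stores now hold x@2, y@5, z@1
  fmt.Println(b)
}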

Gossip in Consul

Consul uses two distinct gossip pools, referred to here as the LAN pool and the WAN pool.

LAN Pool

Every datacenter in Consul has its own LAN pool containing all members of that datacenter, both clients and servers. The LAN pool serves several purposes:

  • Membership information lets clients automatically discover servers, reducing the amount of configuration needed.
  • Distributed failure detection spreads the work of failure detection across the whole cluster instead of concentrating it on a few machines.
  • The gossip pool makes events such as leader election reliable and fast.

WAN Pool

The WAN pool is globally unique: every server joins it, regardless of which datacenter it lives in. The membership information provided by the WAN pool lets servers perform cross-datacenter requests, and the integrated failure detection allows Consul to gracefully handle an entire datacenter losing connectivity, or just a single server in a remote datacenter becoming unreachable.

Consul's gossip layer is actually built on the memberlist library, which is also maintained by HashiCorp. memberlist implements intra-cluster node discovery, failure detection, failover handling, and node state synchronization. A minimal example of using memberlist directly follows; the rest of this article then focuses on its periodic tasks.
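
This is only a sketch: the node name and peer address are placeholders, and error handling is kept minimal.

package main

import (
  "fmt"
  "log"

  "github.com/hashicorp/memberlist"
)

func main() {
  // DefaultLANConfig returns settings tuned for a local network; Consul's
  // LAN pool uses a configuration of this kind.
  cfg := memberlist.DefaultLANConfig()
  cfg.Name = "node-1" // placeholder node name

  list, err := memberlist.Create(cfg)
  if err != nil {
    log.Fatalf("failed to create memberlist: %v", err)
  }

  // Join the cluster through any already-known member (placeholder address).
  if _, err := list.Join([]string{"10.0.0.2"}); err != nil {
    log.Fatalf("failed to join cluster: %v", err)
  }

  // The membership learned through gossip.
  for _, m := range list.Members() {
    fmt.Printf("member: %s %s\n", m.Name, m.Addr)
  }
}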

Node States

A node can be in one of three states (a sketch of observing these transitions from application code follows the list):

  • 1. alive: the node is considered "alive"
  • 2. suspect: the node did not answer a ping message, or answered too late, so its state is "suspicious"
  • 3. dead: the node is considered "dead"
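
Applications usually do not read these states directly; memberlist reports the resulting transitions through an EventDelegate (join for alive, leave for dead/left). A minimal sketch, assuming a logging-only delegate:

package main

import (
  "log"

  "github.com/hashicorp/memberlist"
)

// eventLogger implements memberlist.EventDelegate and just logs the
// membership transitions produced by the gossip state machine.
type eventLogger struct{}

func (eventLogger) NotifyJoin(n *memberlist.Node)   { log.Printf("alive: %s", n.Name) }
func (eventLogger) NotifyLeave(n *memberlist.Node)  { log.Printf("dead/left: %s", n.Name) }
func (eventLogger) NotifyUpdate(n *memberlist.Node) { log.Printf("updated: %s", n.Name) }

func main() {
  cfg := memberlist.DefaultLANConfig()
  cfg.Events = eventLogger{} // wire the delegate into the config

  if _, err := memberlist.Create(cfg); err != nil {
    log.Fatal(err)
  }
  select {} // keep the process alive
}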

Periodic Tasks

// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
  m.tickerLock.Lock()
  defer m.tickerLock.Unlock()

  // If we already have tickers, then don't do anything, since we're
  // scheduled
  if len(m.tickers) > 0 {
    return
  }

  // Create the stop tick channel, a blocking channel. We close this
  // when we should stop the tickers.
  stopCh := make(chan struct{})

  // Create a new probeTicker
  if m.config.ProbeInterval > 0 {
    t := time.NewTicker(m.config.ProbeInterval)
    go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
    m.tickers = append(m.tickers, t)
  }

  // Create a push pull ticker if needed
  if m.config.PushPullInterval > 0 {
    go m.pushPullTrigger(stopCh)
  }

  // Create a gossip ticker if needed
  if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
    t := time.NewTicker(m.config.GossipInterval)
    go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
    m.tickers = append(m.tickers, t)
  }

  // If we made any tickers, then record the stopTick channel for
  // later.
  if len(m.tickers) > 0 {
    m.stopTick = stopCh
  }
}

From the code above we can see that there are three main periodic tasks (a small configuration sketch follows the list):

  • 1. Failure detection (probe)
  • 2. State merging (push/pull messages)
  • 3. Broadcasting gossip messages
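
Each of these tasks is driven by its own field in the memberlist Config, the same fields consulted in schedule() above. A minimal sketch of setting them (the values are illustrative, not the library defaults):

package main

import (
  "fmt"
  "time"

  "github.com/hashicorp/memberlist"
)

func main() {
  cfg := memberlist.DefaultLANConfig()
  // One field per periodic task created in schedule() above.
  cfg.ProbeInterval = 1 * time.Second         // failure detection (probe)
  cfg.PushPullInterval = 30 * time.Second     // state merge (push/pull)
  cfg.GossipInterval = 200 * time.Millisecond // gossip broadcast
  cfg.GossipNodes = 3                         // peers contacted per gossip round

  fmt.Printf("probe=%v pushpull=%v gossip=%v fanout=%d\n",
    cfg.ProbeInterval, cfg.PushPullInterval, cfg.GossipInterval, cfg.GossipNodes)
}
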
1. Failure detection
func (m *Memberlist) probe() {
  // Track the number of indexes we've considered probing
  numCheck := 0
START:
  m.nodeLock.RLock()

  // Make sure we don't wrap around infinitely
  if numCheck >= len(m.nodes) {
    m.nodeLock.RUnlock()
    return
  }

  // Handle the wrap around case
  if m.probeIndex >= len(m.nodes) {
    m.nodeLock.RUnlock()
    m.resetNodes()
    m.probeIndex = 0
    numCheck++
    goto START
  }

  // Determine if we should probe this node
  skip := false
  var node nodeState

  node = *m.nodes[m.probeIndex]
  if node.Name == m.config.Name {
    skip = true
  } else if node.DeadOrLeft() {
    skip = true
  }

  // Potentially skip
  m.nodeLock.RUnlock()
  m.probeIndex++
  if skip {
    numCheck++
    goto START
  }

  // Probe the specific node
  m.probeNode(&node)
}

The code above does the following:

  • 1. Check whether every node has already been considered in this sweep; if so, return immediately.
  • 2. Reset the node list: remove dead nodes, refresh the live ones, and shuffle the list randomly (presumably so the probe order differs from round to round; a standalone sketch of this sweep-and-shuffle pattern follows the list).
  • 3. Decide whether the current node should be probed: the local node itself (the one named in the config) and nodes in the dead or left state are skipped.
  • 4. Probe the selected node.
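
The sweep-and-shuffle pattern can be summarized with a small standalone sketch (hypothetical types, not memberlist code): one target is probed per tick, and when the index wraps around, dead entries are dropped and the order is reshuffled.

package main

import (
  "fmt"
  "math/rand"
)

type fakeNode struct {
  name string
  dead bool
}

// probeScheduler mimics the probe()/resetNodes() pattern above: one target
// per tick, and on wrap-around the dead entries are dropped and the
// remaining nodes are reshuffled.
type probeScheduler struct {
  nodes []fakeNode
  index int
}

// next returns the next node to probe, or false if there is nothing to probe.
func (p *probeScheduler) next() (fakeNode, bool) {
  checked := 0
  for checked < len(p.nodes) {
    if p.index >= len(p.nodes) {
      // Wrap around: drop dead nodes and reshuffle the rest.
      alive := p.nodes[:0]
      for _, n := range p.nodes {
        if !n.dead {
          alive = append(alive, n)
        }
      }
      p.nodes = alive
      rand.Shuffle(len(p.nodes), func(i, j int) {
        p.nodes[i], p.nodes[j] = p.nodes[j], p.nodes[i]
      })
      p.index = 0
      checked++
      continue
    }
    n := p.nodes[p.index]
    p.index++
    if n.dead {
      checked++
      continue
    }
    return n, true
  }
  return fakeNode{}, false
}

func main() {
  p := &probeScheduler{nodes: []fakeNode{{"a", false}, {"b", true}, {"c", false}}}
  for i := 0; i < 4; i++ {
    if n, ok := p.next(); ok {
      fmt.Println("probe", n.name)
    }
  }
}
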
1.1 Probing a node directly
func (m *Memberlist) probeNode(node *nodeState) {
  defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

  // We use our health awareness to scale the overall probe interval, so we
  // slow down if we detect problems. The ticker that calls us can handle
  // us running over the base interval, and will skip missed ticks.
  probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
  if probeInterval > m.config.ProbeInterval {
    metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
  }

  // Prepare a ping message and setup an ack handler.
  selfAddr, selfPort := m.getAdvertise()
  ping := ping{
    SeqNo:      m.nextSeqNo(),
    Node:       node.Name,
    SourceAddr: selfAddr,
    SourcePort: selfPort,
    SourceNode: m.config.Name,
  }
  ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
  nackCh := make(chan struct{}, m.config.IndirectChecks+1)
  m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

  // Mark the sent time here, which should be after any pre-processing but
  // before system calls to do the actual send. This probably over-reports
  // a bit, but it's the best we can do. We had originally put this right
  // after the I/O, but that would sometimes give negative RTT measurements
  // which was not desirable.
  sent := time.Now()

  // Send a ping to the node. If this node looks like it's suspect or dead,
  // also tack on a suspect message so that it has a chance to refute as
  // soon as possible.
  deadline := sent.Add(probeInterval)
  addr := node.Address()

  // Arrange for our self-awareness to get updated.
  var awarenessDelta int
  defer func() {
    m.awareness.ApplyDelta(awarenessDelta)
  }()
  if node.State == StateAlive {
    if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
      m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
      if failedRemote(err) {
        goto HANDLE_REMOTE_FAILURE
      } else {
        return
      }
    }
  } else {
    var msgs [][]byte
    if buf, err := encode(pingMsg, &ping); err != nil {
      m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
      return
    } else {
      msgs = append(msgs, buf.Bytes())
    }
    s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
    if buf, err := encode(suspectMsg, &s); err != nil {
      m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
      return
    } else {
      msgs = append(msgs, buf.Bytes())
    }

    compound := makeCompoundMessage(msgs)
    if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
      m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
      if failedRemote(err) {
        goto HANDLE_REMOTE_FAILURE
      } else {
        return
      }
    }
  }

  // Arrange for our self-awareness to get updated. At this point we've
  // sent the ping, so any return statement means the probe succeeded
  // which will improve our health until we get to the failure scenarios
  // at the end of this function, which will alter this delta variable
  // accordingly.
  awarenessDelta = -1

  // Wait for response or round-trip-time.
  select {
  case v := <-ackCh:
    if v.Complete == true {
      if m.config.Ping != nil {
        rtt := v.Timestamp.Sub(sent)
        m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
      }
      return
    }

    // As an edge case, if we get a timeout, we need to re-enqueue it
    // here to break out of the select below.
    if v.Complete == false {
      ackCh <- v
    }
  case <-time.After(m.config.ProbeTimeout):
    // Note that we don't scale this timeout based on awareness and
    // the health score. That's because we don't really expect waiting
    // longer to help get UDP through. Since health does extend the
    // probe interval it will give the TCP fallback more time, which
    // is more active in dealing with lost packets, and it gives more
    // time to wait for indirect acks/nacks.
    m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name)
  }
...
}

The logic of this part is as follows:

  • 1. If the node is in the alive state, a plain ping message is sent. If it is in any other state, the ping is bundled together with a suspect message into one compound message, so that the node gets a chance to refute the suspicion (i.e. show that it is in fact healthy) as soon as possible.
  • 2. After sending, a timer starts. If an ack arrives within the timeout the probe succeeds and the function returns (the measured RTT is handed to the Ping delegate; see the sketch below); otherwise the indirect probing flow begins.
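
The rtt computed in the ack branch above is surfaced through m.config.Ping; a minimal sketch of a delegate that just logs it, assuming memberlist's PingDelegate interface:

package main

import (
  "log"
  "time"

  "github.com/hashicorp/memberlist"
)

// rttLogger implements memberlist.PingDelegate; NotifyPingComplete is the
// callback invoked from probeNode() above when an ack arrives in time.
type rttLogger struct{}

func (rttLogger) AckPayload() []byte { return nil }

func (rttLogger) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) {
  log.Printf("ping %s rtt=%v", other.Name, rtt)
}

func main() {
  cfg := memberlist.DefaultLANConfig()
  cfg.Ping = rttLogger{} // wire the delegate into the probe path

  if _, err := memberlist.Create(cfg); err != nil {
    log.Fatal(err)
  }
  select {} // keep the process alive
}
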
1.2 Indirect probing

This flow lives in the same function; the relevant code is:

func (m *Memberlist) probeNode(node *nodeState) {
...
HANDLE_REMOTE_FAILURE:
  // Get some random live nodes.
  m.nodeLock.RLock()
  kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
    return n.Name == m.config.Name ||
      n.Name == node.Name ||
      n.State != StateAlive
  })
  m.nodeLock.RUnlock()

  // Attempt an indirect ping.
  expectedNacks := 0
  selfAddr, selfPort = m.getAdvertise()
  ind := indirectPingReq{
    SeqNo:      ping.SeqNo,
    Target:     node.Addr,
    Port:       node.Port,
    Node:       node.Name,
    SourceAddr: selfAddr,
    SourcePort: selfPort,
    SourceNode: m.config.Name,
  }
  for _, peer := range kNodes {
    // We only expect nack to be sent from peers who understand
    // version 4 of the protocol.
    if ind.Nack = peer.PMax >= 4; ind.Nack {
      expectedNacks++
    }

    if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
      m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
    }
  }

  // Also make an attempt to contact the node directly over TCP. This
  // helps prevent confused clients who get isolated from UDP traffic
  // but can still speak TCP (which also means they can possibly report
  // misinformation to other nodes via anti-entropy), avoiding flapping in
  // the cluster.
  //
  // This is a little unusual because we will attempt a TCP ping to any
  // member who understands version 3 of the protocol, regardless of
  // which protocol version we are speaking. That's why we've included a
  // config option to turn this off if desired.
  fallbackCh := make(chan bool, 1)

  disableTcpPings := m.config.DisableTcpPings ||
    (m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name))
  if (!disableTcpPings) && (node.PMax >= 3) {
    go func() {
      defer close(fallbackCh)
      didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline)
      if err != nil {
        m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
      } else {
        fallbackCh <- didContact
      }
    }()
  } else {
    close(fallbackCh)
  }

  // Wait for the acks or timeout. Note that we don't check the fallback
  // channel here because we want to issue a warning below if that's the
  // *only* way we hear back from the peer, so we have to let this time
  // out first to allow the normal UDP-based acks to come in.
  select {
  case v := <-ackCh:
    if v.Complete == true {
      return
    }
  }

  // Finally, poll the fallback channel. The timeouts are set such that
  // the channel will have something or be closed without having to wait
  // any additional time here.
  for didContact := range fallbackCh {
    if didContact {
      m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
      return
    }
  }

  // Update our self-awareness based on the results of this failed probe.
  // If we don't have peers who will send nacks then we penalize for any
  // failed probe as a simple health metric. If we do have peers to nack
  // verify, then we can use that as a more sophisticated measure of self-
  // health because we assume them to be working, and they can help us
  // decide if the probed node was really dead or if it was something wrong
  // with ourselves.
  awarenessDelta = 0
  if expectedNacks > 0 {
    if nackCount := len(nackCh); nackCount < expectedNacks {
      awarenessDelta += (expectedNacks - nackCount)
    }
  } else {
    awarenessDelta += 1
  }

  // No acks received from target, suspect it as failed.
  m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
  s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
  m.suspectNode(&s)
}

After a direct probe of a node fails, this flow takes over:

  • 1. Randomly select a few other alive nodes (the number is configurable) and send each of them an indirect ping request carrying the sequence number, the target node's IP and port (i.e. the node whose direct probe just failed), and the source IP and port.
  • 2. Wait for acknowledgements for the target; if the expected acknowledgements do not arrive, mark the node as suspect.

2. State merging (push/pull messages)

Periodically, one node is picked from the known alive members of the cluster for a push/pull exchange. Two kinds of information are exchanged:

  • a) cluster membership information
  • b) user-defined state, which requires a struct implementing the Delegate interface (a minimal sketch follows this list)
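
In a push/pull exchange, LocalState supplies the bytes sent to the peer and MergeRemoteState receives the peer's bytes; the other Delegate methods serve node metadata and gossip broadcasts. A minimal sketch that treats the user state as an opaque byte slice (the kvDelegate type is hypothetical):

package main

import (
  "sync"

  "github.com/hashicorp/memberlist"
)

// kvDelegate carries an opaque application state blob through push/pull.
// It implements memberlist.Delegate.
type kvDelegate struct {
  mu    sync.Mutex
  state []byte
}

// NodeMeta and the broadcast-related methods are unused in this sketch.
func (d *kvDelegate) NodeMeta(limit int) []byte                  { return nil }
func (d *kvDelegate) NotifyMsg(msg []byte)                       {}
func (d *kvDelegate) GetBroadcasts(overhead, limit int) [][]byte { return nil }

// LocalState is what the local node sends during a push/pull exchange.
func (d *kvDelegate) LocalState(join bool) []byte {
  d.mu.Lock()
  defer d.mu.Unlock()
  return d.state
}

// MergeRemoteState receives the remote node's user state; a real
// implementation would merge rather than overwrite.
func (d *kvDelegate) MergeRemoteState(buf []byte, join bool) {
  d.mu.Lock()
  defer d.mu.Unlock()
  d.state = buf
}

func main() {
  cfg := memberlist.DefaultLANConfig()
  cfg.Delegate = &kvDelegate{state: []byte("hello")}

  if _, err := memberlist.Create(cfg); err != nil {
    panic(err)
  }
  select {} // keep the process alive
}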

The concrete implementation is as follows:

func (m *Memberlist) pushPull() {
  // Get a random live node
  m.nodeLock.RLock()
  nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
    return n.Name == m.config.Name ||
      n.State != StateAlive
  })
  m.nodeLock.RUnlock()

  // If no nodes, bail
  if len(nodes) == 0 {
    return
  }
  node := nodes[0]

  // Attempt a push pull
  if err := m.pushPullNode(node.FullAddress(), false); err != nil {
    m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
  }
}

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
  defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

  // Attempt to send and receive with the node
  remote, userState, err := m.sendAndReceiveState(a, join)
  if err != nil {
    return err
  }

  if err := m.mergeRemoteState(join, remote, userState); err != nil {
    return err
  }
  return nil
}

A node in the alive state is picked at random, the local node opens a connection to it, and then the following steps take place:

2.1 Sending the local node's state

Part of the code is shown below:

localNodes := make([]pushNodeState, len(m.nodes))
for idx, n := range m.nodes {
  localNodes[idx].Name = n.Name
  localNodes[idx].Addr = n.Addr
  localNodes[idx].Port = n.Port
  localNodes[idx].Incarnation = n.Incarnation
  localNodes[idx].State = n.State
  localNodes[idx].Meta = n.Meta
  localNodes[idx].Vsn = []uint8{
    n.PMin, n.PMax, n.PCur,
    n.DMin, n.DMax, n.DCur,
  }
}
m.nodeLock.RUnlock()

// Get the delegate state
var userData []byte
if m.config.Delegate != nil {
  userData = m.config.Delegate.LocalState(join)
}

// Create a bytes buffer writer
bufConn := bytes.NewBuffer(nil)

// Send our node state
header := pushPullHeader{Nodes: len(localNodes), UserStateLen: len(userData), Join: join}
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(bufConn, &hd)
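// ... the header, every node state, and the user data are then encoded and
// written onto the connection (omitted here).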

As you can see, the local node sends the information it holds about every node in the cluster to the remote node; that information is packed into the localNodes slice.

2.2 Reading the state sent by the remote node

The code is as follows:

func (m *Memberlist) readRemoteState(bufConn io.Reader, dec *codec.Decoder) (bool, []pushNodeState, []byte, error) {
  // Read the push/pull header
  var header pushPullHeader
  if err := dec.Decode(&header); err != nil {
    return false, nil, nil, err
  }

  // Allocate space for the transfer
  remoteNodes := make([]pushNodeState, header.Nodes)

  // Try to decode all the states
  for i := 0; i < header.Nodes; i++ {
    if err := dec.Decode(&remoteNodes[i]); err != nil {
      return false, nil, nil, err
    }
  }

  // Read the remote user state into a buffer
  var userBuf []byte
  if header.UserStateLen > 0 {
    userBuf = make([]byte, header.UserStateLen)
    bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserStateLen)
    if err == nil && bytes != header.UserStateLen {
      err = fmt.Errorf(
        "Failed to read full user state (%d / %d)",
        bytes, header.UserStateLen)
    }
    if err != nil {
      return false, nil, nil, err
    }
  }

  // For proto versions < 2, there is no port provided. Mask old
  // behavior by using the configured port
  for idx := range remoteNodes {
    if m.ProtocolVersion() < 2 || remoteNodes[idx].Port == 0 {
      remoteNodes[idx].Port = uint16(m.config.BindPort)
    }
  }

  return header.Join, remoteNodes, userBuf, nil
}

As the code shows, the message coming back has essentially the same shape as the one the local node sent.

2.3 Merging local and remote state
func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error {
  if err := m.verifyProtocol(remoteNodes); err != nil {
    return err
  }

  // Invoke the merge delegate if any
  if join && m.config.Merge != nil {
    nodes := make([]*Node, len(remoteNodes))
    for idx, n := range remoteNodes {
      nodes[idx] = &Node{
        Name:  n.Name,
        Addr:  n.Addr,
        Port:  n.Port,
        Meta:  n.Meta,
        State: n.State,
        PMin:  n.Vsn[0],
        PMax:  n.Vsn[1],
        PCur:  n.Vsn[2],
        DMin:  n.Vsn[3],
        DMax:  n.Vsn[4],
        DCur:  n.Vsn[5],
      }
    }
    if err := m.config.Merge.NotifyMerge(nodes); err != nil {
      return err
    }
  }

  // Merge the membership state
  m.mergeState(remoteNodes)

  // Invoke the delegate for user state
  if userBuf != nil && m.config.Delegate != nil {
    m.config.Delegate.MergeRemoteState(userBuf, join)
  }
  return nil
}

3. Broadcasting gossip messages
func (m *Memberlist) gossip() {
  defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

  // Get some random live, suspect, or recently dead nodes
  m.nodeLock.RLock()
  kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
    if n.Name == m.config.Name {
      return true
    }

    switch n.State {
    case StateAlive, StateSuspect:
      return false

    case StateDead:
      return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

    default:
      return true
    }
  })
  m.nodeLock.RUnlock()

  // Compute the bytes available
  bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
  if m.config.EncryptionEnabled() {
    bytesAvail -= encryptOverhead(m.encryptionVersion())
  }

  for _, node := range kNodes {
    // Get any pending broadcasts
    msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
    if len(msgs) == 0 {
      return
    }

    addr := node.Address()
    if len(msgs) == 1 {
      // Send single message as is
      if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil {
        m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
      }
    } else {
      // Otherwise create and send a compound message
      compound := makeCompoundMessage(msgs)
      if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
        m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
      }
    }
  }
}

The code above does the following:

  • 1. Randomly select up to GossipNodes (configurable) targets from the nodes that are alive, suspect, or only recently dead.
  • 2. Assemble the pending broadcast messages (see the queue sketch after this list).
  • 3. Send the broadcast messages to each of the selected nodes.
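
The messages returned by getBroadcasts come from the local broadcast queue. For user-level messages, the usual pattern is to back the Delegate's GetBroadcasts method with memberlist's TransmitLimitedQueue, which retransmits each message a limited number of times; the simpleBroadcast type below is a hypothetical minimal implementation of the Broadcast interface.

package main

import "github.com/hashicorp/memberlist"

// simpleBroadcast is a hypothetical minimal implementation of the
// memberlist.Broadcast interface for a raw message.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                             { return b }
func (b simpleBroadcast) Invalidates(other memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                                   {}

func main() {
  // The queue limits how many times each message is re-gossiped,
  // scaling the limit with the cluster size.
  q := &memberlist.TransmitLimitedQueue{
    NumNodes:       func() int { return 10 }, // would normally return list.NumMembers()
    RetransmitMult: 3,
  }
  q.QueueBroadcast(simpleBroadcast("hello cluster"))

  // A Delegate's GetBroadcasts would return q.GetBroadcasts(overhead, limit);
  // those bytes are what gossip() sends to the selected nodes.
  _ = q.GetBroadcasts(0, 1400)
}
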
Final notes
  • 1. Broadcasting here is itself a form of gossip: the publisher does not send the message to every node in the cluster, but to n randomly chosen nodes (3 by default).
  • 2. Nodes in the dead state are kept in the membership list for a while, so that their state can still be disseminated during push/pull exchanges.
