Redis进阶-Redis集群原理剖析及gossip协议初探

2021-08-17 11:27:21 浏览数 (1)

Pre

Redis进阶-分布式存储 Sequential partitioning & Hash partitioning


集群架构

再来所以说为啥需要集群?

两个方面:

  1. 并发量 : redis官方称10万的QPS,如果我的业务真的大到百万的QPS呢? 单节点的读写 显然不行。
  2. 数据量 : 如果我要缓存 500G的数据, 显然哨兵模式是扛不住的,单个节点缓存这么多的数据,启动redis变得相当慢 。 如果不能扩主节点,你只能加机器了,从成本上考虑,不合适~

集群原理

Redis Cluster 将所有数据划分为 16384 个 slots(槽位),每个节点负责其中一部分槽位。槽位的信息存储于每个节点中。

为啥要用16384个slot , 请移步 Redis进阶-分布式存储 Sequential partitioning & Hash partitioning


当 Redis Cluster 的客户端来连接集群时,它也会得到一份集群的槽位配置信息并将其缓存在客户端本地。这样当客户端要查找某个 key 时,可以直接定位到目标节点。

我们来看下jedis(Version:2.9.0)的实现

代码语言:javascript复制
 jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "artisan", config);

跟下源码

代码语言:javascript复制
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
                                       final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
    initializeSlotsCache(nodes, poolConfig, password);
  }

重点看下 initializeSlotsCache

代码语言:javascript复制
  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      if (password != null) {
        jedis.auth(password);
      }
      try {
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }

继续 cache.discoverClusterNodesAndSlots(jedis); cache为 JedisClusterInfoCache 对象。

代码语言:javascript复制
  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();

    try {
      reset();
      List<Object> slots = jedis.clusterSlots();

      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }

        List<Integer> slotNums = getAssignedSlotArray(slotInfo);

        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i  ) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.size() <= 0) {
            continue;
          }

          HostAndPort targetNode = generateHostAndPort(hostInfos);
          setupNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }

同时因为槽位的信息可能会存在客户端与服务器不一致的情况,还需要纠正机制来实现槽位信息的校验调整。


槽位定位算法

代码语言:javascript复制
jedisCluster.set("clusterArtisan", "artisanValue")

跟下源码

代码语言:javascript复制
  @Override
  public String set(final String key, final String value) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value);
      }
    }.run(key);
  }

重点是这个connetion 对象是从哪里来的,继续看下 run

代码语言:javascript复制
  public T run(String key) {
  ..........
    return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
  }

继续

代码语言:javascript复制
 private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
  
    Jedis connection = null;
    try {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
 

      return execute(connection);
    ......
   ......
  }

重点来了

代码语言:javascript复制
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));

是不是就是CRC16 ?

代码语言:javascript复制
public static int getSlot(byte[] key) {
    int s = -1;
    int e = -1;
    boolean sFound = false;
    for (int i = 0; i < key.length; i  ) {
      if (key[i] == '{' && !sFound) {
        s = i;
        sFound = true;
      }
      if (key[i] == '}' && sFound) {
        e = i;
        break;
      }
    }
    if (s > -1 && e > -1 && e != s   1) {
      return getCRC16(key, s   1, e) & (16384 - 1);
    }
    return getCRC16(key) & (16384 - 1);
  }

Cluster 默认会对 key 值使用 crc16 算法进行 hash 得到一个整数值,然后用这个整数值对 16384 进行取模来得到具体 槽位。 HASH_SLOT = CRC16(key) mod 16384


跳转重定位

当客户端向一个错误的节点发出了指令,该节点会发现指令的 key 所在的槽位并不归自己管理,这时它会向客户端发送一个特殊的跳转指令携带目标操作的节点地址,告诉客户端去连这个节点去获取数据。

客户端收到指令后除了跳转到正确的节点上去操作,还会同步更新纠正本地的槽位映射表缓存,后续所有 key 将使用新的槽位映射表


Redis集群节点间的通信机制gossip协议

redis cluster节点间采取gossip协议进行通信


维护集群的元数据的两种方式

集中式和gossip


集中式
  • 优点在于元数据的更新和读取,时效性非常好,一旦元数据出现变更立即就会更新到集中式的存储中,其他节点读取的时候立即就可以立即感知到;
  • 不足之处: 所有的元数据的更新压力全部集中在一个地方,可能导致元数据的存储压力

Gossip protocol

Gossip protocol 也叫 Epidemic Protocol (流行病协议),别名很多比如:“流言算法”、“疫情传播算法”等。

Gossip protocol 最早是在 1987 年发表在 ACM 上的论文 《Epidemic Algorithms for Replicated Database Maintenance》中被提出。

主要用在分布式数据库系统中各个副本节点同步数据之用,这种场景的一个最大特点就是组成的网络的节点都是对等节点,是非结构化网络


Gossip 演示

我们先做一些前提设定

1、Gossip 是周期性的散播消息,把周期限定为 1 秒

2、被感染节点随机选择 k 个邻接节点(fan-out)散播消息,这里把 fan-out 设置为 3,每次最多往 3 个节点散播。

3、每次散播消息都选择尚未发送过的节点进行散播

4、收到消息的节点不再往发送节点散播,比如 A -> B,那么 B 进行散播的时候,不再发给 A。

这里一共有 16 个节点,节点 1 为初始被感染节点,通过 Gossip 过程,最终所有节点都被感染:


gossip协议消息

gossip协议包含多种消息,包括ping,pong,meet,fail等等。

  • ping:每个节点都会频繁给其他节点发送ping,其中包含自己的状态还有自己维护的集群元数据,互相通过ping交换元数据;
  • pong: 返回ping和meet,包含自己的状态和其他信息,也可以用于信息广播和更新;
  • fail: 某个节点判断另一个节点fail之后,就发送fail给其他节点,通知其他节点,指定的节点宕机了。
  • meet:某个节点发送meet给新加入的节点,让新节点加入集群中,然后新节点就会开始与其他节点进行通信,不需要发送形成网络的所需的所有CLUSTER MEET命令。发送CLUSTER MEET消息以便每个节点能够达到其他每个节点只需通过一条已知的节点链就够了。由于在心跳包中会交换gossip信息,将会创建节点间缺失的链接。

优缺点
  • 优点: gossip协议的优点在于元数据的更新比较分散,不是集中在一个地方,更新请求会陆陆续续,打到所有节点上去更新有一定的延时,降低了压力; 去中心化、可扩展、容错、一致性收敛、简单。 由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议
  • 缺点: 元数据更新有延时可能导致集群的一些操作会有一些滞后。 消息的延迟 , 消息冗余 。

10000端口

每个节点都有一个专门用于节点间通信的端口,就是自己提供服务的端口号 10000,比如7001,那么用于节点间通信的就是17001端口。

每个节点每隔一段时间都会往另外几个节点发送ping消息,同时其他几点接收到ping消息之后返回pong消息。


网络抖动

真实世界的机房网络往往并不是风平浪静的,它们经常会发生各种各样的小问题。比如网络抖动就是非常常见的一种现象,突然之间部分连接变得不可访问,然后很快又恢复正常。

为解决这种问题,Redis Cluster 提供了一种选项cluster-­node­-timeout,表示当某个节点持续 timeout 的时间失联时,才可以认定该节点出现故障,需要进行主从切换。如果没有这个选项,网络抖动会导致主从频繁切换 (数据的重新复制)。

redis.conf配置文件中


Redis集群选举原理分析

当slave发现自己的master变为FAIL状态时,便尝试进行Failover,以期成为新的master。由于挂掉的master可能会有多个slave,从而存在多个slave竞争成为master节点的过程, 其过程如下:

  1. slave发现自己的master变为FAIL
  2. 将自己记录的集群currentEpoch加1,并广播FAILOVER_AUTH_REQUEST 信息
  3. 其他节点收到该信息,只有master响应,判断请求者的合法性,并发送FAILOVER_AUTH_ACK,对每一个epoch只发送一次ack
  4. 尝试failover的slave收集master返回的FAILOVER_AUTH_ACK
  5. slave收到超过半数master的ack后变成新Master (这里解释了集群为什么至少需要三个主节点,如果只有两个,当其中一个挂了,只剩一个主节点是不能选举成功的)
  6. 广播Pong消息通知其他集群节点。

从节点并不是在主节点一进入 FAIL 状态就马上尝试发起选举,而是有一定延迟,一定的延迟确保我们等待FAIL状态在集群中传播,slave如果立即尝试选举,其它masters或许尚未意识到FAIL状态,可能会拒绝投票。

延迟计算公式: DELAY = 500ms random(0 ~ 500ms) SLAVE_RANK * 1000ms

SLAVE_RANK表示此slave已经从master复制数据的总量的rank。Rank越小代表已复制的数据越新。这种方式下,持有最新数据的slave将会首先发起选举(理论上)。


集群是否完整才能对外提供服务

当redis.conf的配置cluster-require-full-coverage为no时,表示当负责一个插槽的主库下线且没有相应的从库进行故障恢复时,集群仍然可用,如果为yes则集群不可用。


Redis集群为什么至少需要三个master节点,并且推荐节点数为奇数?

因为新master的选举需要大于半数的集群master节点同意才能选举成功,如果只有两个master节点,当其中一个挂了,是达不到选举新master的条件的。

奇数个master节点可以在满足选举该条件的基础上节省一个节点,比如三个master节点和四个master节点的集群相比,大家如果都挂了一个master节点都能选举新master节点,如果都挂了两个master节点都没法选举新master节点了,所以奇数的master节点更多的是从节省机器资源角度出发说的。

0 人点赞