维护较小的集群可提供更强的一致性,以允许大型数据集群协调服务器活动,而无需实现基于 quorum 的算法。
问题
线性化是最强的一致性保证,可以保证所有客户端都能看到最新提交的数据更新。提供线性化以及容错功能需要在服务器上实现共识算法,例如 Raft,Zab 或Paxos。
尽管共识算法是实现 Consistent Core 的基本要求,但客户端交互的各个方面(例如客户端如何找到leader,重复请求的处理方式等)都是重要的实现决策。关于安全性和活跃性,还有一些重要的实现注意事项。Paxos 仅定义共识算法,但是 Paxos 文献中没有很好地记录其他实现方面的内容。Raft非常清楚地记录了各种实现方面以及参考实现,因此是当今使用最广泛的算法。
当集群需要处理大量数据时,它需要越来越多的服务器。对于服务器集群,存在一些共同的要求,例如选择特定的服务器作为特定任务的 master ,管理组成员信息,将数据分区映射到服务器等。这些要求需要强大的一致性保证,即线性化 。实现也必须是容错的。一种常见的方法是使用基于 Quorum 的容错共识算法。但是在基于 Quorum 的系统中,吞吐量会随着集群的大小而降低。
解决方案
实现一个较小的3到5个节点的集群,该集群可提供线性化保证和容错能力。单独的数据集群可以使用小型一致性集群来管理元数据,并使用诸如 Lease 之类的机制来进行集群范围的决策。这样,数据集群可以扩展大量的服务器,但是仍然可以使用较小的元数据集群执行某些需要强一致性保证的操作。
Figure 1: Consistent Core 一个典型的consistent core接口是这样的:
代码语言:javascript复制public interface ConsistentCore {
CompletableFuture put(String key, String value);
List<String> get(String keyPrefix);
CompletableFuture registerLease(String name, long ttl);
void refreshLease(String name);
void watch(String name, Consumer<WatchEvent> watchCallback);
}
Consistent Core至少提供了一种简单的键值存储机制。它用于存储元数据。
元数据存储
使用诸如Raft之类的共识算法来实现存储。它是“Replicated Write Ahead Log”实现的示例,其中复制由Leader and Followers处理,High-Water Mark用于跟踪通过Quorum进行的成功复制。
支持分层存储
Consistent Core通常用于存储数据,例如:组成员身份或跨服务器的任务分配。一种常见的使用模式是使用前缀来限制元数据的类型。例如 对于组成员身份,keys 将全部存储为/servers/ 1,server/2等。对于分配给服务器的任务,keys可以为/tasks/task1,/tasks/task2。通常使用特定前缀读取所有键数据。例如,要获取有关集群中所有服务器的信息,将读取所有带有前缀/servers 的 keys。
用法示例如下: 服务器可以通过使用前缀/servers 创建自己的 key 来向 Consistent Core 注册自己。
代码语言:javascript复制client1.setValue("/servers/1", "{address:192.168.199.10, port:8000}");
client2.setValue("/servers/2", "{address:192.168.199.11, port:8000}");
client3.setValue("/servers/3", "{address:192.168.199.12, port:8000}");
然后,客户端可以通过读取key前缀 /servers 来了解集群中的所有服务器,如下所示:
代码语言:javascript复制assertEquals(client1.getValue("/servers"), Arrays.asList("{address:192.168.199.12, port:8000}",
"{address:192.168.199.11, port:8000}",
"{address:192.168.199.10, port:8000}"));
由于数据存储的这种分层性质,[zookeeper],[chubby]之类的产品提供了类似于接口的文件系统,用户可以在其中创建具有父节点和子节点概念的目录、文件或节点。[etcd3]具有扁平化的键空间,可以获取一系列键。
处理客户端交互
Consistent Core功能的关键要求之一是客户端如何与 Consistent Core 交互。以下方面对于客户端使用 Consistent Core至关重要。
寻找 Leader
串行化和线性化 当 follower 服务器处理读取请求时,由于 leader 的最新提交尚未到达跟随者,客户端可能会获得陈旧数据。客户端接收更新的顺序仍保持不变,但是更新可能不是最新的。与线性化相对,这是串行化保证。线性化可确保每个客户端都获得最新更新。客户端仅需要读取元数据并且可以暂时容纳过时的元数据时,便可以使用串行化保证。对于诸如 Lease 之类的操作,需要严格线性化。
如果将 leader 从集群中分割出去,则客户端可以从 leader 获得陈旧的值,Raft描述了一种提供线性化读取的机制。参见例子etcd 的readIndex实现。
分割的 follower 可能会发生类似的情况。follower 可能已分割,可能未将最新值返回给客户端。为确保 follower 未分割且与 leader 保持最新,他们需要查询 leader 以了解最新更新,并等到收到最新更新后再对客户端进行响应,请参阅proposed kafka design例子。
所有操作都必须在 leader 上执行,这很重要,因此客户端库需要首先找到leader 服务器。有两种方法可以满足此要求。
- consistent core中的follower服务器知道当前的 leader,因此,如果客户端连接到 follower,则它可以返回leader 的地址。然后,客户端可以直接连接在响应中标识的 leader。应当注意,当客户端尝试连接时,服务器可能处于leader选举状态。在这种情况下,服务器无法返回leader地址,客户端需要等待并尝试另一台服务器。
- 服务器可以实现转发机制,并将所有客户端请求转发给 leader。这允许客户端连接到任何服务器。同样,如果服务器处于leader 选举中,则客户端需要重试,直到leader选举成功并建立了合法的 leader 为止。 Zookeeper 和 etcd 之类的产品之所以采用这种方法,是因为它们允许follower 服务器处理一些只读请求。当大量客户端是只读客户端时,这避免了leader 的瓶颈。这样可以让客户端根据请求类型减少连接到leader 或follower 的复杂性。
找到leader的一种简单机制是尝试连接到每个服务器并尝试发送请求,如果服务器不是leader,则服务器会重定向响应。
代码语言:javascript复制private void establishConnectionToLeader(List<InetAddressAndPort> servers) {
for (InetAddressAndPort server : servers) {
try {
SingleSocketChannel socketChannel = new SingleSocketChannel(server, 10);
logger.info("Trying to connect to " server);
RequestOrResponse response = sendConnectRequest(socketChannel);
if (isRedirectResponse(response)) {
redirectToLeader(response);
break;
} else if (isLookingForLeader(response)) {
logger.info("Server is looking for leader. Trying next server");
continue;
} else { //we know the leader
logger.info("Found leader. Establishing a new connection.");
newPipelinedConnection(server);
break;
}
} catch (IOException e) {
logger.info("Unable to connect to " server);
//try next server
}
}
}
private boolean isLookingForLeader(RequestOrResponse requestOrResponse) {
return requestOrResponse.getRequestId() == RequestId.LookingForLeader.getId();
}
private void redirectToLeader(RequestOrResponse response) {
RedirectToLeaderResponse redirectResponse = deserialize(response);
newPipelinedConnection(redirectResponse.leaderAddress);
logger.info("Connected to the new leader "
redirectResponse.leaderServerId
" " redirectResponse.leaderAddress
". Checking connection");
}
private boolean isRedirectResponse(RequestOrResponse requestOrResponse) {
return requestOrResponse.getRequestId() == RequestId.RedirectToLeader.getId();
}
仅建立TCP连接是不够的,我们需要知道服务器是否可以处理我们的请求。因此,客户端向服务器发送一个特殊的连接请求,以确认服务器是否可以处理请求或重定向到 leader 服务器。
代码语言:javascript复制private RequestOrResponse sendConnectRequest(SingleSocketChannel socketChannel) throws IOException {
RequestOrResponse request
= new RequestOrResponse(RequestId.ConnectRequest.getId(), "CONNECT", 0);
try {
return socketChannel.blockingSend(request);
} catch (IOException e) {
resetConnectionToLeader();
throw e;
}
}
如果现有的leader失败,同样的技术被用来从集群中识别新当选的 leader。 连接后,客户端将维护到leader 服务器的Single Socket Channel。
处理重复请求
在失败的情况下,客户端可以尝试连接到新的leader,重新发送请求。但是,如果那些请求在失败之前已经由失败的leader 处理过,则可能会导致重复。因此,在服务器上具有一种机制可以忽略重复的请求,这一点很重要。Idempotent Receiver 模式用于实现重复检测。
使用 Lease 可以完成一组服务器之间的协调任务。可以使用相同的方法来实现组成员身份和故障检测机制。
State Watch 用于获取有关元数据或时间限制租约更改的通知。
例子
众所周知,谷歌使用[chubby]锁服务进行协调和元数据管理。
[kafka]使用[zookeeper]来管理元数据,并为集群主服务器做出决策,例如选择 leader。kafka 提议的体系结构更改将用其自己的基于[raft]的控制器集群代替Zookeeper。
[bookkeeper]使用 Zookeeper 管理集群元数据。
[kubernetes]使用[etcd]进行协调,管理集群元数据和组成员信息。
[hdfs],[spark],[flink]等所有大数据存储和处理系统都使用[zookeeper]来实现高可用性和集群协调。
说明
注1:因为整个集群都依赖于 Consistent Core,所以了解所使用的一致性算法的细节至关重要。在某些棘手的网络分区情况下,一致性实现可能会遇到活跃性问题。例如,除非特别注意,否则Raft集群可能会被分割的服务器破坏,分割的服务器会不断触发leader选举。最近在 Cloudflare 发生的事件是一个值得学习的好例子。