Dyno-queues 分布式延迟队列 之 基本功能
0x00 摘要
本系列我们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,通过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。
0x01 Dyno-queues分布式延迟队列
Dyno-queues 是 Netflix 实现的基于 Dynomite 和 Redis 构建的队列。
Dynomite是一种通用的实现,可以与许多不同的 key-value 存储引擎一起使用。目前它提供了对Redis序列化协议(RESP)和Memcached写协议的支持。
1.1 设计目标
具体设计目标依据业务系统不同而不同。
Dyno-queues 的业务背景是:在 Netflix 的平台上运行着许多的业务流程,这些流程的任务是通过异步编排进行驱动,现在要实现一个分布式延迟队列,这个延迟队列具有如下特点:
- 分布式;
- 不用外部的锁机制;
- 高并发;
- 至少一次语义交付;
- 不遵循严格的FIFO;
- 延迟队列(消息在将来某个时间之前不会从队列中取出);
- 优先级;
1.2 选型思路
Netflix 选择 Dynomite,是因为:
- 其具有性能,多数据中心复制和高可用性的特点;
- Dynomite提供分片和可插拔的数据存储引擎,允许在数据需求增加垂直和水平扩展;
Netflix选择Redis作为构建队列的存储引擎是因为:
- Redis架构通过提供构建队列所需的数据结构很好地支持了队列设计,同时Redis的性能也非常优秀,具备低延迟的特性;
- Dynomite在Redis之上提供了高可用性、对等复制以及一致性等特性,用于构建分布式集群队列;
0x02 总体设计
2.1 系统假设
查询模型:基于Key-Value模型,而不是SQL,即关系模型。存储对象比较小。
ACID属性:传统的关系数据库中,用ACID(A原子性、C一致性、I 隔离性、D持久性)来保证事务,在保证ACID的前提下往往有很差的可用性。Dynamo用弱一致性C来达到高可用,不提供数据隔离 I,只允许单Key更新。
2.2 高可用
其实所有的高可用,是可以依赖于RPC和存储的高可用来实现的。
- 先来看RPC的高可用,比如美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。
- 而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
Netflix 选择 Dynomite,是因为:
- 其具有高性能,多数据中心复制和高可用性的特点;
- Dynomite 提供分片和可插拔的数据存储引擎,允许在数据需求增加垂直和水平扩展;
所以 Dyno-queues 的高可用就自动解决了。
2.3 幂等
怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步。
Dynomite 使用 redis 集群这个共享存储 做了幂等保证。
2.4 承载消息堆积
消息到达服务端后,如果不经过任何处理就到接收者,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。
这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
Dynomite 使用 redis 集群这个共享存储 在一定程度上缓解了消息堆积问题。
2.5 存储子系统
我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统 > 分布式KV(持久化)> 分布式文件系统 > 数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择。
如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件 索引文件的方式处理。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。
因为 场景是 可靠性要求不那么高,所以 Dynomite 使用 redis 集群这个存储子系统 也是可以的。
2.6 消费关系解析
下一个重要的事情就是解析发送接收关系,进行正确的消息投递了。抛开现象看本质,发送接收关系无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。
一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址 端口),则广播。
至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如 config server、zookeeper等。维护广播关系所要做的事情基本是一致的:
- 发送关系的维护。
- 发送关系变更时的通知。
本文后续会介绍如何维护发送关系。
2.7 数据分片
数据分片的逻辑既可以实现在客户端,也可以实现在 Proxy
层,取决于你的架构如何设计。
传统的数据库中间件大多将分片逻辑实现在客户端,通过改写物理 SQL
访问不同的 MySQL
库;而在 NewSQL
数据库倡导的计算存储分离架构中,通常将分片逻辑实现在计算层,即 Proxy
层,通过无状态的计算节点转发用户请求到正确的存储节点。
在 Dynomite 之中,队列根据可用区域进行分片,将数据推送到队列时,通过轮训机制确定分片,这种机制可以确保所有分片的数据是平衡的,每个分片都代表Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的组合。
代码语言:javascript复制public class RoundRobinStrategy implements ShardingStrategy {
private final AtomicInteger nextShardIndex = new AtomicInteger(0);
/**
* Get shard based on round robin strategy.
* @param allShards
*/
@Override
public String getNextShard(List<String> allShards, Message message) {
int index = nextShardIndex.incrementAndGet();
if (index >= allShards.size()) {
nextShardIndex.set(0);
index = 0;
}
return allShards.get(index);
}
}
0x03 Dynomite 特性
3.1 可用分区和机架
Dyno-queues 队列是在 Dynomite 的JAVA客户端 Dyno 之上建立的,Dyno 为持久连接提供连接池,并且可以配置为拓扑感知。关于 Dyno 具体可以参见前文:
源码分析 Dynomite 分布式存储引擎 之 DynoJedisClient(1)](https://cloud.tencent.com/developer/article/1785990)
源码分析 Dynomite 分布式存储引擎 之 DynoJedisClient(2)](https://cloud.tencent.com/developer/article/1795307)
3.1.1 机架
Dyno为应用程序提供特定的本地机架(在AWS中,机架是一个区域,例如 us-east-1a、us-east-1b等),us-east-1a的客户端将连接到相同区域的Dynomite/Redis节点,除非该节点不可用,在这种情况下该客户端将进行故障转移。这个属性被用于通过区域划分队列。
3.1.2 分片
队列根据可用区域进行分片,将数据推送到队列时,通过轮训机制确定分片,这种机制可以确保所有分片的数据是平衡的,每个分片都代表Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的组合。
具体机制举例如下:
代码语言:javascript复制public class RoundRobinStrategy implements ShardingStrategy {
private final AtomicInteger nextShardIndex = new AtomicInteger(0);
/**
* Get shard based on round robin strategy.
* @param allShards
*/
@Override
public String getNextShard(List<String> allShards, Message message) {
int index = nextShardIndex.incrementAndGet();
if (index >= allShards.size()) {
nextShardIndex.set(0);
index = 0;
}
return allShards.get(index);
}
}
3.2 Quorum
在分布式系统中有个CAP理论,对于P(分区容忍性)而言,是实际存在 从而无法避免的。因为分布系统中的处理不是在本机,而是网络中的许多机器相互通信,故网络分区、网络通信故障问题无法避免。因此,只能尽量地在C 和 A 之间寻求平衡。
对于数据存储而言,为了提高可用性(Availability),采用了副本备份,比如对于HDFS,默认每块数据存三份。某数据块所在的机器宕机了,就去该数据块副本所在的机器上读取(从这可以看出,数据分布方式是按“数据块”为单位分布的)
但是问题来了,当需要修改数据时,就需要更新所有的副本数据,这样才能保证数据的一致性(Consistency)。因此,就需要在 C(Consistency) 和 A(Availability) 之间权衡。
而Quorum机制,就是这样的一种权衡机制,一种将“读写转化”的模型。
3.2.1 数据一致性
- 强一致性:在任意时刻,从任意不同副本取出的值都是一样的。
- 弱一致性:有时泛指最终一致性,是指在任意时刻,可能由于网络延迟或者设备异常等原因,不同副本中的值可能会不一样,但经过一段时间后,最终会变成一样。
显然,我们更想要做到强一致性的这种效果,那么有哪些方式可以实现呢,其中最为简单直接的就是 WARO,也就是Write All Read one。
3.2.1.1 WARO 协议
WARO 是一种简单的副本控制协议,当 Client 请求向某副本写数据时(更新数据),只有当所有的副本都更新成功之后,这次写操作才算成功,否则视为失败。这样的话,只需要读任何一个副本上的数据即可。但是WARO带来的影响是写服务的可用性较低,因为只要有一个副本更新失败,此次写操作就视为失败了。
3.2.1.2 Quorum机制
Quorum 的定义如下:假设有 N 个副本,更新操作 wi 在 W 个副本中更新成功之后,则认为此次更新操作 wi 成功,把这次成功提交的更新操作对应的数据叫做:“成功提交的数据”。对于读操作而言,至少需要读 R 个副本,其中,W R>N ,即 W 和 R 有重叠,一般,W R=N 1。
- N = 存储数据副本的数量;
- W = 更新成功所需的副本;
- R = 一次数据对象读取要访问的副本的数量;
Quorum机制认为每次写入的机器数目达到大多数(W)时,就认为本次写操作成功了。即Quorum机制能够不需要更新完全部的数据,但又保证返回给用户的是有效数据的解决方案。
3.2.2 ES 的quorum
我们以 ES 为例。
3.2.2.1 写一致性
我们在发送任何一个增删改操作的时候,都可以带上一个consistency参数,指明我们想要的写一致性是什么。
- one:要求写操作只要primay shard是active可用的,就可以执行;
- all:要求写操作必须所有的shard和replica都是active,才可以执行;
- quorum(默认):所有shard中必须是大部分是可用的(一半及以上),才可以执行;
3.2.2.2 quorum机制
quorum = int((primary shard number_of_replicas)/2) 1
如果节点数少于quorum,可能导致querum不齐全,进而导致无法执行任何写操作。quorum不齐全时,会进行等待。默认等待时间为1分钟,期待活跃的shard数量可以增加,最后实在不行,就会timeout。
3.3 DC_QUORUM
3.3.1 配置
Dynomite 能够将最终一致性(eventual consistency)扩展为协调一致性(tunable consistency)。
关于QUORUM,Dynomite有如下配置:
- DC_ONE 本节点读写入完成及请求完成,其他的rack异步写入。使用DC_ONE模式,读写行为在local Availability Zone(AZ)下是同步的;
- DC_QUORUM 同步写入到指定个数的rack中,其他的节点异步写入。使用DC_QUORUM模式,本地区域特定数量结点下的操作是同步的。
- DC_SAFE_QUORUM 和DC_QUORUM类似,不过这个参数读写都要在指定个数的rack中成功并且数据校验同步,才算请求成功,不然会报错。
由测试得到的结果,Dynomite能从3,6,12,24一路扩展到48个节点,在DC_ONE和DC_QUORUM模式下,吞吐率都能线性地增长。与此同时,Dynomite在延迟方面只增加了很少的开支,即便在DC_QUORUM模式下,(延迟)也只有几毫秒。DC_QUORUM模式在延迟和吞吐量方面处于劣势,但是能为客户提供更好的读写保证。
3.3.2 实现
对于Dyno-queues来说,则是在实现中有所体现。比如在 RedisQueues 中,有如下成员变量:
代码语言:javascript复制private final JedisCommands quorumConn;
private final JedisCommands nonQuorumConn;
在构建 RedisQueues 时,就需要注明使用哪一种。
而从注释中我们可知,
@param quorumConn
Dyno connection with dc_quorum enabled,就是 采用了Quorum的Redis;@param nonQuorumConn
Dyno connection to local Redis,就是本地Redis;
生成 RedisQueues 的代码如下(注意其中注释):
代码语言:javascript复制/**
* @param quorumConn Dyno connection with dc_quorum enabled
* @param nonQuorumConn Dyno connection to local Redis
*/
public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) {
this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy);
}
3.3.3 使用
在有分片时,就从nonQuorumConn(就是本地Redis)提取。
使用nonQuorumConn来预取的原因是:最终一致性(eventual consistency)。
因为 replication lag,在某一时刻不同分片的数据可能不一样,所以需要先预取。这就需要使用 nonQuorumConn 来预取,因为本地 redis 的数据才是正确的。
代码语言:javascript复制private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) {
return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}
再比如处理没有 ack 的消息时,先从 nonQuorumConn 读取信息ID,再从 quorumConn 读取消息内容。
这就是因为一致性导致的,所以如下:
代码语言:javascript复制@Override
public void processUnacks() {
execute("processUnacks", keyName, () -> {
Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
for (Tuple unack : unacks) {
double score = unack.getScore();
String member = unack.getElement();
String payload = quorumConn.hget(messageStoreKey, member);
long added_back = quorumConn.zadd(localQueueShard, score, member);
}
});
}
再比如从本地提取消息就使用了 nonQuorumConn。
代码语言:javascript复制@Override
public Message localGet(String messageId) {
try {
return execute("localGet", messageStoreKey, () -> {
String json = nonQuorumConn.hget(messageStoreKey, messageId);
Message msg = om.readValue(json, Message.class);
return msg;
});
}
}
再比如 popWithMsgIdHelper 也是先读取 nonQuorumConn,再从 quorumConn 读取其他内容。
代码语言:javascript复制public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {
try {
return execute("popWithMsgId", targetShard, () -> {
String queueShardName = getQueueShardKey(queueName, targetShard);
double unackScore = Long.valueOf(clock.millis() unackTime).doubleValue();
String unackShardName = getUnackKey(queueName, targetShard);
ZAddParams zParams = ZAddParams.zAddParams().nx();
Long exists = nonQuorumConn.zrank(queueShardName, messageId);
// If we get back a null type, then the element doesn't exist.
if (exists == null) {
// We only have a 'warnIfNotExists' check for this call since not all messages are present in
// all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0',
// we may have hit an inconsistency (because it's in the queue, but other calls have failed),
// so make sure to log those.
monitor.misses.increment();
return null;
}
String json = quorumConn.hget(messageStoreKey, messageId);
if (json == null) {
monitor.misses.increment();
return null;
}
long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
if (added == 0) {
monitor.misses.increment();
return null;
}
long removed = quorumConn.zrem(queueShardName, messageId);
if (removed == 0) {
monitor.misses.increment();
return null;
}
Message msg = om.readValue(json, Message.class);
return msg;
});
}
}
0x04 外层封装
RedisQueues是为用户提供的外部接口,从其成员变量可以看出来其内部机制,比如各种策略。
代码语言:javascript复制public class RedisQueues implements Closeable {
private final Clock clock;
private final JedisCommands quorumConn;
private final JedisCommands nonQuorumConn;
private final Set<String> allShards;
private final String shardName;
private final String redisKeyPrefix;
private final int unackTime;
private final int unackHandlerIntervalInMS;
private final ConcurrentHashMap<String, DynoQueue> queues;
private final ShardingStrategy shardingStrategy;
private final boolean singleRingTopology;
}
用户通过get方法来得到DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")
。
public DynoQueue get(String queueName) {
String key = queueName.intern();
return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
.withUnackTime(unackTime)
.withNonQuorumConn(nonQuorumConn)
.withQuorumConn(quorumConn));
}
0x05 数据结构
我们看看 Dyno-queues 中几种数据结构。
5.1 消息结构
一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。
Dyno-queues 只有服务端落地的可靠消息。每个延时消息必须包括以下参数:
- id:唯一标示;
- payload:消息过期之后发送mq的body,提供给消费这做具体的消息处理;
- timeout:延时发送时间;
- priority:优先级,与timeout一起决定消息如何发布,即同一 timeout 时间的消息中,哪个优先使用。
- shard:分区;
public class Message {
private String id;
private String payload;
private long timeout;
private int priority;
private String shard;
}
5.2 存储结构
Dyno-queues 关于存储的总体思路是:hash 记录消息内容, zset 实现按到期时间排序的队列
,即:
- 利用hash 记录消息内容;
- 使用hset存储消息;
- 使用hget提取消息;
- 通过Redis中的zset来实现一个延迟队列,主要利用它的score属性,Redis通过score来为集合中的成员进行从小到大的排序;
- 使用zadd key score1 value1命令生产消息;
- 使用zrem消费消息;
具体逻辑如图,这里的虚线指的是两者通过 msg id 来进行逻辑上的管理,物理没有关联:
代码语言:javascript复制 ---------- ---------- ---------- ----- ----------
| | | | | |
zset | msg id 1 | msg id 2 | msg id 3 | ... | msg id n |
| | | | | |
--- ------ ---- ----- ---- ----- ----- ---- -----
| | | |
| | | |
v v v v
--- --- --- --- -- ---- -- --
hash | msg 1 | | msg 2 | | msg 3 | |msg n|
------- ------- ------- -----
具体到代码,则是:
- Message 的id作为key,Message整体被打包成json String作为value:
quorumConn.hset(messageStoreKey, message.getId(), json);
- 用Message 的超时时间,优先级以及当前时间戳构建出zset的分数:
double score = Long.valueOf(clock.millis() message.getTimeout()).doubleValue() priority;
具体参见如下:
代码语言:javascript复制for (Message message : messages) {
String json = om.writeValueAsString(message);
quorumConn.hset(messageStoreKey, message.getId(), json);
double priority = message.getPriority() / 100.0;
double score = Long.valueOf(clock.millis() message.getTimeout()).doubleValue() priority;
String shard = shardingStrategy.getNextShard(allShards, message);
String queueShard = getQueueShardKey(queueName, shard);
quorumConn.zadd(queueShard, score, message.getId());
}
0x06 队列
RedisDynoQueue是 Dyno-queues 延迟队列的主要实现。
6.1 Redis相关
从Redis角度来看,对于每个队列,维护三组Redis数据结构:
- 包含队列元素和分数的有序集合 zset;
- 包含消息内容的Hash集合,其中key为消息ID;
- 包含客户端已经消费但尚未确认的消息有序集合,Un-ack集合 zset;
这三组Redis数据结构在RedisDynoQueue内部其实没有对应的成员变量,对于RedisDynoQueue 来说,看起来是逻辑概念,而事实上它们存在于Redis的内部存储中,由Dynomite负责高可用等等。
具体如下:
代码语言:javascript复制 message list
zset ---------- ---------- ---------- ----- ----------
| | | | | |
| msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |
| | | | | |
--- ------ ---- ----- ---- ----- ----- ---- -----
| | | |
| | | |
v v v v
hash --- --- --- --- -- ---- -- --
| msg 1 | | msg 2 | | msg 3 | |msg 9|
------- ------- ------- -----
unack list
------------ ------------- --------------
zset | | | |
| msg id 11 | msg id 12 | msg id 13 |
| | | |
------------ ------------- --------------
6.2 成员变量
RedisDynoQueue 的成员变量可以分类如下:
6.2.1 总体
- String queueName:本Queue名字;
- String shardName:分区名字;
6.2.2 Redis连接相关
- JedisCommands quorumConn:采用 quorum 的连接;
- JedisCommands nonQuorumConn:非Quorum的连接;
6.2.3 Redis操作相关
6.2.4 Ack相关
代码语言:javascript复制schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
if (this.singleRingTopology) {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
- boolean singleRingTopology:
6.2.5 监控与统计
QueueMonitor monitor:监控与统计;
6.2.6 具体定义
具体代码如下:
代码语言:javascript复制public class RedisDynoQueue implements DynoQueue {
private final Clock clock;
private final String queueName;
private final List<String> allShards;
private final String shardName;
private final String redisKeyPrefix;
private final String messageStoreKey;
private final String localQueueShard;
private volatile int unackTime = 60;
private final QueueMonitor monitor;
private final ObjectMapper om;
private volatile JedisCommands quorumConn;
private volatile JedisCommands nonQuorumConn;
private final ConcurrentLinkedQueue<String> prefetchedIds;
private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;
private final ScheduledExecutorService schedulerForUnacksProcessing;
private final int retryCount = 2;
private final ShardingStrategy shardingStrategy;
private final boolean singleRingTopology;
}
至此,Dyno-queues 基本功能初步分析完毕,我们下期继续介绍消息产生,消费。
0xFF 参考
干货分享 | 如何从零开始设计一个消息队列
消息队列的理解,几种常见消息队列对比,新手也能看得懂!----分布式中间件消息队列
消息队列设计精要
有赞延迟队列设计
基于Dynomite的分布式延迟队列
http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/
http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq
http://activemq.apache.org/delay-and-schedule-message-delivery.html
源码分析 Dynomite 分布式存储引擎 之 DynoJedisClient(1)](https://blog.csdn.net/weixin_47364682/article/details/113574369)
源码分析 Dynomite 分布式存储引擎 之 DynoJedisClient(2)](https://blog.csdn.net/weixin_47364682/article/details/113574485)
原创 Amazon Dynamo系统架构
Netlix Dynomite性能基准测试,基于AWS和Redis
为什么分布式一定要有延时任务?