Dyno-queues 分布式延迟队列 之 生产消费

2021-03-02 14:29:00 浏览数 (1)

Dyno-queues 分布式延迟队列 之 生产消费

0x00 摘要

本系列我们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,通过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。

0x01 前情回顾

Dyno-queues 是 Netflix 实现的基于 Dynomite 和 Redis 构建的队列。Dynomite是一种通用的实现,可以与许多不同的key-value存储引擎一起使用。目前它提供了对Redis序列化协议(RESP)和Memcached写协议的支持。

上文我们介绍了 Dyno-queues 分布式延迟队列 的设计思路,本文我们继续介绍消息的产生和消费。

首先我们回顾下设计目标和选型思路。

1.1 设计目标

具体设计目标依据业务系统不同而不同。

Dyno-queues的业务背景是:在Netflix的平台上运行着许多的业务流程,这些流程的任务是通过异步编排进行驱动,现在要实现一个分布式延迟队列,这个延迟队列具有如下特点:

  • 分布式;
  • 不用外部的锁机制;
  • 高并发;
  • 至少一次语义交付;
  • 不遵循严格的FIFO;
  • 延迟队列(消息在将来某个时间之前不会从队列中取出);
  • 优先级;

1.2 选型思路

Netflix选择Dynomite,是因为:

  • 其具有性能,多数据中心复制和高可用性的特点;
  • Dynomite提供分片和可插拔的数据存储引擎,允许在数据需求增加垂直和水平扩展;

Netflix选择Redis作为构建队列的存储引擎是因为:

  • Redis架构通过提供构建队列所需的数据结构很好地支持了队列设计,同时Redis的性能也非常优秀,具备低延迟的特性
  • Dynomite在Redis之上提供了高可用性、对等复制以及一致性等特性,用于构建分布式集群队列。

下面我们具体看看如何生产和消费消息。

0x02 产生消息

Dyno-queues 用户使用push方法来向redis中投放消息。

代码语言:javascript复制
List<Message> payloads = new ArrayList<>();
payloads.add(new Message("id1", "searchable payload123"));
payloads.add(new Message("id2", "payload 2"));

DynoQueue V1Queue = queues.get("simpleQueue");

// Clear the queue in case the server already has the above key.
V1Queue.clear();

// Test push() API
List pushed_msgs = V1Queue.push(payloads);

push逻辑为:

  • 根据消息超时(延迟队列)和优先级计算得分;
  • 添加到队列的有序集合;
  • 将Message对象到Hash集合中,key是messageId;

具体代码如下:

代码语言:javascript复制
@Override
public List<String> push(final List<Message> messages) {
    Stopwatch sw = monitor.start(monitor.push, messages.size());
    try {
        execute("push", "(a shard in) "   queueName, () -> {
            for (Message message : messages) {
                String json = om.writeValueAsString(message);
                
                quorumConn.hset(messageStoreKey, message.getId(), json);
                
                double priority = message.getPriority() / 100.0;
                double score = Long.valueOf(clock.millis()   message.getTimeout()).doubleValue()   priority;
                String shard = shardingStrategy.getNextShard(allShards, message);
                String queueShard = getQueueShardKey(queueName, shard);
                
                quorumConn.zadd(queueShard, score, message.getId());
            }
            return messages;
        });
        return messages.stream().map(msg -> msg.getId()).collect(Collectors.toList());
    } finally {
        sw.stop();
    }
}

2.1 设置超时

setTimeout 函数 用来重新设置超时时间。

配置超时时间的目的是 :事件在消息队列中的排列顺序是由一个算法确定的,如果超时时间越近,则这个事件在 zset 的score 越低,就越可能被优先分发:

double score = Long.valueOf(clock.millis() message.getTimeout()).doubleValue() priority;

具体代码如下:

代码语言:javascript复制
public boolean setTimeout(String messageId, long timeout) {

    return execute("setTimeout", "(a shard in) "   queueName, () -> {

        String json = nonQuorumConn.hget(messageStoreKey, messageId);
        if (json == null) {
            return false;
        }
        Message message = om.readValue(json, Message.class);
        message.setTimeout(timeout);

        for (String shard : allShards) {

            String queueShard = getQueueShardKey(queueName, shard);
            Double score = quorumConn.zscore(queueShard, messageId);
            if (score != null) {
                double priorityd = message.getPriority() / 100;
                double newScore = Long.valueOf(clock.millis()   timeout).doubleValue()   priorityd;
                ZAddParams params = ZAddParams.zAddParams().xx();
                quorumConn.zadd(queueShard, newScore, messageId, params);
                json = om.writeValueAsString(message);
                quorumConn.hset(messageStoreKey, message.getId(), json);
                return true;
            }
        }
        return false;
    });
}

2.2 设定优先级

Dyno-queues 的消息是有优先级的,具体代码如下:

代码语言:javascript复制
    /**
     * Sets the message priority.  Higher priority message is retrieved ahead of lower priority ones
     * @param priority priority for the message.
     */
    public void setPriority(int priority) {
        if (priority < 0 || priority > 99) {
            throw new IllegalArgumentException("priority MUST be between 0 and 99 (inclusive)");
        }
        this.priority = priority;
    }

优先级是在 0 和 99 之间,而且越大优先级越高。这里就有一个疑问:

double score = Long.valueOf(clock.millis() message.getTimeout()).doubleValue() priority;

因为使用的是zset,score 是 timeout priority,而 zset 的score应该是低的优先获取。为什么 priority 反而是高的优先处理?

看代码你会发现,priority 最后被转换成一个小数。而score的整数部分是时间戳。

即 score = timestamp Priority() / 100。

代码语言:javascript复制
double priorityd = message.getPriority() / 100;

double newScore = Long.valueOf(clock.millis()   timeout).doubleValue()   priorityd;

ZAddParams params = ZAddParams.zAddParams().xx();

quorumConn.zadd(queueShard, newScore, messageId, params);

这样如果用户指定 "时间戳 1" 为截止时间来获取,则同一个时间戳的消息会被一并取出。

具体这些同一个时间戳的消息如何内部再排优先级?就可以按照小数点之后的数字排序,这样就能保证同一个时间戳的消息内部,按照优先级排序。这个排序是用户自己在user function做的

0x03 消费消息

有几个不同的消费方法,举例如下:

代码语言:javascript复制
Message poppedWithPredicate = V1Queue.popMsgWithPredicate("searchable pay*", false);

V1Queue.popWithMsgId(payloads.get(i).getId());

V1Queue.unsafePopWithMsgIdAllShards(payloads.get(i).getId());

List<Message> popped_msgs = V1Queue.pop(1, 1000, TimeUnit.MILLISECONDS);

List<Message> pop_all_msgs = V1Queue.unsafePopAllShards(7, 1000, TimeUnit.MILLISECONDS);

我们以pop为例说明。

3.1 预取

Dyno-queues 使用了预取来完成消费。预取是因为如注释所示:

代码语言:javascript复制
    // We prefetch message IDs here first before attempting to pop them off the sorted set.
    // The reason we do this (as opposed to just popping from the head of the sorted set),
    // is that due to the eventually consistent nature of Dynomite, the different replicas of the same
    // sorted set _may_ not look exactly the same at any given time, i.e. they may have a different number of
    // items due to replication lag.
    // So, we first peek into the sorted set to find the list of message IDs that we know for sure are
    // replicated across all replicas and then attempt to pop them based on those message IDs.

大致为:

  • 预取是因为最终一致性(eventual consistency)。
  • 因为replication lag,在某一时刻不同分片的数据可能不一样,所以需要先预取。

这就需要使用nonQuorumConn来预取,因为本地redis的数据才是正确的。

  • @param quorumConn Dyno connection with dc_quorum enabled,就是采用了Quorum的Redis;
  • @param nonQuorumConn Dyno connection to local Redis,就是本地Redis;

预取如下:

代码语言:javascript复制
/**
 * Prefetch message IDs from the local shard.
 */
private void prefetchIds() {
    double now = Long.valueOf(clock.millis()   1).doubleValue();
    int numPrefetched = doPrefetchIdsHelper(localQueueShard, numIdsToPrefetch, prefetchedIds, now);
    if (numPrefetched == 0) {
        numIdsToPrefetch.set(0);
    }
}

这里可以看到,使用了nonQuorumConn。

代码语言:javascript复制
private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) {
    return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}

3.2 实际操作

实际操作是在_pop中完成。

_pop的逻辑如下:

  • 计算当前时间为最大分数。
  • 获取分数在0和最大分数之间的消息。
  • 将messageID添加到unack集合中,并从队列的有序集中删除这个messageID。
  • 如果上一步成功,则根据messageID从Redis集合中检索消息。

具体代码如下:

代码语言:javascript复制
private List<Message> _pop(String shard, int messageCount,
                           ConcurrentLinkedQueue<String> prefetchedIdQueue) {
    String queueShardName = getQueueShardKey(queueName, shard);
    String unackShardName = getUnackKey(queueName, shard);
    double unackScore = Long.valueOf(clock.millis()   unackTime).doubleValue();

    // NX option indicates add only if it doesn't exist.
    // https://redis.io/commands/zadd#zadd-options-redis-302-or-greater
    ZAddParams zParams = ZAddParams.zAddParams().nx();

    List<Message> popped = new LinkedList<>();
    for (;popped.size() != messageCount;) {
        String msgId = prefetchedIdQueue.poll();
        if(msgId == null) {
            break;
        }

        long added = quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
        if(added == 0){
            monitor.misses.increment();
            continue;
        }

        long removed = quorumConn.zrem(queueShardName, msgId);
        if (removed == 0) {
            monitor.misses.increment();
            continue;
        }

        String json = quorumConn.hget(messageStoreKey, msgId);
        if (json == null) {
            monitor.misses.increment();
            continue;
        }
        Message msg = om.readValue(json, Message.class);
        popped.add(msg);

        if (popped.size() == messageCount) {
            return popped;
        }
    }
    return popped;
}

0x4 即时消费

虽然是延迟队列,一般来说,无论是使用方或者作者,都希望对于消息,可以做到即时消费。

但是对于即时消费,Dyno-queues 并没有做太多保证。

4.1 阻塞式弹出

即时消费,网上所流传的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)弹出。 让我们来看看阻塞式弹出的使用方式:

代码语言:javascript复制
BRPOP key [key ...] timeout

此命令的说明是:

1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 2、当给定多个key参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。

另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。

以此来看,列表的阻塞式弹出有两个特点:

1、如果list中没有任务的时候,该连接将会被阻塞 2、连接的阻塞有一个超时时间,当超时时间设置为0时,即可无限等待,直到弹出消息

由此看来,此方式是可行的。

但此为传统的观察者模式,业务简单则可使用,如A的任务只由B去执行。但如果A和Z的任务,B和C都能执行,那使用这种方式就相形见肘。这个时候就应该使用订阅/发布模式,使业务系统更加清晰。

4.2 超时处理

Dyno-queues 并没有利用列表的阻塞式弹出机制,而是使用了超时处理,保证在一定时间内尽量完成操作。

代码语言:javascript复制
List<Message> popped_msgs = V1Queue.pop(1, 1000, TimeUnit.MILLISECONDS);

List<Message> pop_all_msgs = V1Queue.unsafePopAllShards(7, 1000, TimeUnit.MILLISECONDS);

具体代码如下:

代码语言:javascript复制
@Override
public List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        long start = clock.millis();
        long waitFor = unit.toMillis(wait);
        numIdsToPrefetch.addAndGet(messageCount);
        prefetchIds();
        while (prefetchedIds.size() < messageCount && ((clock.millis() - start) < waitFor)) {
            // 会等待,然后再次尝试
            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
            prefetchIds();
        }
        return _pop(shardName, messageCount, prefetchedIds);

0xFF 参考

干货分享 | 如何从零开始设计一个消息队列

消息队列的理解,几种常见消息队列对比,新手也能看得懂!----分布式中间件消息队列

消息队列设计精要

有赞延迟队列设计

基于Dynomite的分布式延迟队列

http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/

http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

http://activemq.apache.org/delay-and-schedule-message-delivery.html

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)

原创 Amazon Dynamo系统架构

Netlix Dynomite性能基准测试,基于AWS和Redis

为什么分布式一定要有延时任务?

0 人点赞