本文代码: DelayQueue
延迟队列,想必大家都不陌生,顾名思义,它是一个带有延迟功能的队列。那么到底为什么需要延迟,怎么延迟呢?考虑一下下面的应用场景。
- 订单三十分钟未支付,就自动作废。
- 新用户注册之后的一天三天等时间点发送推广邮件。
- 淘宝京东等的订单完成后 5 天未评价,自动好评。
在这些场景下,比较粗暴的办法就是定时扫表,然后拿到相应的信息,去做业务操作。
或者可以使用延时队列,在触发的时候生产信息及触发时间到队列中,在另外一个进程/线程轮询队列,按照当前时间进行弹出,不断的消费即可实现定时执行任务。
Redis 的有序列表数据类型,可以说是作为延时队列极其优秀的一个载体,因此被很多公司采用。今天就实现一个基本的延时队列,暴露对应的方法出来。
为什么叫基本的延时队列呢,因为本文是侧重于 Redis 的封装的,所以对于消息队列注重的很多特性没有实现,比如消息的 ACK, 以及失败重试等
目录
- 设计
- Java 实现代码
- Java 代码测试
- 服务化
- 参考文章
- 联系我
设计
延迟队列如果设计的足够通用及”豪华版”, 是可以单独作为一个中间件服务的,独立于业务运行,提供对应的接口出来即可。但是本文不实现服务级别的延迟队列,仅在后文简单介绍一下(因为本文是 Redis 系列,而不是延迟队列系列).
本文对 Redis 进行简单封装,提供一个DelayQueue
类出来使用。
作为一个延迟队列,那么它需要有以下的功能:
- 放入任务
- 取出任务(去做)
- 删除任务(不做了)
- 计数功能
对应于 Redis 怎么实现呢?Sorted Set帮你搞定。
我们将序列化后的任务信息作为 member, 任务触发时间作为 score. 放入Sorted Set即可。
之后不断弹出分值最小的元素,就是下一个要执行的任务。
功能 | 命令 |
---|---|
放入任务 | ZADD 命令 |
取出任务(去做) | ZREVRANGEBYSCORE 命令 ZREM 命令 |
删除任务(不做了) | ZREM 命令 |
计数功能 | ZCOUNT 命令 |
Java 实现代码
代码语言:javascript复制package com.huyan.collection;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;
import java.util.*;
/**
* Author: huyanshi
* Date: 2020/01/22.
* Brief: redis 实现的延迟队列 实现
*/
@Slf4j
public class DelayQueue {
/**
* 延迟队列的 key
*/
private final String key;
/**
* Jedispool
*/
private final JedisPool jedisPool;
/**
* constructor
*
* @param key key
* @param host host
*/
public DelayQueue(String key, String host) {
this.key = key;
this.jedisPool = new JedisPool(host);
}
/**
* constructor
*
* @param key key
* @param jedisPool jedispool
*/
public DelayQueue(String key, JedisPool jedisPool) {
this.key = key;
this.jedisPool = jedisPool;
}
/**
* 获取当前延迟队列中元素的数量
*
* @return 数量
*/
public long getDelaySize() {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.zcount(key, 0, Long.MAX_VALUE);
}
}
/**
* 向延迟队列中添加一个元素
*
* @param expireTs 元素的执行时间
* @param member 元素的信息体。
*/
public void putDelay(int expireTs, String member) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.zadd(key, expireTs, member);
}
}
/**
* 删除元素
*
* @param members 元素名的集合
*/
public void delDelay(String... members) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.zrem(key, members);
}
}
/**
* 批量添加元素
*
* @param items 待添加的所有元素
*/
public void putDelay(List<Item> items) {
try (Jedis jedis = jedisPool.getResource()) {
Pipeline pipeline = jedis.pipelined();
List<Response<Long>> resp = new ArrayList<>(items.size());
for (Item item : items) {
resp.add(pipeline.zadd(key, item.expireTs, item.value));
}
pipeline.sync();
int err = 0;
for (Response<Long> r : resp) {
Long reply = r.get();
if (reply == null) {
err = 1;
}
}
if (err > 0) {
log.warn("put delays err: {}", err);
}
}
}
/**
* 弹出当前要执行的任务
*
* @return 当前要执行的任务
*/
public Set<Tuple> popNowExpires() {
int now = (int) (System.currentTimeMillis() / 1000);
return popRangeExpires(now);
}
/**
* 弹出某个时间前执行的任务
*
* @return 当前要执行的任务
*/
public Set<Tuple> popRangeExpires(int expireTs) {
Set<Tuple> values = rangeExpires(expireTs);
// del
if (values.size() > 0) {
delDelay(values.stream().map(Tuple::getElement).toArray(String[]::new));
}
return values;
}
/**
* 查看某个时间以前的任务
*
* @param expireTs 执行时间
* @return 任务集合
*/
public Set<Tuple> rangeExpires(int expireTs) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.zrevrangeByScoreWithScores(key, expireTs, 0);
}
}
/**
* 根据过期时间批量移除元素
*
* @param start 开始时间
* @param end 结束时间
*/
public void remove(int start, int end) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.zremrangeByRank(key, start, end);
}
}
/**
* 延迟队列中放的 Item
*/
public static class Item {
public final String value;
public final int expireTs;
public Item(String value, int expireTs) {
this.value = value;
this.expireTs = expireTs;
}
@Override
public String toString() {
return value ":" expireTs;
}
}
}
代码比较简单,这里就不多说了,上面的功能,对应的 API 为:
功能 | 命令 | API |
---|---|---|
放入任务 | ZADD 命令 | putDelay |
取出任务(去做) | ZREVRANGEBYSCORE 命令 ZREM 命令 | popNowExpires |
删除任务(不做了) | ZREM 命令 | delDelay |
计数功能 | ZCOUNT 命令 | getDelaySize |
同时,为了方便多个值一起操作,提供了一些批量操作的 API.
Java 代码测试
首先我们要测试可用性。
代码语言:javascript复制 @Test
public void deleyQueueTest() {
int oneHourLater = (int) (System.currentTimeMillis() / 1000 3600);
queue.putDelay(oneHourLater, "test_1");
Assert.assertEquals(1, queue.getDelaySize());
int twoHourLater = (int) (System.currentTimeMillis() / 1000 7200);
queue.putDelay(twoHourLater, "test_2");
Assert.assertEquals(2, queue.getDelaySize());
queue.popNowExpires();
Assert.assertEquals(2, queue.getDelaySize());
queue.rangeExpires(oneHourLater 100);
Assert.assertEquals(2, queue.getDelaySize());
queue.delDelay("test_2");
Assert.assertEquals(1, queue.getDelaySize());
queue.popRangeExpires(oneHourLater 100);
Assert.assertEquals(0, queue.getDelaySize());
}
可以看到,实现是没有问题的。从上面的测试代码大概可以看出这个消息队列的使用方式了,这里我还是提供一个简单的生产消费代码出来:
生产者:
代码语言:javascript复制 @Test
public void delayQueueProducer() {
// 单个生产
int now = (int) (System.currentTimeMillis() / 1000);
queue.putDelay(now, "your_message_body");
// 批量生产
List<DelayQueue.Item> items = new ArrayList<>();
items.add(new DelayQueue.Item("your_message_body", now));
queue.putDelay(items);
}
消费者:
代码语言:javascript复制 @Test
public void delayQueueConsumer() throws InterruptedException {
// 轮询消费当前应该执行的任务,或者调用 popRangeExpires 消费某个时间之前的所有任务
int now = (int) (System.currentTimeMillis() / 1000);
while (true) {
Set<Tuple> tuples = queue.popNowExpires();
// 为空休眠一秒
if (CollectionUtils.isEmpty(tuples)) {
Thread.sleep(1000);
continue;
}
// 处理业务逻辑
System.out.println("do something");
}
}
服务化
经常用延时队列的读者可能从上面的代码里发现了一个问题,那就是还是有公用逻辑的,比如在消费者端的这个循环。
代码语言:javascript复制 while (true) {
Set<Tuple> tuples = queue.popNowExpires();
// 为空休眠一秒
if (CollectionUtils.isEmpty(tuples)) {
Thread.sleep(1000);
continue;
}
这个循环其实也可以放在延时队列内部,但是因为我们只是封装了一个类,而不是一个服务,所以提供这种轮询不方便。
想要更加通用化,那么封装一个类就已经没有用了,需要将 延时队列
做成中间件,也就是服务化。
基本原理就是:
启动一个服务,内部负责维护延时队列,负责轮询延时队列,之后将多个业务方的定时任务进行分发,然后由业务方消费到进行逻辑处理。
当然,如果用到延时队列的地方不多,或者说不是提供给多个业务方/业务组来使用,是没有必要搞这么大阵势的.
对于服务化的延时队列,其核心对 Redis 的使用和本文也基本一致,只是会额外添加许多其他功能,比如支持多个业务方,支持任务分发,支持任务 ACK 以及失败重试等。
这些添加的内容,都不是本文的重点,因此本文不做讲解了。仅推荐一些学习内容。
有赞的一篇关于 延时队列服务的文章,讲解的不错,同时网上也有根据这篇文章的思路实现的具体代码,因此在这里作为学习资料推荐给大家。
有赞延迟队列设计
上文的 go 语言实现
上文的 java 语言实现
代码我大概看了一眼,不错而且挺简单明了的。十分不错的入门学习内容。
参考文章
https://tech.youzan.com/queuing_delay/
完。