什么是延迟队列
所谓的延迟队列就是,生产者的消息推送到队列中,消费者不会马上消费,而是到了设置的指定的时间才消费。可以采用Redis的zset来实现。将消息序列化成一个字符串作为zset的value。这个消息的到期处理时间作为score,然后用一个线程去轮询zset到期的任务处理,建议使用多线程,为了保障任务消费的可用性。不过多线程就要考虑并发抢任务。
废话不多说直接上代码。
代码逻辑
将任务放到队列中,设置延迟时间
代码语言:javascript复制/**
* 任务放入队列中
* @param msg 任务信息
* @param afterTime 延迟时间
*/
public void delay(T msg, long afterTime) {
TaskItem<T> task = new TaskItem<>();
//随机生成的任务id
task.id = UUID.randomUUID().toString();
task.msg = msg;
//这里将TaskItem转换成JSON格式
String taskStr = JSON.toJSONString(task);
Date now = new Date();
log.info(">>>>>>>>>>开始将任务放入到zset中,当前时间:{}", now);
//采用zset存入
redisTemplate.opsForZSet().add(queueKey, taskStr, now.getTime() afterTime);
}
消费代码
代码语言:javascript复制public void loop() {
while (!Thread.interrupted()) {
Date now = new Date();
//从zset中获取score为当前时间的数据
Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now.getTime(), 0, 1);
//如果队列为空,sleep一下再重新循环,sleep是为了缓解redis压力
if (values.isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
//不为空,获取值
String next = values.iterator().next();
//将redis中的值删掉,相当于被消费掉
Long remove = redisTemplate.opsForZSet().remove(queueKey, next);
if ((remove > 0)) {
//Json 转 Object
TaskItem<T> taskItem = JSON.parseObject(next, taskType);
//将msg信息作处理
this.handleMsg(taskItem.msg, now);
}
}
}
private void handleMsg(T msg, Date now) {
log.info("延迟消费到数据:[{}],当前时间:[{}]", msg, now);
System.out.println(String.format("延迟消费到数据:[%s]", msg));
}
测试
代码语言:javascript复制//延迟队列测试
@Test
public void testDelayQueue() {
String queueKey = "redis_delay_queue";
delayingQueue.setQueueKey(queueKey);
Thread producer = new Thread(() -> delayingQueue.delay("I am Lvshe", 10000));
Thread consumer = new Thread(() -> delayingQueue.loop());
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(20000);
//线程中断标记,跳出loop()方法循环
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
如上代码,生产了一个数据 “I am Lvshen”,要求10s 后消费
结果如下图: