Redis实现延迟队列

2022-05-05 15:40:38 浏览数 (1)

什么是延迟队列

所谓的延迟队列就是,生产者的消息推送到队列中,消费者不会马上消费,而是到了设置的指定的时间才消费。可以采用Redis的zset来实现。将消息序列化成一个字符串作为zset的value。这个消息的到期处理时间作为score,然后用一个线程去轮询zset到期的任务处理,建议使用多线程,为了保障任务消费的可用性。不过多线程就要考虑并发抢任务。

废话不多说直接上代码。

代码逻辑

将任务放到队列中,设置延迟时间

代码语言:javascript复制
/**
 * 任务放入队列中
 * @param msg    任务信息
 * @param afterTime   延迟时间
 */
public void delay(T msg, long afterTime) {
    TaskItem<T> task = new TaskItem<>();
    //随机生成的任务id
    task.id = UUID.randomUUID().toString();
    task.msg = msg;

    //这里将TaskItem转换成JSON格式
    String taskStr = JSON.toJSONString(task);
    Date now = new Date();
    log.info(">>>>>>>>>>开始将任务放入到zset中,当前时间:{}", now);
    //采用zset存入
    redisTemplate.opsForZSet().add(queueKey, taskStr, now.getTime()   afterTime);
}

消费代码

代码语言:javascript复制
public void loop() {
    while (!Thread.interrupted()) {
        Date now = new Date();
        //从zset中获取score为当前时间的数据
        Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now.getTime(), 0, 1);
        //如果队列为空,sleep一下再重新循环,sleep是为了缓解redis压力
        if (values.isEmpty()) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }
        //不为空,获取值
        String next = values.iterator().next();
        //将redis中的值删掉,相当于被消费掉
        Long remove = redisTemplate.opsForZSet().remove(queueKey, next);
        if ((remove > 0)) {
            //Json 转 Object
            TaskItem<T> taskItem = JSON.parseObject(next, taskType);
            //将msg信息作处理
            this.handleMsg(taskItem.msg, now);
        }
    }
}

private void handleMsg(T msg, Date now) {
    log.info("延迟消费到数据:[{}],当前时间:[{}]", msg, now);
    System.out.println(String.format("延迟消费到数据:[%s]", msg));
}
测试
代码语言:javascript复制
//延迟队列测试
@Test
public void testDelayQueue() {
    String queueKey = "redis_delay_queue";
    delayingQueue.setQueueKey(queueKey);
    Thread producer = new Thread(() -> delayingQueue.delay("I am Lvshe", 10000));

    Thread consumer = new Thread(() -> delayingQueue.loop());

    producer.start();
    consumer.start();

    try {
        producer.join();

        Thread.sleep(20000);
        //线程中断标记,跳出loop()方法循环
        consumer.interrupt();
        consumer.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

如上代码,生产了一个数据 “I am Lvshen”,要求10s 后消费

结果如下图:

0 人点赞