redis实现简单延时队列

2022-01-04 16:25:56 浏览数 (1)

继之前用rabbitMQ实现延时队列,Redis由于其自身的Zset数据结构,也同样可以实现延时的操作

代码语言:txt复制
Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。
代码语言:txt复制
Zset的排列效果如下图:

java代码实现如下:

代码语言:javascript复制
package cn.chinotan.service.delayQueueRedis;

import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @program: test
 * @description: redis实现延时队列
 * @author: xingcheng
 * @create: 2018-08-19
 **/
public class AppTest {

    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
    private static CountDownLatch cdl = new CountDownLatch(10);

    public static Jedis getJedis() {
        return jedisPool.getResource();
    }

    /**
     * 生产者,生成5个订单
     */
    public void productionDelayMessage() {
        for (int i = 0; i < 5; i  ) {
            Calendar instance = Calendar.getInstance();
            // 3秒后执行
            instance.add(Calendar.SECOND, 3   i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i   1));
            System.out.println("生产订单: "   StringUtils.join("000000000", i   1)   " 当前时间:"   new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3   i)   "秒后执行");
        }
    }

    //消费者,取订单
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();
        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);
            if (order == null || order.isEmpty()) {
                System.out.println("当前没有等待的任务");
                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];
            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();
            long nowTime = instance.getTimeInMillis() / 1000;
            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);
                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())   ":redis消费了一个任务:消费的订单OrderId为"   element);
                }
            }
        }
    }

    static class DelayMessage implements Runnable{
        @Override
        public void run() {
            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();
        for (int i = 0; i < 10; i  ) {
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

实现效果如下:

生产环境使用注意:

由于这种实现方式简单,但在生产环境下大多是多实例部署,所以存在并发问题,即缓存的查找和删除不具有原子性(zrangeWithScores和zrem操作不是一个命令,不具有原子性),会导致消息的多次发送问题,这个问题的避免方法如下:

1.可以采用单独一个实例部署解决(不具备高可用特性,容易单机出现故障后消息不能及时发送)

2.采用redis的lua脚本进行原子操作,即原子操作查找和删除(实现难度大)

因此,延时队列的实现最好采用rabbitMQ来实现,rabbitMQ天然具备分布式的特性,可以很好的用在多服务,多实例环境下,具体的实现参考我的第一篇博客https://cloud.tencent.com/developer/article/1409940

0 人点赞