基于RabbitMq的实现消息延时发送的优点以及其局限性;

2021-12-24 08:55:09 浏览数 (1)

我们消息中心是负责承载各个业务(比如电商,物流,营销中心,券中心,会员中心,积分中心,停车场等等)的消息发送需求,那么消息呢就可能有延迟需求,比如物流到货后十五分钟进行一次邀请评价的需求。而且做个消息延迟发送,我们不可能让业务自己去写消息啥时候发送,到发送时间了再调我们接口这种逻辑,这样不合理,我们需要做比较强大的消息中心功能。

那么延迟消息的实现有多种多样的实现,我们前阵子想实现延迟消息,对此做了一定的讨论,最后发现许多方案要么不支持分布式项目,要么平白的对机器性能损耗比较大,要么可能存在系统崩溃数据丢失的风险,最后我们采用了RabbitMQ的方式实现延迟队列。

一. rabbitmq的延迟消息实现方式

1.死信队列方式
1.1我先大白话解释一下啥叫死信队列:
  • 首先死信队列是普通队列
  • 死信队列是在其他队列里的消息死亡后进入的队列
  • 死信队列本身不具有死信功能,需要绑定
  • 比如A绑定了死信队列是B,那么A中死亡的消息就会进入B内,B就被称之为死信队列

上面提到的消息死亡有几种类型 消息被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度

1.2延迟队列 死信队列实现延迟消息发送

RabbitMQ支持给队列内的消息设置过期时间和给消息单独过期时间,那么结合死信队列我们就可以做到消息的延迟发送了; 大概是以下步骤 1.创建延迟队列并设置消息的过期时间,绑定一个死信队列 2.不创建该队列的消费者,让其内部消息根据过期时间自动过期 3.创建死信队列的消费者,使其每次消费死亡的消息;

死信队列结构图

看到之前有的人写的博文写的比较复杂,还把交换机写进来了,其实完全没必要,死信队列根本上只是队列之间的绑定以及数据交换,具体代码就不说了,因为重点不在这里;

死信这种方式有个致命的缺点,导致我们这边无法使用:
  • 1.时序问题:如果我们消息使用的是同一个队列,然后我们给消息本身设置过期时间,那么同一个队列中消息消费是按顺序来的,而不是过期时间,也就说说如果我们正常队列有两个数据A ttl15秒 B ttl 3秒,A在队列前面B在后面;那么我们消费的时候及时B过期时间更短,我们也不会先消费B而是会先消费A,因为同一队列有顺序问题。
  • 2.过多队列问题: 前面说了如果单个队列那么消费就有顺序问题,那我们可以按过期时间分别绑定多个队列啊。 但是如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,我们业务方指定的时间是无法确定的,不可能去限制业务方让他们只能某个时间发,那么我们要创建无数个队列??显然这是不可能的;

因此这种方式直接pass了;

2.借助rabbitmq的延迟插件
2.1 延迟队列插件rabbitmq_delayed_message_exchange的安装
  • 1.首先去RabbitMQ 插件下载网站下载自己版本对应的ez文件类型插件
  • 2.插件rabbitmq_delayed_message_exchange下载完放入rabbitmq 的plugins文件下
  • 3.进入到rabbit文件的sbin目录,执行
代码语言:javascript复制
#查看插件目录
rabbitmq-plugins list
#安装延迟队列插件 (rabbitmq-plugins enable 插件名)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.2 RabbitMq插件实现消息延迟的优点;

这个优点就是解决了上面方式的缺点。。。让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间;

2.3 实现方式;
  • 1.创建一个delay类型的exchange,绑定对应的队列
  • 2.这里delay类型和direct topic fanout等消息路由方式相对独立,也就是说我们可以和之前一样,指定exchange是direct还是topic还是fanout都可以。根本上来说:我们指定其是delay类型不过是决定了其什么时候投递到指定路由队列
  • 3.绑定路由队列进行消费

我们目前有两种不同类型的延迟,因此我这里用的topic模式结合延迟插件实现延迟队列;

topic结合delay插件实现延迟消息架构

如上所示,我们延迟的消息首先都放置到Mq里,然后延迟时间到了之后呢就会被路由到指定队列上去;

这么做有个小问题,如果我们延迟消息过多的话,那么必然存在着rabbitmq挤压消息,占用空间的问题,当然解决方案也比较简单

  • 延迟不超过一天的我们直接进入rabbitmq
  • 把延迟超过第一天的消息先进入mysql,每天定时扫第二天要发的数据,扫进mq里
  • 这样的话RabbitMQ就做到了只存储当天消息的能力;

如果我们消息非常非常多,可以把消息分发区间划的更细点,比如只存储每12小时的消息,甚至只存储每个小时要发送的消息,这都是完全OK的;

代码也很简单,这里提供一个绑定了两种业务的延迟队列的小demo: 延迟队列配置:

代码语言:javascript复制
@Configuration
public class DelayedRabbitMQConfig {
  /**
     * 声明延时队列hangfire
     * 不设置TTL
     */
    @Bean("hDelayQueue")
    public Queue hangfireDelayQueue() {
        return new Queue(BaseDict.DELAY_QUEUE_NAME_H);
    }


    /**
     * 业务调用延迟队列
     *
     */
    @Bean("mDelayQueue")
    public Queue moduleDelayQueue() {
        return new Queue(BaseDict.DELAY_QUEUE_NAME_M);
    }


    /**
     * 延迟交换机
     *
     */
    @Bean(name = "delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-delayed-type", "topic");
        return new CustomExchange(BaseDict.DELAY_EXCHANGE_DISPATCHER, "x-delayed-message", true, false, args);
    }


    @Bean
    public Binding hangfireBinding(@Qualifier("hDelayQueue") Queue queue,
                                   @Qualifier("delayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_H).noargs();
    }


    @Bean
    public Binding moduleBinding(@Qualifier("mDelayQueue") Queue queue,
                                 @Qualifier("delayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_M).noargs();
    }
}

消息生产

代码语言:javascript复制
    @PostMapping("addMsgToMDelayQueue")
    public void addMsgToMQueue(@RequestBody XXX request) {
        sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_M, JSONObject.toJSONString(request), 10*24*60*60*1000);
    }



    @PostMapping("addMsgToHDelay")
    public void addMsgToHQueue(@RequestBody List<AAA> request) {
        sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_H, JSONObject.toJSONString(request), 10*24*60*60*1000);
    }


    /**
     * 消息发送到延迟交换机上
     */
    public void sendDelayMsg(String exchange,String routingKey, String msg, Integer delayTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, a -> {
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }

消息消费

代码语言:javascript复制
@RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_H, concurrency = "10-30")
    public void hangfireReceive(Message message, Channel channel) throws IOException {
        final List<AAA> sendDetails = JSONObject.parseArray(JSONObject.toJSONString(message.getBody()), AAA.class);
        Optional.ofNullable(sendDetails).ifPresent(item->item.forEach(System.out::println));
    }


    @RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_M, concurrency = "10-20")
    public void receiveQ(Message message, Channel channel) throws IOException {
        final XXX request = JSONObject.parseObject(message.getBody(), XXX.class);
        System.out.println("消费" request.toString());
    }
3.基于RabbitMQ延迟插件实现延迟消息的局限性

我们在第一次使用这个延迟插件的时候做了一个压测,大约100W数据量的延迟会导致内存和Cpu使用量的急速上升,查了一些文档没搞明白后,去了官网看了下,发现其对此有以下解释,大致是讲目前这个延迟插件还不足以支持那么大的数据量,建议数据量不要太大 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72

因此我们如果想用好延迟插件,目前来说需要做一些额外的配合,尽量使其延时最近的数据,并且数据量维持到一个比较低的程度

0 人点赞