我们消息中心是负责承载各个业务(比如电商,物流,营销中心,券中心,会员中心,积分中心,停车场等等)的消息发送需求,那么消息呢就可能有延迟需求,比如物流到货后十五分钟进行一次邀请评价的需求。而且做个消息延迟发送,我们不可能让业务自己去写消息啥时候发送,到发送时间了再调我们接口这种逻辑,这样不合理,我们需要做比较强大的消息中心功能。
那么延迟消息的实现有多种多样的实现,我们前阵子想实现延迟消息,对此做了一定的讨论,最后发现许多方案要么不支持分布式项目,要么平白的对机器性能损耗比较大,要么可能存在系统崩溃数据丢失的风险,最后我们采用了RabbitMQ的方式实现延迟队列。
一. rabbitmq的延迟消息实现方式
1.死信队列方式
1.1我先大白话解释一下啥叫死信队列:
- 首先死信队列是普通队列
- 死信队列是在其他队列里的消息死亡后进入的队列
- 死信队列本身不具有死信功能,需要绑定
- 比如A绑定了死信队列是B,那么A中死亡的消息就会进入B内,B就被称之为死信队列
上面提到的消息死亡有几种类型 消息被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度
1.2延迟队列 死信队列实现延迟消息发送
RabbitMQ支持给队列内的消息设置过期时间和给消息单独过期时间,那么结合死信队列我们就可以做到消息的延迟发送了; 大概是以下步骤 1.创建延迟队列并设置消息的过期时间,绑定一个死信队列 2.不创建该队列的消费者,让其内部消息根据过期时间自动过期 3.创建死信队列的消费者,使其每次消费死亡的消息;
死信队列结构图
看到之前有的人写的博文写的比较复杂,还把交换机写进来了,其实完全没必要,死信队列根本上只是队列之间的绑定以及数据交换,具体代码就不说了,因为重点不在这里;
死信这种方式有个致命的缺点,导致我们这边无法使用:
- 1.时序问题:如果我们消息使用的是同一个队列,然后我们给消息本身设置过期时间,那么同一个队列中消息消费是按顺序来的,而不是过期时间,也就说说如果我们正常队列有两个数据A ttl15秒 B ttl 3秒,A在队列前面B在后面;那么我们消费的时候及时B过期时间更短,我们也不会先消费B而是会先消费A,因为同一队列有顺序问题。
- 2.过多队列问题: 前面说了如果单个队列那么消费就有顺序问题,那我们可以按过期时间分别绑定多个队列啊。 但是如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,我们业务方指定的时间是无法确定的,不可能去限制业务方让他们只能某个时间发,那么我们要创建无数个队列??显然这是不可能的;
因此这种方式直接pass了;
2.借助rabbitmq的延迟插件
2.1 延迟队列插件rabbitmq_delayed_message_exchange的安装
- 1.首先去RabbitMQ 插件下载网站下载
自己版本对应的ez文件类型插件
。 - 2.插件
rabbitmq_delayed_message_exchange
下载完放入rabbitmq 的plugins文件下 - 3.进入到rabbit文件的sbin目录,执行
#查看插件目录
rabbitmq-plugins list
#安装延迟队列插件 (rabbitmq-plugins enable 插件名)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.2 RabbitMq插件实现消息延迟的优点;
这个优点就是解决了上面方式的缺点。。。让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间;
2.3 实现方式;
- 1.创建一个delay类型的exchange,绑定对应的队列
- 2.这里delay类型和direct topic fanout等消息路由方式相对独立,也就是说我们可以和之前一样,指定exchange是direct还是topic还是fanout都可以。
根本上来说:我们指定其是delay类型不过是决定了其什么时候投递到指定路由队列
- 3.绑定路由队列进行消费
我们目前有两种不同类型的延迟,因此我这里用的topic模式结合延迟插件实现延迟队列;
topic结合delay插件实现延迟消息架构
如上所示,我们延迟的消息首先都放置到Mq里,然后延迟时间到了之后呢就会被路由到指定队列上去;
这么做有个小问题,如果我们延迟消息过多的话,那么必然存在着rabbitmq挤压消息,占用空间的问题,当然解决方案也比较简单
- 延迟不超过一天的我们直接进入rabbitmq
- 把延迟超过第一天的消息先进入mysql,每天定时扫第二天要发的数据,扫进mq里
- 这样的话RabbitMQ就做到了只存储当天消息的能力;
如果我们消息非常非常多,可以把消息分发区间划的更细点,比如只存储每12小时的消息,甚至只存储每个小时要发送的消息,这都是完全OK的;
代码也很简单,这里提供一个绑定了两种业务的延迟队列的小demo: 延迟队列配置:
代码语言:javascript复制@Configuration
public class DelayedRabbitMQConfig {
/**
* 声明延时队列hangfire
* 不设置TTL
*/
@Bean("hDelayQueue")
public Queue hangfireDelayQueue() {
return new Queue(BaseDict.DELAY_QUEUE_NAME_H);
}
/**
* 业务调用延迟队列
*
*/
@Bean("mDelayQueue")
public Queue moduleDelayQueue() {
return new Queue(BaseDict.DELAY_QUEUE_NAME_M);
}
/**
* 延迟交换机
*
*/
@Bean(name = "delayExchange")
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "topic");
return new CustomExchange(BaseDict.DELAY_EXCHANGE_DISPATCHER, "x-delayed-message", true, false, args);
}
@Bean
public Binding hangfireBinding(@Qualifier("hDelayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_H).noargs();
}
@Bean
public Binding moduleBinding(@Qualifier("mDelayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_M).noargs();
}
}
消息生产
代码语言:javascript复制 @PostMapping("addMsgToMDelayQueue")
public void addMsgToMQueue(@RequestBody XXX request) {
sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_M, JSONObject.toJSONString(request), 10*24*60*60*1000);
}
@PostMapping("addMsgToHDelay")
public void addMsgToHQueue(@RequestBody List<AAA> request) {
sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_H, JSONObject.toJSONString(request), 10*24*60*60*1000);
}
/**
* 消息发送到延迟交换机上
*/
public void sendDelayMsg(String exchange,String routingKey, String msg, Integer delayTime) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg, a -> {
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
消息消费
代码语言:javascript复制@RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_H, concurrency = "10-30")
public void hangfireReceive(Message message, Channel channel) throws IOException {
final List<AAA> sendDetails = JSONObject.parseArray(JSONObject.toJSONString(message.getBody()), AAA.class);
Optional.ofNullable(sendDetails).ifPresent(item->item.forEach(System.out::println));
}
@RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_M, concurrency = "10-20")
public void receiveQ(Message message, Channel channel) throws IOException {
final XXX request = JSONObject.parseObject(message.getBody(), XXX.class);
System.out.println("消费" request.toString());
}
3.基于RabbitMQ延迟插件实现延迟消息的局限性
我们在第一次使用这个延迟插件的时候做了一个压测,大约100W数据量的延迟会导致内存和Cpu使用量的急速上升,查了一些文档没搞明白后,去了官网看了下,发现其对此有以下解释,大致是讲目前这个延迟插件还不足以支持那么大的数据量,建议数据量不要太大 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72
因此我们如果想用好延迟插件,目前来说需要做一些额外的配合,尽量使其延时最近的数据,并且数据量维持到一个比较低的程度