RabbitMQ延迟消费和重复消费

2021-02-25 14:51:37 浏览数 (1)

转载自 https://blog.csdn.net/quliuwuyiz/article/details/79301054

使用RabbitMQ实现延迟任务 场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

延迟任务的模型如下图:

基于 RabbitMQ 实现的分布式延迟重试队列 场景一:在消费该消息的时候,发现条件不满足,需要等待30分钟,重新消费该消息,再次判断是否满足条件,如果满足则消费该消息,如果不满足,则再等待30分钟。这样的场景通过mq队列来实现。

在消息队列的监听过程中,先判断条件是否满足,满足,则直接消费。不满足,则将该消息发送到上图的死信队列,但是在死信队列失效之后,需要重新转发到当前队列进行消费就可以实现该功能。

基本概念如下: 消息的TTL ( Time to Live ) 和 DLX (Dead Letter Exchange)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes); 当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges Exchage的概念在这里就不在赘述,可以从这里进行了解。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

package com.test.sender.delay;

import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map;

import javax.annotation.Resource;

import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component;

import com.drools.model.MQPushErrorFlow;

@Component @PropertySource(value = "classpath:riskConfigMq.properties") public class LifsInCompleteDataOneSend { private static final Log log = LogFactory.getLog(LifsInCompleteDataOneConfig.class);

private static final String DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST = "delay_queue_per_queue_lifs_ttl"; // TTL配置在队列上的缓冲队列。 private static final String DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST = "delay_queue_per_queue_lifs_routing_key"; // TTL配置在队列上的缓冲队列。

private static final Integer QUEUE_EXPIRATION_FIRST = 30000;

/** * 消息队列业务名称 */ @Value("${lifs.consumer.pushServiceName}") private String pushServiceName;

/** * 订阅平台名称 */ @Value("${lifs.consumer.platformName}") private String platformName;

/** * 消息队列一个业务使用的队列的数量 */ @Value("${lifs.consumer.queueShardingCount}") private Integer queueShardingCount;

/** * 交换机的名称,共用lifs监听的交换机 */ @Value("

/** * 底层需要使用的真实发送对象,每个发送对象都需要对应一个 */ @Resource(name = "lnCompleteDataOneRabbitTemplate") private RabbitTemplate rabbitTemplate;

@Bean public Queue delayQueueFirstTTL() { Map arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", exchangeName); arguments.put("x-dead-letter-routing-key", getDirectRoutingKey(pushServiceName, 0, platformName)); arguments.put("x-message-ttl", QUEUE_EXPIRATION_FIRST); Queue queue = new Queue(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, true, false, false, arguments); log.info("第一次延迟队列名称: " DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST "  延期之后的转发的routingKey: " getDirectRoutingKey(pushServiceName, 0, platformName) "  exchange: " exchangeName); /* * Queue queue = QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL) // * delay_queue_per_queue_ttl .withArgument("x-dead-letter-exchange",DELAY_EXCHANGE_NAME) * .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) * .withArgument("x-message-ttl",QUEUE_EXPIRATION).build();  * Queue queue =new Queue(DELAY_QUEUE_PER_QUEUE_TTL,true); */ return queue; }

@Bean public Binding lnCompleteDataOneBinding() { return BindingBuilder.bind(delayQueueFirstTTL()).to(lnCompleteDataOneExchange()).with(DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST); }

@Bean(name = "lnCompleteDataOneExchange") public DirectExchange lnCompleteDataOneExchange() { return new DirectExchange(exchangeName); }

    private String getDirectRoutingKey(String pushServiceName, int shardingIndex, String platformName) {         return String.format("%s.%d.%s", pushServiceName, shardingIndex, platformName);     }     @Bean(name = "delayQueueFirstListenerContainer")     public String delayQueueFirstListenerContainer(@Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {     Queue queue = delayQueueFirstTTL();     RabbitAdmin ra = new RabbitAdmin(connectionFactory);         ra.declareExchange(lnCompleteDataOneExchange());         ra.declareQueue(queue);         ra.declareBinding(lnCompleteDataOneBinding());         log.info("delayQueueFirstListenerContainer: queueName" queue.getName() "  exchangeName: " lnCompleteDataOneExchange().getName() " routingKey: " DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST);         return "";     } /** * 自动生成uuid调用发送方法 *  * @param dto * @param routingId */ public String send(String message) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("延迟半分钟的队列中接受消息的时间: " df.format(new Date()) "n消息的內容:" message);

rabbitTemplate.convertAndSend(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, message); // 向队列里面发送消息,第一个参数是队列名称,第二个参数是内容

return "sender delay"; }

}

package com.test.sender.delay;

import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; import com.framework.mq.common.RabbitConfig;

@Component @PropertySource(value = "classpath:riskConfigMq.properties") public class LifsInCompleteDataOneConfig {

/** * MQ服务地址和端口号 */ @Value("{rabbitmq.password}") private String password; /** * MQ的虚拟主机 */ @Value("{rabbitmq.publisherConfirms}") private boolean publisherConfirms; /** * 缓存的channel的数量 */ @Value("

/** * 交换机的名称,共用lifs监听的交换机 */ @Value("${lifs.consumer.exchangeName}") private String exchangeName;

/** * 注入RabbitConfig对象 * @return */ @Bean(name = "lnCompleteDataOneRabbitConfig") public RabbitConfig rabbitConfig() { return new RabbitConfig(addresses, username, password, virtualHost, publisherConfirms, channelCacheSize, connectionCacheSize, exchangeName); }

/** * 注入连接工厂对象 *  * @param rabbitConfig 之前注入的 @RabbitConfig 对象 * @return */ @Bean(name = "lnCompleteDataOneConnectionFactory") public ConnectionFactory connectionFactory( @Qualifier(value = "lnCompleteDataOneRabbitConfig") RabbitConfig rabbitConfig) { return rabbitConfig.getConnectionFactory(); }

/** * 注入的 @RabbitTemplate 对象 *  * @param connectionFactory * @return */ @Bean(name = "lnCompleteDataOneRabbitTemplate") RabbitTemplate rabbitTemplate( @Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {

return new RabbitTemplate(connectionFactory); } }

在初次监听消息队列的地方

在业务代码中,判断条件是否满足,如果不满足,赋值incompleteDataFlagResult=1,在第二次重试的时候,如果还不满足,则赋值incompleteDataFlagResult=2,如果满足,则赋值incompleteDataFlagResult=200,直接消费,并发送回调的mq。

if(incompleteDataFlagResult==1){ //推进到等待30秒过期的队列 lifsInCompleteDataOneSend.send(JSONObject.toJSONString(request));

}else if(incompleteDataFlagResult==2){  //推进到等待60秒过期的队列

lifsInCompleteDataTwoSend.send(JSONObject.toJSONString(request));

}else if(incompleteDataFlagResult==3){ // 进行保存,需要手工处理 InstallmentRequestFailure installmentRequestFailure = new InstallmentRequestFailure(); installmentRequestFailureService.save(installmentRequestFailure); }else if(incompleteDataFlagResult==200){ lifsPushSender.send(request, customerId); }

---------------------  作者:quliuwuyiz  来源:CSDN  原文:https://blog.csdn.net/quliuwuyiz/article/details/79301054  版权声明:本文为博主原创文章,转载请附上博文链接!

0 人点赞