前面介绍了RabbitMq的几种模式,这篇文章主要介绍死信队列的使用和实际应用场景订单超时怎么和死信队列结合。
一、业务场景
用户在淘宝或者京东下单的时候,一般会预购,30分钟之后如果没有付款则订单超时,这个功能怎么实现呢?
1、可以存入mysql数据库,然后每隔一段时间就定时器查询一次数据库,这样对数据库的io负载很大,而且百分之90都是没必要的开销。
2、可以和rabbitMq死信队列TTL来实现。
二、代码实例
死信队列满足的条件是什么呢,当队列订单超时,当队列超过最大值,当消费者消费失败主动调用basicNack方法进入死信队列。在配置文件需要加入几行参数:
default-requeue-rejected=false 这个一定要设置成为false。# 默认是auto 自动确定是否收到消息,如果消费失败则会一直进入队列消费 # 改为manual手动调用change.basicAck确认 # 改为none 若没收到或者消费成功都不会回到队列 spring.rabbitmq.listener.simple.acknowledge-mode=manual
代码语言:javascript复制# ----- RabbitMq -------- #
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.type=simple
# 默认true,代表消费报错会重新回到队列。false则不会回到队列
spring.rabbitmq.listener.simple.default-requeue-rejected=false
# 默认是auto 自动确定是否收到消息,如果消费失败则会一直进入队列消费
# 改为manual手动调用change.basicAck确认
# 改为none 若没收到或者消费成功都不会回到队列
spring.rabbitmq.listener.simple.acknowledge-mode=manual
接下来创建死信队列和business队列:
代码语言:javascript复制 public static final String business_fanout_exchange = "business-fanout-exchange";
public static final String dead_letter_exchange = "dead-letter-exchange";
public static final String dead_letter_routing_keyA = "dead-letter-routing-keyA";
public static final String dead_letter_routing_keyB = "dead-letter-routing-keyB";
public static final String business_queueA = "business-queueA";
public static final String dead_letter_queueA = "dead-letter-queueA";
public static final String dead_letter_queueB = "dead-letter-queueB";
/**
* business交换机
*
* @return
*/
@Bean
public FanoutExchange businessFanoutExchange() {
return new FanoutExchange(business_fanout_exchange);
}
/**
* 死信交换机
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(dead_letter_exchange);
}
/**
* business队列
*/
@Bean
public Queue businessQueueA() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", dead_letter_exchange);
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", dead_letter_routing_keyA);
return QueueBuilder.durable(business_queueA).withArguments(args).build();
}
/**
* 声明死信队列
*/
@Bean
public Queue deadLetterQueueA() {
return new Queue(dead_letter_queueA);
}
@Bean
public Queue deadLetterQueueB() {
return new Queue(dead_letter_queueB);
}
/**
* 业务A绑定交换机
*/
@Bean
public Binding businessQueueBindingA(@Qualifier("businessQueueA") Queue businessQueueA,
@Qualifier("businessFanoutExchange") FanoutExchange businessFanoutExchange) {
return BindingBuilder.bind(businessQueueA).to(businessFanoutExchange);
}
/**
* 死信绑定交换机
*/
@Bean
public Binding deadQuereBindingA(@Qualifier("deadLetterQueueA") Queue deadLetterQueueA,
@Qualifier("deadLetterExchange") DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueueA).to(deadLetterExchange).with(dead_letter_routing_keyA);
}
@Bean
public Binding deadQuereBindingB(@Qualifier("deadLetterQueueB") Queue deadLetterQueueB,
@Qualifier("deadLetterExchange") DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueueB).to(deadLetterExchange).with(dead_letter_routing_keyB);
}
死信队列访问接口:
代码语言:javascript复制 /**
* 死信
*/
@RequestMapping("/deadTo")
public void deadTo() {
String dead_letter = "dead-letter";
rabbitTemplate.convertAndSend(DeadConfig.business_fanout_exchange, "", dead_letter);
}
消费监听:
代码语言:javascript复制 /**
* 死信 【业务端】
*/
@RabbitListener(queues = DeadConfig.business_queueA)
public void dead(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("业务接收到消息AAAA");
boolean ack = true;
try {
if (msg.contains("dead-letter")) {
throw new RuntimeException("消费异常");
}
} catch (Exception e) {
ack = false;
}
System.out.println(
"message.getMessageProperties().getDeliveryTag():" message.getMessageProperties().getDeliveryTag());
if (!ack) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
/**
* 死信 【死信队列监听】
*
* @param message
*/
@RabbitListener(queues = DeadConfig.dead_letter_queueA)
public void deadQueueA(Message message, Channel channel) throws IOException {
System.out.println("接收到死信队列消息AAA:" new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
这时候死信就可以接受到数据了,如果业务发生异常,这时候进入死信队列消息,可以进行我们的业务。
如果这时候要实现订单超时功能可以改成下面的代码
代码语言:javascript复制 /**
* business队列
*/
@Bean
public Queue businessQueueA() {
Map<String, Object> args = new HashMap<>(3);
//订单最多存在10s
args.put("x-message-ttl",10000);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", dead_letter_exchange);
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", dead_letter_routing_keyA);
return QueueBuilder.durable(business_queueA).withArguments(args).build();
}
/**
* 死信
*/
@RequestMapping("/deadTo")
public void deadTo() {
//添加未支付 订单到mysql , 0代表未支付,1代表已支付
int order = 0;
rabbitMqService.addUserOrder(order);
String dead_letter = "dead-letter";
rabbitTemplate.convertAndSend(DeadConfig.business_fanout_exchange, "", dead_letter);
}
/**
* 死信 【业务端】
*/
@RabbitListener(queues = DeadConfig.business_queueA)
public void dead(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("业务接收到消息AAAA");
boolean ack = true;
try {
if (msg.contains("dead-letter")) {
throw new RuntimeException("消费异常");
}
} catch (Exception e) {
ack = false;
}
System.out.println(
"message.getMessageProperties().getDeliveryTag():" message.getMessageProperties().getDeliveryTag());
if (!ack) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
/**
* 死信 【死信队列监听】
*
* @param message
*/
@RabbitListener(queues = DeadConfig.dead_letter_queueA)
public void deadQueueA(Message message, Channel channel) throws IOException {
// 死信队列监听业务过期消息,查看数据库是否已经修改数据,0未支付,1代表已支付
int order = getMysqlUserOrder();
if(order == 0){
//则吧数据库改为订单超时
System.out.println("订单超时");
}else{
// 则吧数据库订单改为发货
System.out.println("订单发货");
}
System.out.println("接收到死信队列消息AAA:" new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}