application.properties:
代码语言:javascript复制spring.rabbitmq.addresses=192.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
RabbitMQ与SpringBoot整合配置详解:
1. 生产端核心配置
publisher-confirms
,实现一个监听器用于监听Broker端为我们返回的确认请求:RabbitTemplate.ConfirmCallback
publisher-returns
,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback
注意一点,在发送消息时候对template进行设置mandatory=true保证监听有效
生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等。
RabbitSender:
代码语言:javascript复制package com.pyy.springboot.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData:" correlationData);
System.err.println("ack:" ack);
if(!ack) {
System.err.println("异常处理...");
}else {
// 更新数据库对应的消息状态:已发送
}
}
};
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("return exchange:" exchange " , routingKey:" routingKey ", replyCode:" replyCode ", replyText:" replyText);
}
};
public void send(Object message, Map<String, Object> headerProperties) throws Exception {
MessageHeaders messageHeaders = new MessageHeaders(headerProperties);
Message msg = MessageBuilder.createMessage(message, messageHeaders);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData();
correlationData.setId("userid" System.currentTimeMillis());// id 时间戳 全局唯一 实际消息的id
//rabbitTemplate.convertAndSend("pyy.exchange", "springboot.hello", msg, correlationData);
rabbitTemplate.convertAndSend("pyy.exchange", "fasdfsf.hello", msg, correlationData);
}
}
2. 消费端核心配置
代码语言:javascript复制spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
- 首先配置ACK
手工确认
模式,用于ACK的手工处理,这样我可以保证消息的可靠性送达
,或者在消费失败
时候可以做到重回队列
、根据业务记录日志等处理。 - 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
@RabbitListener注解使用
- 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用
@RabbitListener
只一个组合的注解,里面可以注解配置@QueueBinding
、@Queue
、@Exchange
直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消息接收者
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
*
* @Queue value:配置队列名称
* autoDelete:是否是一个可删除的临时队列
*
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
/**
* 接收消息方法,采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Info receiver:" msg);
}
}
@RabbitListener
注解如果没有存在exchange和queue会自动创建
案例详细代码:https://github.com/pyygithub/springboot-rabbitmq