代码语言:javascript复制
定义监听器消费消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "ly.search.insert.queue", durable = "true"),
exchange = @Exchange(name = "ly.item.exchange",
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true"),
key = {"item.insert", "item.update"}
))
public void listenInsert(Long id) {
//监听新增或更新
if (id != null) {
searchService.insertOrUpdate(id);
}
}
自动创建,queue 和 exchange 绑定
@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。
代码语言:javascript复制@QueueBinding 将交换机和队列绑定
key = {"item.insert", "item.update"} 返回绑定的路由密钥或模式,多个元素将导致多个绑定
@Queue声明队列 (durable = "true" 表示持久化的)
@Exchange声明交换机(type = ExchangeTypes.TOPIC 表示交换机类型)
配置文件
代码语言:javascript复制spring:
rabbitmq:
host: 192.168.1.188
username: sunfeng
password: sunfeng
virtual-host: /sunfeng
template:
retry:
enabled: true
initial-interval: 10000ms
max-interval: 30000ms
multiplier: 2
exchange: ly.item.exchange
publisher-confirms: true
pom文件
代码语言:javascript复制<!--amqp依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息发送
代码语言:javascript复制@Autowired
private AmqpTemplate amqpTemplate; 注入模板
/**
* 封装发送到消息队列的方法
*
* @param id
* @param type 发送消息的类型
*/
private void sendMessage(Long id, String type) {
log.info("发送消息到mq");
try {
amqpTemplate.convertAndSend("item." type, id);
} catch (Exception e) {
log.error("{}商品消息发送异常,商品ID:{}", type, id, e);
}
}