springboot使用@RabbitListener注解消费消息

2019-07-02 18:04:24 浏览数 (2)

代码语言:javascript复制
定义监听器消费消息
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "ly.search.insert.queue", durable = "true"),
        exchange = @Exchange(name = "ly.item.exchange",
                type = ExchangeTypes.TOPIC,
                ignoreDeclarationExceptions = "true"),
        key = {"item.insert", "item.update"}
))
public void listenInsert(Long id) {
    //监听新增或更新
    if (id != null) {
        searchService.insertOrUpdate(id);
    }
}

自动创建,queue 和 exchange 绑定

@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。

代码语言:javascript复制
@QueueBinding 将交换机和队列绑定    
 key = {"item.insert", "item.update"}  返回绑定的路由密钥或模式,多个元素将导致多个绑定
@Queue声明队列 (durable = "true" 表示持久化的)   
@Exchange声明交换机(type = ExchangeTypes.TOPIC 表示交换机类型) 

配置文件

代码语言:javascript复制
spring:
  rabbitmq:
   host: 192.168.1.188
   username: sunfeng
   password: sunfeng
   virtual-host: /sunfeng
   template:
     retry:
       enabled: true
       initial-interval: 10000ms
       max-interval: 30000ms
       multiplier: 2
     exchange: ly.item.exchange
   publisher-confirms: true

pom文件

代码语言:javascript复制
<!--amqp依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

消息发送

代码语言:javascript复制
@Autowired
private AmqpTemplate amqpTemplate;  注入模板

    /**
     * 封装发送到消息队列的方法
     *
     * @param id
     * @param type 发送消息的类型
     */
    private void sendMessage(Long id, String type) {
        log.info("发送消息到mq");
        try {
            amqpTemplate.convertAndSend("item."   type, id);
        } catch (Exception e) {
            log.error("{}商品消息发送异常,商品ID:{}", type, id, e);
        }
    }

0 人点赞