【Spring Boot实战与进阶】集成RabbitMQ的实例详解

2022-05-12 10:14:26 浏览数 (1)

Spring Boot是很优秀的框架,它的出现简化了新Spring应用的初始搭建以及开发过程,大大减少了代码量,目前已被大多数企业认可和使用。这个专栏将对Spring Boot框架从浅入深,从实战到进阶,不但我们要懂得如何去使用,还要去剖析框架源码,学习其优秀的设计思想。

汇总目录链接:【Spring Boot实战与进阶】学习目录

文章目录
  • 一、简介
  • 二、集成RabbitMQ的简单例子
    • 1、引入依赖
    • 2、配置RabbitMQ连接信息
    • 3、创建RabbitMQ配置类和消息队列
    • 4、创建生产者
    • 5、创建消费者
    • 6、创建测试类
    • 7、运行项目
  • 三、项目实战
    • 1、引入依赖
    • 2、配置RabbitMQ连接信息
    • 3、创建RabbitMQ配置类和消息队列
    • 4、MsgConsumerListener(消费者)
    • 5、TestController(消息生产和推送的接口)
    • 6、调用接口推送消息

一、简介

   RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolo,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

常用的交换机有以下三种:

1、Direct Exchange(直连型交换机)

根据消息携带的路由键(routing key)将消息投递给对应队列,direct exchange 适用于消息的单播发送。 工作流程如下:

  • 将一个队列绑定到某个交换机上,同时赋予该绑定 一个 route key。
  • 当一个携带 route key为R 的消息被发送到 direct exchange 时,exchange 会将消息路由到 绑定值同样为 R 的队列。注意Route Key和绑定值要完全匹配才行。 direct exchange 经常用于在 多个 worker 中分配任务,当这样做时,需注意,在AMQP 0-9-1中,消息的负载均衡发生在 consumer之间,而不是在 queue之间。

2、Fanout Exchange(扇型交换机)   一个 fanout exchange 会将消息分发给所有绑定到此 exchange 的queue中,不管 queue中的 route key。如果有 N 个 Queue 绑定到 一个 fanout exchange 时,那么此时 exchange 收到消息时,会将此消息分发到 这 N 个 queue中,由于此性质, fanout exchange 也常用消息的广播。

3、Topic Exchange(主题交换机)

  topic exchange 会根据 route key 将消息分发到与此消息的 route key 相匹配的并且绑定此exchange的一个或多个 queue。这里的相匹配与 direct exchange的完全匹配的路由规则不一样,topic exchange 在匹配规则上进行了扩展,规则如下:

  • RoutingKey(路由键)为一个点号 “.” 分隔的字符串,如 “com.rabbitmq.client”、“java.util.concurrent”、"com.hidden.client"等
  • BindingKey(绑定键) 和 RoutingKey一样也是点号 “.” 分隔的字符串
  • BindingKey (绑定键) 中可以存在两种 特殊字符串 “*” 和 “#” ,用于做模糊匹配,其中 " # " 用于匹配一个单词," * "用于匹配多个单词

  topic exchange 经常用于实现 publish/subscribe模型,即消息的多播模型。这里的Topic Exchange就适用于发布/订阅模型。RabbitMQ的一个原则就是,消息不能直接投递到 Queue中,必须先将消息投递到 Exchange中,然后由Exchange 按照路由规则将消息投递到对应的 Queue中。

二、集成RabbitMQ的简单例子

1、引入依赖

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置RabbitMQ连接信息

代码语言:javascript复制
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest

3、创建RabbitMQ配置类和消息队列

代码语言:javascript复制
@Configuration
public class RabbitConfig {

    @Bean
    public Queue getQueue() {
        return new Queue("QA");
    }
}

4、创建生产者

代码语言:javascript复制
@Component
public class MsgProducer {

    @Resource
    private AmqpTemplate rabbitTemplate;

    public void send() {
        rabbitTemplate.convertAndSend("QA", "这是一条最新消息");
    }
}

5、创建消费者

代码语言:javascript复制
@Component
@RabbitListener(queues = "QA")
public class MsgConsumer1 {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("【消费者接收的消息】: "   msg);
    }

}

6、创建测试类

代码语言:javascript复制
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Resource
    private MsgProducer msgProducer;

    // 发送单条消息
    @Test
    public void contextLoads() {
        msgProducer.send();
    }
}

7、运行项目

控制台输出:

代码语言:javascript复制
【消费者接收的消息】: 这是一条最新消息

RabbitMQ后台管理端:

三、项目实战

1、引入依赖

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置RabbitMQ连接信息

代码语言:javascript复制
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest

mq:
  email:
    queue-name: email-queue
    exchange-name: email-exchange
    routing-key-name: email-routing-key

3、创建RabbitMQ配置类和消息队列

代码语言:javascript复制
@Configuration
@Slf4j
public class RabbitMQConfig {
    @Resource
    private CachingConnectionFactory connectionFactory;

    @Value("${mq.email.queue-name}")
    private String queueName;
    @Value("${mq.email.exchange-name}")
    private String exchangeName;
    @Value("${mq.email.routing-key-name}")
    private String routingKeyName;

    /**
     * 单一消费者
     *
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功: correlationData({}), ack({}), cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失: exchange({}), route({}), replyCode({}), replyText({}), message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(queueName, true);
    }

    @Bean
    public DirectExchange emailExchange() {
        return new DirectExchange(exchangeName, true, false);
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(routingKeyName);
    }
}

4、MsgConsumerListener(消费者)

代码语言:javascript复制
@Component
public class MsgConsumerListener {
    @RabbitListener(queues = "${mq.email.queue-name}", containerFactory = "singleListenerContainer")
    public void process(String msg) {
        System.out.println("【消费者接收的消息】: "   msg);
    }
}

5、TestController(消息生产和推送的接口)

代码语言:javascript复制
@RestController
public class TestController {

    @Value("${mq.email.exchange-name}")
    private String exchangeName;

    @Value("${mq.email.routing-key-name}")
    private String routingKeyName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/test")
    public void test() {
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setRoutingKey(routingKeyName);
        rabbitTemplate.convertAndSend("我是队列里的一条消息");
    }
}

6、调用接口推送消息

调用接口http://localhost:8080/test

控制台输出:

代码语言:javascript复制
消息发送成功: correlationData(null), ack(true), cause(null)
【消费者接收的消息】: 我是队列里的一条消息

0 人点赞