分布式专题|进入BAT必备之Springboot 集成 RabbitMQ的五种工作模式

2020-11-19 15:47:02 浏览数 (1)

点击上方蓝字关注我们 文末有惊喜

  • 介绍
  • 安装
  • 相关组件
  • 五种模式调用示例
    • 简单消息模式
    • 工作队列模式
    • Publish/Subscribe发布与订阅模式
    • Routing路由模式
    • Topics主题模式
  • 完整代码领取

介绍

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);

安装

这里为了方便,建议大家使用docker安装方式,docker安装相关命令如下:

  • 下载镜像
代码语言:javascript复制
docker pull rabbitmq:3.7.7-management
  • 创建实例并启动
代码语言:javascript复制
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v ~/docker/rabbitmq/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.7-management

相关组件

在这里插入图片描述

  • 生产者: 是消息的产生者
  • 消费者:消息的处理者
  • 交换机(exchange):生产者和队列的桥梁,生产者发出的消息通过交换机传到队列
  • 队列:消息保存的地方,等待消费者来获取
  • 主机:所有的队列和交换机都是挂在主机上的,一个broker可以创建多个独立的主机
  • broker(server):交换机、队列和主组成了broker,broker是生产者和消费者的桥梁

五种模式调用示例

引入依赖

代码语言:javascript复制
   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

编写配置文件 application.yml

代码语言:javascript复制
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: my_vhost #这是我自己创建的主机,请填写你们自己的,注意不要带/
    username: lezai
    password: lezai
    publisher-returns: true
    publisher-confirm-type: simple

简单消息模式

简单消息使用默认交换机,routing_key和消费的队列名称保持一致

  • 生产者创建mq配置文件
代码语言:javascript复制
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_TEST_SIMPLE_MODE = "queue_test_simple_mode";
//    声明队列
    @Bean("queueTestSimpleMode")
    public Queue queueTestSimpleMode(){
        return QueueBuilder.durable(QUEUE_TEST_SIMPLE_MODE).build();
    }
}
  • 编写生产者端发送简单消息的代码
代码语言:javascript复制
@SpringBootTest
class RabbitmqProductApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleMode() {
        rabbitTemplate.convertAndSend("",RabbitMQConfig.QUEUE_TEST_SIMPLE_MODE,"测试简单模式");
    }
  • 查看管理台 发现已经看到我们创建的队列,并且队列里面已经有一条待消费的消息
  • 创建消费者
代码语言:javascript复制
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "queue_test_simple_mode")
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("收到消息:"   new String(message.getBody()));
    }
}

工作队列模式

这个同简单模式一样,只不过是创建多个消费者来监听队列,MQ会轮询往多个监听器里面推送消息

代码语言:javascript复制
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "queue_test_simple_mode")
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("收到消息:"   new String(message.getBody()));
    }
        @RabbitListener(queues = "queue_test_simple_mode")
    public void onMessage2(Message message, Channel channel) throws Exception {
        System.out.println("收到消息:"   new String(message.getBody()));
    }
}

Publish/Subscribe发布与订阅模式

发布订阅使用的是广播模式,所有与这个类型交换机绑定的队列都会接收到消息 交换机类型为「fanoutExchange」

  • 生产者的配置文件
代码语言:javascript复制
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_TEST_FANOUT_MODE_1 = "queue_test_fanout_mode_1";
    public static final String QUEUE_TEST_FANOUT_MODE_2 = "queue_test_fanout_mode_2";
    public static final String EXCHANGE_TEST_FANOUT_MODE = "exchange_test_fanout_mode";

    //    声明队列
    @Bean("queueTestFANOUTMode1")
    public Queue queueTestFANOUTMode1() {
        return QueueBuilder.durable(QUEUE_TEST_FANOUT_MODE_1).build();
    }
    //    声明队列
    @Bean("queueTestFANOUTMode2")
    public Queue queueTestFANOUTMode2() {
        return QueueBuilder.durable(QUEUE_TEST_FANOUT_MODE_2).build();
    }
    //  声明交换机
    @Bean("exchangeTestFANOUTMode")
    public Exchange exchangeTestFANOUTMode() {
        return ExchangeBuilder.fanoutExchange(EXCHANGE_TEST_FANOUT_MODE).durable(true).build();
    }

    // 绑定交换机与队列 订阅模式下 routingkey为空
    @Bean
    public Binding itemQueueExchange1(@Qualifier("queueTestFANOUTMode1") Queue queue,
                                     @Qualifier("exchangeTestFANOUTMode") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
    @Bean
    public Binding itemQueueExchange2(@Qualifier("queueTestFANOUTMode2") Queue queue,
                                     @Qualifier("exchangeTestFANOUTMode") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
}
  • 编写生产者发送消息的代码
代码语言:javascript复制
    @Test
    public void testFanoutMode() {
        rabbitTemplate.convertAndSend("exchange_test_fanout_mode","",  "测试发布订阅模式");
    }
  • 查看管理台,多了一个交换机和一个队列,并且每个队列中都有一条我们刚才创建的消息
  • 消费端代码编写,消费端永远是监听队列的,所以和前面的方式是一样的
代码语言:javascript复制
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "queue_test_fanout_mode_1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        System.out.println("收到消息来自队列queue_test_fanout_mode_1:"   new String(message.getBody()));
    }
    @RabbitListener(queues = "queue_test_fanout_mode_2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        System.out.println("收到消息来自队列queue_test_fanout_mode_2:"   new String(message.getBody()));
    }
}

Routing路由模式

路由模式 是通过使用定向交换机进行转发消息,在队列与定向交换机绑定的时候,会提供一个routingkey,如果用户在发送消息的交换机的时候,带上了一个routingkey,则会根据这个routingkey进行匹配对应的队列。

  • 生产者配置文件编写
代码语言:javascript复制
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_TEST_ROUTING_MODE_1 = "queue_test_routing_mode_1";
    public static final String QUEUE_TEST_ROUTING_MODE_2 = "queue_test_routing_mode_2";
    public static final String EXCHANGE_TEST_ROUTING_MODE = "exchange_test_routing_mode";
    
    //    声明队列
    @Bean("queueTestROUTINGMode1")
    public Queue queueTestROUTINGMode1() {
        return QueueBuilder.durable(QUEUE_TEST_ROUTING_MODE_1).build();
    }
    //    声明队列
    @Bean("queueTestROUTINGMode2")
    public Queue queueTestROUTINGMode2() {
        return QueueBuilder.durable(QUEUE_TEST_ROUTING_MODE_2).build();
    }
    //  声明交换机
    @Bean("exchangeTestROUTINGMode")
    public Exchange exchangeTestROUTINGMode() {
        return ExchangeBuilder.directExchange(EXCHANGE_TEST_ROUTING_MODE).durable(true).build();
    }

    // 绑定交换机与队列
    // 这里指定了routingkey为order,也就是发消息的时候只能匹配到routingkey为order的消息
    @Bean
    public Binding itemQueueExchange3(@Qualifier("queueTestROUTINGMode1") Queue queue,
 // 这里指定了routingkey为order,也就是发消息的时候只能匹配到routingkey为member的消息                                     @Qualifier("exchangeTestROUTINGMode") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order").noargs();
    }
    
    @Bean
    public Binding itemQueueExchange4(@Qualifier("queueTestROUTINGMode2") Queue queue,
                                      @Qualifier("exchangeTestROUTINGMode") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("member").noargs();
    }
}
  • 编写生产者发送消息的代码

这里发送消息的时候指定了routingkey为order,那么只有当与交换机绑定的队列中设置了routingkey为order的才能监听到消息,这里消息会发送到queue_test_routing_mode_1队列中;

代码语言:javascript复制
   @Test
    public void testRoutingMode() {
        rabbitTemplate.convertAndSend("exchange_test_routing_mode","order",  "测试路由模式");
    }
  • 查看管理台,发现只有queue_test_routing_mode_1中有一条消息等待消费
  • 消费者的代码
代码语言:javascript复制
    @RabbitListener(queues = "queue_test_routing_mode_1")
    public void onMessage3(Message message, Channel channel) throws Exception {
        System.out.println("收到消息来自队列queue_test_routing_mode_1:"   new String(message.getBody()));
    }
    @RabbitListener(queues = "queue_test_routing_mode_2")
    public void onMessage4(Message message, Channel channel) throws Exception {
        System.out.println("收到消息来自队列queue_test_routing_mode_2:"   new String(message.getBody()));
    }

Topics主题模式

topics模式的交换机和定向交换机一样,唯一不同的是解析routingkey的时候,定向交换机会根据消息中的routingkey找到对应的队列,这个值是确定的;但是topic模式又称为通配符模式,这个在绑定的时候指定的routingkey是可以以通配符的形式存在的,这里只有两种通配符符号:

#代表1个或多个单词 例如 abc.#则会匹配到 abc.d 或abc.d.f

*代表一个单词 例如。abc.*则只能匹配到abc.a 或abc.f的消息,

代码逻辑和路由模式一样,就不演示了,已经将完整的实例代码传到git库中,欢迎索取:

完整代码领取

https://gitee.com/yangleliu/code_learning.git

0 人点赞