RabbitMQ面试必备知识点及实战 - Exchange交换机类型详解

2024-09-10 15:37:05 浏览数 (1)

0 前言

Exchange:接收消息,并根据路由键转发消息所绑定的队列。交换机并非一个单独进程,而是一个有着“地址”的列表而已。

蓝区 - Send Message:把消息投递到交换机,由 RoutingKey 路由到指定队列。

交换机属性

声明交换机时可附带许多属性:

  • Name 交换机名称
  • Type 交换机类型,direct、topic、 fanout、 headers
  • Durability,是否需要持久化。 如果持久化,则RabbitMQ重启后,交换机还存在
  • Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
  • Internal 当前Exchange是否于RabbitMQ内部使用,默认为False

交换机类型

  1. Direct exchange(直连交换机)
  2. Fanout exchange(扇型交换机)
  3. Topic exchange(主题交换机)
  4. Headers exchange(头交换机)
  5. Dead Letter Exchange(死信交换机)

1 默认交换机

amq.* exchanges

1、一个队列对应了多个消费者,

2、默认,由队列对消息进行平均分配,消息会被分到不同的消费者手中。3、消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动

2 Direct Exchange

所有发送到DE的消息被转发到RouteKey中指定的Queue。

Direct模式可用RabbitMQ自带的Exchange:default Exchange,所以无需将Exchange进行任何绑定(binding),消息传递时,RouteKey须完全匹配才会被队列接收,否则该消息被丢弃。

Direct Exchange原理示意图
实战
代码语言:java复制
/**
 * 直连模式-生产者
 *
 * @author JavaEdge
 */
public class ProducerDirectExchange {
    public static void main(String[] args) throws Exception {
       //1 创建ConnectionFactory
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       
       //2 创建Connection
       Connection connection = connectionFactory.newConnection();

       //3 创建Channel
       Channel channel = connection.createChannel();

       //4 声明
       String exchangeName = "test_direct_exchange";
       // !!!!!!!!!!!!!!!!!!!!!!!!
       String routingKey = "test.direct";

       //5 发送
       String msg = "Hello JavaEdge RabbitMQ Direct Exchange Message ... ";
       channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}
代码语言:java复制
/**
 * 直连模式-消费者
 *
 * @author JavaEdge
 */
public class ConsumerDirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       // 自动重连(3s)
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // name和Pro中的一致
       String exchangeName = "test_direct_exchange";
       String exchangeType = "direct";
       String queueName = "test_direct_queue";
       String routingKey = "test.direct";

       channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
       channel.queueDeclare(queueName, false, false, false, null);
       //建立绑定关系
       channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        while(true){  
            //获取消息,如果没有消息,该步将会阻塞
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("Get Message:"   msg);
        } 
    }
}

路由key保持一致,分别启动:

看交换机:

看绑定关系:

看队列名:

看队列数据源的交换机:

3 Topic exchange

直接交换的局限性:不能做基于多个标准的路由。

如日志系统,可能不仅要根据严重性订阅日志,还要根据日志源订阅日志。

syslog unix工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。

这更具灵活性 - 可能想监听来自 cron 的关键错误及来自 kern 的所有日志。为了在日志记录系统实现这点,还需了解主题交换机。

  • *可匹配一个单词
  • #可匹配零或多个单词
  • 所有发送到Topic Exchange的消息会被转发到所有关心RouteKey中指定Topic的Queue
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需绑定一个Topic

案例

将发送所有描述动物的消息。消息将与包含三个单词(两个点)的routing key一起发送。

代码语言:bash复制
routing key中的第一个单词描述速度,第二颜色,第三是物种:“<speed>。<color>。<species>”。

创建三个绑定:

代码语言:bash复制
Q1绑定了绑定键“* .orange.*”,Q2绑定了“*.*.rabbit”和“lazy.#”

这些绑定可总结为:

  • Q1对所有橙色动物感兴趣
  • Q2希望听到关于兔子的一切,以及关于懒惰动物的一切

routing key设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将同时发送给他们。另一方面

  • “quick.orange.fox”只会转到第一个队列
  • 而“lazy.brown.fox”只会转到第二个队列
  • “lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定
  • “quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。

如果我们违背我们的约定并发送带有一个或四个单词的消息,例如“orange” or “quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不会匹配任何绑定,因此将丢失.

另一方面,“lazy.orange.male.rabbit”,虽然它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

实例图

实战

代码语言:java复制
/**
 * @author JavaEdge
 */
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.javaedge";

        String msg = "Hello JavaEdge RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}
代码语言:java复制
/**
 * 主题交换机-消费端
 *
 * @author JavaEdge
 */
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
//     String routingKey = "user.#";
        String routingKey = "user.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Get Message:"   msg);
        }
    }
}

启动消费者:

启动生产者:

消费端收到了消息。

修改匹配格式,理论上只能接受前两个消息:

管控台,先将之前的匹配绑定取消:

显然仅能接受前两个消息:

小结

当队列绑定“#”(哈希)绑定key时,它将接收所有消息,而不管routing key,就像fanout交换机。

当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,主题交换机的行为就像直接交换机。

4 Fanout Exchange

不处理路由键,只需简单的将队列绑定到交换机。发送到交换机的消息都会被转发到与该交换机绑定的所有队列。Fanout交换机转发消息是最快的:

实战

代码语言:java复制
/**
 * @author JavaEdge
 */
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
       String exchangeName = "test_fanout_exchange";
       String exchangeType = "fanout";
       String queueName = "test_fanout_queue";
        // 不设置路由键
       String routingKey = "";
       channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
       channel.queueDeclare(queueName, false, false, false, null);
       channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while(true){
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("Get Message:"   msg);
        } 
    }
}
代码语言:java复制
/**
 * @author JavaEdge
 */
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel();
       String exchangeName = "test_fanout_exchange";
       for(int i = 0; i < 4; i   ) {
          String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
          channel.basicPublish(exchangeName, "", null , msg.getBytes());           
       }
       channel.close();  
        connection.close();  
    }
}

启动消费端

无需routing key

启动生产者后接收到的消息:

5 Header Exchange

根据消息头信息(headers)来路由消息,而非路由键(routing key)。要在消息头设置一些KV对,交换机会根据这些键值对来决定将消息路由到哪个队列。

代码语言:java复制
/**
 * @author JavaEdge
 */
public class Producer4HeadersExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_headers_exchange";

        for (int i = 0; i < 4; i  ) {
            String msg = "Hello World RabbitMQ 4 HEADERS Exchange Message ...";

            // 设置消息的头部信息
            Map<String, Object> headers = new HashMap<>();
          	// 指定匹配规则
            headers.put("x-match", "any"); // any 表示只要有一个头部信息匹配即可,all 表示所有头部信息都要匹配。
            headers.put("name", "JavaEdge");
            headers.put("age", "30");
						 // 将头部信息作为 BasicProperties 的一部分传递
            channel.basicPublish(exchangeName, "", new com.rabbitmq.client.AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build(), msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}
代码语言:java复制
/**
 * @author JavaEdge
 */
public class Consumer4HeadersExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_headers_exchange";
        String exchangeType = "headers";
        String queueName = "test_headers_queue";

        // 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);

        // 绑定队列到交换机,并设置头部匹配规则
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-match", "any");
        headers.put("name", "JavaEdge");
        headers.put("age", "30");

        channel.queueBind(queueName, exchangeName, "", headers);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Get Message:"   msg);
        }
    }
}

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。 各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。 负责: 中央/分销预订系统性能优化 活动&券等营销中台建设 交易平台及数据中台等架构和开发设计 车联网核心平台-物联网连接平台、大数据平台架构设计及优化 LLM Agent应用开发 区块链应用开发 大数据开发挖掘经验 推荐系统项目 目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

  • 编程严选网

0 人点赞