RabbitMQ入门:主题路由器(Topic Exchange)[通俗易懂]

2022-07-18 16:34:43 浏览数 (1)

大家好,又见面了,我是全栈君。

上一篇博文中,我们使用direct exchange 代替了fanout exchange,这次我们来看下topic exchange。

一、Topic Exchange介绍

topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange可以为routing key设置多重标准。

direct路由器类似于sql语句中的精确查询;topic 路由器有点类似于sql语句中的模糊查询。

还记得吗?我们在《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中对exchange的分类进行过介绍:

代码语言:javascript复制
Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
Headers:我们可以不考虑它。

下面是官网给出的工作模型(P代表生产者,X代表exhange,红色的Q代表队列,C代表消费者):

我们来分析下这个模型。

它发送的消息是用来描述动物的。路由键有三个单词:<speed>.<color>.<species>,第一个单词描述了速度,第二个描述了颜色,第三个描述了物种。 有三个绑定键,Q1绑定键为*.orange.*(关注所有颜色为orange的动物); Q2的绑定键有两个,分别是*.*.rabbit(关注所有的兔子)和lazy.#(关注所有速度为lazy的动物)。

因此,路由键为quick.orange.rabbit的消息将发送到Q1和Q2,路由键为quick.orange.fox的消息将发送到Q1,路由键为lazy.brown.fox的消息将发送到Q2。路由键为lazy.pink.rabbit的消息将发送到Q2,但是注意,它只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。

如果有人手一抖发了个lazy.orange.male.rabbit这种四个单词的,这个怎么办呢? 由于它和lazy.#匹配,因此将发送到Q2。

二、代码示例

接下来我们看下代码

  1. 生产者
代码语言:javascript复制
public class LogTopicSender {
    // exchange名字
    public static String EXCHANGE_NAME = "topicExchange";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        Channel channel = null;
        try {
            // 1.创建连接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 2.为通道声明topic类型的exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            
            // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
            String routingKey = "info";
//            String routingKey = "log4j.error";
//            String routingKey = "logback.error";
//            String routingKey = "log4j.warn";
            String msg = " hello rabbitmq, I am "   routingKey;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
            System.out.println("product send a msg: "   msg);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 4.关闭连接

            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}
  1. 消费者
代码语言:javascript复制
public class LogTopicReciver {

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        Channel channel = null;
        try {
            // 1.创建连接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 2.为通道声明topic类型的exchange
            channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // 3.创建随机名字的队列
            String queueName = channel.queueDeclare().getQueue();

            // 4.建立exchange和队列的绑定关系
            String[] bindingKeys = { "#" };
//            String[] bindingKeys = { "log4j.*", "#.error" };
//            String[] bindingKeys = { "*.error" };
//            String[] bindingKeys = { "log4j.warn" };
            for (int i = 0; i < bindingKeys.length; i  ) {
                channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]);
                System.out.println(" **** LogTopicReciver keep alive ,waiting for "   bindingKeys[i]);
            }

            // 5.通过回调生成消费者并进行监听
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {

                    // 获取消息内容然后处理
                    String msg = new String(body, "UTF-8");
                    System.out.println("*********** LogTopicReciver"   " get message :["   msg   "]");
                }
            };
            // 6.消费消息
            channel.basicConsume(queueName, true, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
  1. 启动消费者,作为消费者1
  2. 分别将String[] bindingKeys = { “#” };改为String[] bindingKeys = { “log4j.*”, “#.error” };/String[] bindingKeys = { “*.error” };/String[] bindingKeys = { “log4j.warn” };,然后启动作为消费者2、消费者3、消费者4
  3. 启动4次生产者,routing key分别为String routingKey = “info”;、String routingKey = “log4j.error”;、String routingKey = “logback.error”;、String routingKey = “log4j.warn”;
  4. 观察控制台log
代码语言:javascript复制
生产者:
product send a msg:  hello rabbitmq, I am info
product send a msg:  hello rabbitmq, I am log4j.error
product send a msg:  hello rabbitmq, I am logback.error
product send a msg:  hello rabbitmq, I am log4j.warn

消费者1:
 **** LogTopicReciver keep alive ,waiting for #
*********** LogTopicReciver get message :[ hello rabbitmq, I am info]
*********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]

消费者2:
 **** LogTopicReciver keep alive ,waiting for log4j.*
 **** LogTopicReciver keep alive ,waiting for #.error
*********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]

消费者3:
 **** LogTopicReciver keep alive ,waiting for *.error
*********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]

消费者4:
 **** LogTopicReciver keep alive ,waiting for log4j.warn
*********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]
  1. 观察RabbitMQ管理页面

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/120895.html原文链接:https://javaforall.cn

0 人点赞