RabbitMQ Topic(通配符)工作模式

2022-08-03 18:22:37 浏览数 (1)

RabbitMQ Topic(通配符)工作模式

上文我们介绍了它的路由工作模式,接下来介绍一个通配符的模式。

*(星号)可以正好代替一个词。 #(哈希)可以代替零个或多个单词

举例说明一下匹配案例

红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配,

我们还是把这个中间件的比较详细的原理图放在这里。

下面我们编写代码进行举例 就按照这个图的人原理编写

生产者

代码语言:javascript复制
package com.jgdabc.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//完成发送消息
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {

//       1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //       2 设置连接参数
        connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
        connectionFactory.setPort(5672); //消息端口 默认5672
        connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
        connectionFactory.setUsername("jgdabc");//用户名 默认guest
        connectionFactory.setPassword("123456");//密码 默认guest

//        3创建连接connection
        Connection connection = connectionFactory.newConnection();
//       4 创建Channel
        Channel channel = connection.createChannel();
//        5:创建交换机
//        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
//      1:exchange : 交换机名称
//       2: type:交换机的类型 BuiltinExchangeType代表枚举类型
//        Direct("direct"):定向
//        FANOUT("fanout") : 扇形广播,发送消息到每一个与之绑定的队列
//        TOPIC("topic") 通配符的方式
//        HEADERS("headers")
//        3:durable是否持久化
//        4:autoDelete:是否自动删除
//        5:internal 内部使用
//        6Larguments:参数列表
        String exchangeName = "test_topic";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//        6:创建队列
        String queueName = "test_topic_queue";
        String queueName01 = "test_topic_queue01";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);

//          queueBind(String queue, String exchange, String routingKey)
//        7 绑定队列和交换机
//        queue:绑定的队列名称
//        exchange:交换机名称
//        routingKey:路由键,绑定规则
//routingkey:系统的名称.日志的级别
//        需求,所有error级别的日志存入数据库,所有order级别的日志存入数据库
         channel.queueBind(queueName,exchangeName,"#.error");
         channel.queueBind(queueName,exchangeName,"order.*");
         channel.queueBind(queueName01,exchangeName,"*.*");

//        8:发送消息
        String body = "日志信息。张三调用了findAll方法,日志级别info....";
        channel.basicPublish(exchangeName,"hello.info",null,body.getBytes());

//        9:释放资源
        channel.close();
        connection.close();


    }
}

消费者一

代码语言:javascript复制
package com.jgdabc.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topic {
    public static void main(String[] args) throws IOException, TimeoutException {

//       1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //       2 设置连接参数
        connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
        connectionFactory.setPort(5672); //消息端口 默认5672
        connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
        connectionFactory.setUsername("jgdabc");//用户名 默认guest
        connectionFactory.setPassword("123456");//密码 默认guest

//        3创建连接connection
        Connection connection = connectionFactory.newConnection();
//       4 创建Channel
        Channel channel = connection.createChannel();
        String queueName = "test_topic_queue";
        String queueName01 = "test_topic_queue01";

//        发送消息

//5创建队列
//        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
//        durable : 是否持久化 :当mq 重启数据还在
//        exclusive : 是否独占,只能有一个消费者监听这队列
//                      当connection关闭时候,是否删除队列
//        autoDelete:是否自动删除,当没有Consumer时候,是否自动删除

        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);
//        接收消息
//         basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
//        参数说明
//        queue: 队列名称
//        autoAck : 是否自动确认
//        callback: 回调函数
        DefaultConsumer consumer = new DefaultConsumer(channel) {
//            回调方法,当收到消息后会自动执行该方法
//            consumerTag:消息表示
//            ebvelop:获取一些信息,交换机的信息,路由等等
//            properties:配置信息
//            body:数据
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
//                System.out.println("consumerTag:" consumerTag);
//                System.out.println("Exchange:" envelope.getExchange());
//                System.out.println("RoutingKey:" envelope.getRoutingKey());
//                System.out.println("properties:" properties);
                  System.out.println("body:" new String(body));
                  System.out.println("将信息存储到数据库。。。");
            }
        };
        channel.basicConsume(queueName,true,consumer);
//
    }
}

消费者二

代码语言:javascript复制
package com.jgdabc.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topic01 {
    public static void main(String[] args) throws IOException, TimeoutException {

//       1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //       2 设置连接参数
        connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
        connectionFactory.setPort(5672); //消息端口 默认5672
        connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
        connectionFactory.setUsername("jgdabc");//用户名 默认guest
        connectionFactory.setPassword("123456");//密码 默认guest

//        3创建连接connection
        Connection connection = connectionFactory.newConnection();
//       4 创建Channel
        Channel channel = connection.createChannel();
        String queueName = "test_topic_queue";
        String queueName01 = "test_topic_queue01";

//        发送消息

//5创建队列
//        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
//        durable : 是否持久化 :当mq 重启数据还在
//        exclusive : 是否独占,只能有一个消费者监听这队列
//                      当connection关闭时候,是否删除队列
//        autoDelete:是否自动删除,当没有Consumer时候,是否自动删除

        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);
//        接收消息
//         basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
//        参数说明
//        queue: 队列名称
//        autoAck : 是否自动确认
//        callback: 回调函数
        DefaultConsumer consumer = new DefaultConsumer(channel) {
//            回调方法,当收到消息后会自动执行该方法
//            consumerTag:消息表示
//            ebvelop:获取一些信息,交换机的信息,路由等等
//            properties:配置信息
//            body:数据
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
//                System.out.println("consumerTag:" consumerTag);
//                System.out.println("Exchange:" envelope.getExchange());
//                System.out.println("RoutingKey:" envelope.getRoutingKey());
//                System.out.println("properties:" properties);
                  System.out.println("body:" new String(body));
                  System.out.println("将信息打印到控制台。。。");
            }
        };
        channel.basicConsume(queueName01,true,consumer);
//
    }
}

这样的话其实只有第二个消费者可以接收到消息。 验证。

0 人点赞