RabbitMQ Topic(通配符)工作模式
上文我们介绍了它的路由工作模式,接下来介绍一个通配符的模式。
*(星号)可以正好代替一个词。 #(哈希)可以代替零个或多个单词
举例说明一下匹配案例
红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配,
我们还是把这个中间件的比较详细的原理图放在这里。
下面我们编写代码进行举例 就按照这个图的人原理编写
生产者
代码语言:javascript复制package com.jgdabc.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//完成发送消息
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
// 5:创建交换机
// exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
// 1:exchange : 交换机名称
// 2: type:交换机的类型 BuiltinExchangeType代表枚举类型
// Direct("direct"):定向
// FANOUT("fanout") : 扇形广播,发送消息到每一个与之绑定的队列
// TOPIC("topic") 通配符的方式
// HEADERS("headers")
// 3:durable是否持久化
// 4:autoDelete:是否自动删除
// 5:internal 内部使用
// 6Larguments:参数列表
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
// 6:创建队列
String queueName = "test_topic_queue";
String queueName01 = "test_topic_queue01";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName01,true,false,false,null);
// queueBind(String queue, String exchange, String routingKey)
// 7 绑定队列和交换机
// queue:绑定的队列名称
// exchange:交换机名称
// routingKey:路由键,绑定规则
//routingkey:系统的名称.日志的级别
// 需求,所有error级别的日志存入数据库,所有order级别的日志存入数据库
channel.queueBind(queueName,exchangeName,"#.error");
channel.queueBind(queueName,exchangeName,"order.*");
channel.queueBind(queueName01,exchangeName,"*.*");
// 8:发送消息
String body = "日志信息。张三调用了findAll方法,日志级别info....";
channel.basicPublish(exchangeName,"hello.info",null,body.getBytes());
// 9:释放资源
channel.close();
connection.close();
}
}
消费者一
代码语言:javascript复制package com.jgdabc.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
String queueName = "test_topic_queue";
String queueName01 = "test_topic_queue01";
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName01,true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:" consumerTag);
// System.out.println("Exchange:" envelope.getExchange());
// System.out.println("RoutingKey:" envelope.getRoutingKey());
// System.out.println("properties:" properties);
System.out.println("body:" new String(body));
System.out.println("将信息存储到数据库。。。");
}
};
channel.basicConsume(queueName,true,consumer);
//
}
}
消费者二
代码语言:javascript复制package com.jgdabc.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic01 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
String queueName = "test_topic_queue";
String queueName01 = "test_topic_queue01";
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName01,true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:" consumerTag);
// System.out.println("Exchange:" envelope.getExchange());
// System.out.println("RoutingKey:" envelope.getRoutingKey());
// System.out.println("properties:" properties);
System.out.println("body:" new String(body));
System.out.println("将信息打印到控制台。。。");
}
};
channel.basicConsume(queueName01,true,consumer);
//
}
}
这样的话其实只有第二个消费者可以接收到消息。 验证。