0 前言
Exchange:接收消息,并根据路由键转发消息所绑定的队列。交换机并非一个单独进程,而是一个有着“地址”的列表而已。
蓝区 - Send Message:把消息投递到交换机,由 RoutingKey
路由到指定队列。
交换机属性
声明交换机时可附带许多属性:
- Name 交换机名称
- Type 交换机类型,direct、topic、 fanout、 headers
- Durability,是否需要持久化。 如果持久化,则RabbitMQ重启后,交换机还存在
- Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
- Internal 当前Exchange是否于RabbitMQ内部使用,默认为False
交换机类型
- Direct exchange(直连交换机)
- Fanout exchange(扇型交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
- Dead Letter Exchange(死信交换机)
1 默认交换机
amq.* exchanges
1、一个队列对应了多个消费者,
2、默认,由队列对消息进行平均分配,消息会被分到不同的消费者手中。3、消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动
2 Direct Exchange
所有发送到DE的消息被转发到RouteKey中指定的Queue。
Direct模式可用RabbitMQ自带的Exchange:default Exchange,所以无需将Exchange进行任何绑定(binding),消息传递时,RouteKey须完全匹配才会被队列接收,否则该消息被丢弃。
Direct Exchange原理示意图
实战
代码语言:java复制/**
* 直连模式-生产者
*
* @author JavaEdge
*/
public class ProducerDirectExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
// !!!!!!!!!!!!!!!!!!!!!!!!
String routingKey = "test.direct";
//5 发送
String msg = "Hello JavaEdge RabbitMQ Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
代码语言:java复制/**
* 直连模式-消费者
*
* @author JavaEdge
*/
public class ConsumerDirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 自动重连(3s)
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// name和Pro中的一致
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//建立绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
while(true){
//获取消息,如果没有消息,该步将会阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("Get Message:" msg);
}
}
}
路由key保持一致,分别启动:
看交换机:
看绑定关系:
看队列名:
看队列数据源的交换机:
3 Topic exchange
直接交换的局限性:不能做基于多个标准的路由。
如日志系统,可能不仅要根据严重性订阅日志,还要根据日志源订阅日志。
syslog unix工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。
这更具灵活性 - 可能想监听来自 cron
的关键错误及来自 kern
的所有日志。为了在日志记录系统实现这点,还需了解主题交换机。
*
可匹配一个单词#
可匹配零或多个单词- 所有发送到Topic Exchange的消息会被转发到所有关心RouteKey中指定Topic的Queue
- Exchange将RouteKey和某Topic进行模糊匹配,此时队列需绑定一个Topic
案例
将发送所有描述动物的消息。消息将与包含三个单词(两个点)的routing key一起发送。
代码语言:bash复制routing key中的第一个单词描述速度,第二颜色,第三是物种:“<speed>。<color>。<species>”。
创建三个绑定:
代码语言:bash复制Q1绑定了绑定键“* .orange.*”,Q2绑定了“*.*.rabbit”和“lazy.#”
这些绑定可总结为:
- Q1对所有橙色动物感兴趣
- Q2希望听到关于兔子的一切,以及关于懒惰动物的一切
routing key设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将同时发送给他们。另一方面
- “quick.orange.fox”只会转到第一个队列
- 而“lazy.brown.fox”只会转到第二个队列
- “lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定
- “quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。
如果我们违背我们的约定并发送带有一个或四个单词的消息,例如“orange” or “quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不会匹配任何绑定,因此将丢失.
另一方面,“lazy.orange.male.rabbit”,虽然它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。
实例图
实战
代码语言:java复制/**
* @author JavaEdge
*/
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.javaedge";
String msg = "Hello JavaEdge RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
channel.close();
connection.close();
}
}
代码语言:java复制/**
* 主题交换机-消费端
*
* @author JavaEdge
*/
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// String routingKey = "user.#";
String routingKey = "user.*";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("Get Message:" msg);
}
}
}
启动消费者:
启动生产者:
消费端收到了消息。
修改匹配格式,理论上只能接受前两个消息:
管控台,先将之前的匹配绑定取消:
显然仅能接受前两个消息:
小结
当队列绑定“#”(哈希)绑定key时,它将接收所有消息,而不管routing key,就像fanout交换机。
当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,主题交换机的行为就像直接交换机。
4 Fanout Exchange
不处理路由键,只需简单的将队列绑定到交换机。发送到交换机的消息都会被转发到与该交换机绑定的所有队列。Fanout交换机转发消息是最快的:
实战
代码语言:java复制/**
* @author JavaEdge
*/
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
// 不设置路由键
String routingKey = "";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true){
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("Get Message:" msg);
}
}
}
代码语言:java复制/**
* @author JavaEdge
*/
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_fanout_exchange";
for(int i = 0; i < 4; i ) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
启动消费端
无需routing key
启动生产者后接收到的消息:
5 Header Exchange
根据消息头信息(headers)来路由消息,而非路由键(routing key)。要在消息头设置一些KV对,交换机会根据这些键值对来决定将消息路由到哪个队列。
代码语言:java复制/**
* @author JavaEdge
*/
public class Producer4HeadersExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_headers_exchange";
for (int i = 0; i < 4; i ) {
String msg = "Hello World RabbitMQ 4 HEADERS Exchange Message ...";
// 设置消息的头部信息
Map<String, Object> headers = new HashMap<>();
// 指定匹配规则
headers.put("x-match", "any"); // any 表示只要有一个头部信息匹配即可,all 表示所有头部信息都要匹配。
headers.put("name", "JavaEdge");
headers.put("age", "30");
// 将头部信息作为 BasicProperties 的一部分传递
channel.basicPublish(exchangeName, "", new com.rabbitmq.client.AMQP.BasicProperties.Builder()
.headers(headers)
.build(), msg.getBytes());
}
channel.close();
connection.close();
}
}
代码语言:java复制/**
* @author JavaEdge
*/
public class Consumer4HeadersExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_headers_exchange";
String exchangeType = "headers";
String queueName = "test_headers_queue";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 绑定队列到交换机,并设置头部匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "any");
headers.put("name", "JavaEdge");
headers.put("age", "30");
channel.queueBind(queueName, exchangeName, "", headers);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("Get Message:" msg);
}
}
}
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。 各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。 负责: 中央/分销预订系统性能优化 活动&券等营销中台建设 交易平台及数据中台等架构和开发设计 车联网核心平台-物联网连接平台、大数据平台架构设计及优化 LLM Agent应用开发 区块链应用开发 大数据开发挖掘经验 推荐系统项目 目前主攻市级软件项目设计、构建服务全社会的应用系统。
参考:
- 编程严选网