前提
如果能提前先阅读一下之前写过的一篇文章理解RabbitMQ中的AMQP-0-9-1模型,那么这篇文章应该会比较容易理解。
引入依赖
先确认已经安装了RabbitMQ的服务,并且开启了Web管理插件,方便直接从Web管理界面查找到队列、交换器和绑定。个人有软件洁癖,喜欢把软件和依赖保持升级到最新版本。引入RabbitMQ的Java驱动:
代码语言:javascript复制<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version>
</dependency>
本文介绍RabbitMQ通过其Java驱动声明队列、交换器和绑定。对于队列和交换器,其首次声明也是创建的操作。队列、交换器和绑定的声明依赖于通道(Channel),对应的是com.rabbitmq.client.Channel
接口。在使用RabbitMQ的Java驱动的时候,一般在我们都使用下面的方式进行组件的声明操作:
- 1、基于RabbitMQ连接信息构建
com.rabbitmq.client.ConnectionFactory
实例。 - 2、基于ConnectionFactory新建一个
com.rabbitmq.client.Connection
实例。 - 3、基于Connection新建一个
com.rabbitmq.client.Channel
实例。 - 4、通过Channel实例声明(删除、解除绑定)队列、交换器或者绑定(Channel实例是可以复用的)。
虽然Channel实例是可以复用的,但是为了简化测试方法的编写,我们可以写下个简单的基础类:
代码语言:javascript复制public abstract class BaseChannelFactory {
protected static void provideChannel(ChannelAction channelAction) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
try {
channelAction.doInChannel(channel);
} finally {
channel.close();
connection.close();
}
}
interface ChannelAction {
void doInChannel(Channel channel) throws Exception;
}
}
这样子,每次调用都是新建的Connection和Channel,我们只需要重点关注ChannelAction
的实现即可。
在查看一下框架类型的API文档的时候有个很重要的技巧:如果提供的方法有重载,只需要看参数最多的基础方法。
队列的相关操作
队列的相关参数主要包括队列的声明(declare)、删除(delete)和清空队列消息(purge)。
队列的声明
队列的声明依赖于com.rabbitmq.client.Channel
的queueDeclare
方法。queueDeclare
方法的多个重载都是同步方法,提供同样功能的还有一个异步方法queueDeclareNoWait
。下面选取queueDeclare
参数列表长度最大的方法进行分析:
Queue.DeclareOk queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;
参数如下:
- queue:需要声明的队列名称。
- durable:是否开启持久化特性,如果此属性为true,消息中间件代理重启后队列会被重新声明(也就是说不会被删除),注意这个特性和消息的持久化特性完全不相关。
- exclusive:是否独占的,如果此属性为true,则队列的存在性绑定在创建它的连接上,意味着队列只能被一个连接使用并且连接关闭之后队列会被删除。
- autoDelete:是否自动删除,如果此属性为true,意味着队列不被使用的时候会被消息中间件代理删除,实际上意味着队列至少有一个消费者并且最后一个消费者解除订阅状态(一般是消费者对应的通道关闭)后队列会自动删除。
- arguments:K-V结构,队列参数,一般和消息中间件代理或者插件的特性相关,如消息的过期时间(Message TTL)和队列长度等,后面会有专门文章介绍这些特性。
队列声明的返回值是Queue.DeclareOk
实例:
public interface DeclareOk extends Method {
String getQueue();
int getMessageCount();
int getConsumerCount();
//...
}
可以从中得知声明的队列名、队列中的消息数量、队列当前的消费者数量,这个返回值对于无参数的queueDeclare
方法十分有意义:
Queue.DeclareOk queueDeclare() throws IOException;
因为这个方法声明的队列名称是由消息中间件代理随机生成,队列名就是通过返回值告知客户端的。这里贴一个简单的例子:
代码语言:javascript复制public class QueueDeclareMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("throwable.queue", true, false, false, null);
System.out.println(declareOk.getQueue());
});
}
}
运行后控制台打印throwable.queue说明队列声明成功,可以查看RabbitMQ的Web管理界面:
可见队列的确已经被创建,但是Bindings一栏显示队列只绑定到默认的交换器中,这个时候其实已经可以通过默认的交换器向队列中发送消息。队列声明失败的时候会直接抛出异常,一般是IOException。上面的例子中是我们最常见到的队列声明方式,声明出来的队列开启了队列持久化特性、非独占的、非自动删除的,也就是即使RabbitMQ服务重启了,队列依然会存在(被重新声明),但是并不是所有的场景都需要这种声明方式(说实话,目前笔者没碰到不使用这种声明方式的场景,有点惭愧)。还有一点需要重点关注:队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
队列的被动(Passive)声明
队列的被动声明,其实是检查队列在消息代理中间件是否存在的判断方法,依赖于Channel的queueDeclarePassive
方法:
Queue.DeleteOk queueDelete(String queue) throws IOException;
它只依赖于一个参数-队列名称,如果队列名称对应的队列已经存在,则返回Queue.DeleteOk
实例,如果队列不存在,会抛出IOException,通常是一个被包装为IOException的ShutdownSignalException,而ShutdownSignalException是运行时异常的子类。举个列子:
public class QueueDeclarePassiveMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//队列throwable.queue已存在
AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("throwable.queue");
System.out.println(declareOk.getQueue());
//队列throwable.queue.passive不存在
declareOk = channel.queueDeclarePassive("throwable.queue.passive");
System.out.println(declareOk.getQueue());
});
}
}
由于throwable.queue.passive
队列不存在,因此会抛出IOException,追踪异常栈查看底层的异常是:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue ‘throwable.queue.passive’ in vhost ‘/’, class-id=50, method-id=10)
利用这个方法的特性可以尝试编写一个方法确认队列是否存在,例如:
代码语言:javascript复制private static boolean isQueueExisted(Channel channel, String queueName) {
boolean flag = false;
try {
channel.queuePurge(queueName);
flag = true;
} catch (IOException e) {
//no-op
}
return flag;
}
队列的删除
队列的删除对应的是Channel的queueDelete
方法:
//基于队列名删除队列,不关注队列是否被使用,也不关注队列中是否存在消息
Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
参数如下:
- queue:队列名称。
- ifUnused:判断队列是否被不被使用,如果为true,只有不被使用的队列才能被删除。
- ifEmpty:判断队列是否为空(没有消息积压),如果为true,只有空的队列才能被删除。
其实也就是只依赖队列名的单参数的queueDelete
就是强制删除方法,举个例子:
public class QueueDeleteMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
AMQP.Queue.DeleteOk deleteOk = channel.queueDelete("throwable.queue");
System.out.println(String.format("Delete queue [%s] successfully,message count = %d!",
"throwable.queue", deleteOk.getMessageCount()));
});
}
}
一般来说,队列的删除功能权限最好不要下放到应用程序中,除非有特殊的需要,如果需要删除队列,最好使用queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
方法,否则有可能造成消息丢失。
队列的清空
队列的清空操作会删除队列中的所有消息,依赖的方法是Channel的queuePurge
方法:
Queue.PurgeOk queuePurge(String queue) throws IOException;
此方法会基于队列名清除对应队列中的所有内容,使用的时候需要谨慎,举个例子:
代码语言:javascript复制public class QueuePurgeMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
AMQP.Queue.PurgeOk purgeOk = channel.queuePurge("throwable.queue");
System.out.println(String.format("Purge queue [%s] successfully,message count = %d!",
"throwable.queue", purgeOk.getMessageCount()));
});
}
}
其实在Web管理界面中,每个队列所在的页面下有一个Purge
按钮,该按钮的功能就是清空队列消息。
交换器的相关操作
交换器的相关操作主要包括交换器的声明和删除。
交换器的声明
交换器的声明方法依赖于Channel的exchangeDeclare
方法,按照惯例查看它重载方法中参数列表长度最大的方法:
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
参数解释如下:
- exchange:交换器名称。
- type:type可以为字符串或者BuiltinExchangeType类型,BuiltinExchangeType枚举只包括DIRECT、FANOUT、TOPIC和HEADERS,而字符串类型除了定义四种内建类型的交换器,实际上RabbitMQ允许自定义类型的交换器,不过很少使用。
- durable:是否开启持久化特性,如果此属性为true,则消息中间件代理重启后,交换器不会删除,实际上是会被重新声明一次。
- autoDelete:是否自动删除,如果此属性为true,当最后一个绑定到此交换器的队列解除绑定关系,交换器会被删除。
- internal:是否内部的,如果此属性为true,该交换器只允许消息中间件代理使用,客户端无法使用。
- arguments:交换器参数,K-V结构,参数一般和消息中间件代理或者插件的一些扩展特性相关,不依赖这些扩展特性直接使用null即可。
举个简单的使用例子:
代码语言:javascript复制public class ExchangeDeclareMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDeclare("throwable.exchange.direct",
BuiltinExchangeType.DIRECT, true, false, null);
});
}
}
执行完毕之后,RabbitMQ的Web管理器的Exchanges的选项卡中就能看到对应的交换器:
因为没有声明交换器和队列的绑定,所以Bindings一栏是空的。
交换器的被动声明
交换器的被动声明类似于队列的被动声明,用于通过交换器名称检查是否存在对应的交换器,依赖于Channel的exchangeDeclarePassive
方法:
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
举个例子:
代码语言:javascript复制public class ExchangeDeclarePassiveMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//throwable.exchange.direct存在
channel.exchangeDeclarePassive("throwable.exchange.direct");
//throwable.exchange.fanout不存在,会抛出IOException
channel.exchangeDeclarePassive("throwable.exchange.fanout");
});
}
}
类似可以写个检查交换器是否存在的工具方法:
代码语言:javascript复制private boolean isExchangeExisted(Channel channel, String exchangeName) {
boolean flag = false;
try {
channel.exchangeDeclarePassive(exchangeName);
flag = true;
} catch (IOException e) {
//no-op
}
return flag;
}
交换器的删除
交换器的删除依赖于Channel的exchangeDelete
方法,方法只依赖于交换器的名称。
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
举个例子:
代码语言:javascript复制public class ExchangeDeleteMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDelete("throwable.exchange.direct");
});
}
}
绑定的声明
前面提到队列的声明和交换器的声明,队列和交换器创建之后,需要声明两者的绑定关系,Channel中提供了两种声明绑定关系的方法:
queueBind
方法,声明队列和交换器的绑定关系。exchangeBind
方法,声明交换器和交换器之间的绑定关系。
同时也提供解除绑定的方法:
queueUnbind
方法:解除队列和交换器的绑定关系。exchangeUnbind
方法:解除交换器之间的绑定关系。
队列和交换器的绑定和解绑
队列和交换器的绑定主要依赖于Channel的queueBind
,而解绑主要依赖于queueUnbind
方法,按照惯例看这两个方法重载方法中参数列表长度最大的方法:
Queue.BindOk queueBind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
Queue.UnbindOk queueUnbind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
注意这两个方法的参数列表完全一致:
- queue:队列名称。
- exchange:交换器名称。
- routingKey:路由键。
- arguments:绑定参数,K-V结构,参数一般和消息中间件代理或者插件的一些扩展特性相关,不依赖这些扩展特性直接使用null即可。
基于声明队列和交换器间的绑定举个例子:
代码语言:javascript复制public class QueueBindMain extends BaseChannelFactory{
public static void main(String[] args) throws Exception{
provideChannel(channel -> {
//throwable.exchange.direct->throwable.queue
channel.queueBind("throwable.queue","throwable.exchange.direct", "throwable.routingKey",null);
});
}
}
声明成功之后,可以查看对应的队列和交换器中的Bindings一栏:
可见交换器和队列成功建立了绑定关系。接着可以尝试使用解绑方法进行绑定解除:
代码语言:javascript复制public class QueueUnbindMain extends BaseChannelFactory{
public static void main(String[] args) throws Exception{
provideChannel(channel -> {
channel.queueUnbind("throwable.queue","throwable.exchange.direct", "throwable.routingKey",null);
});
}
}
交换器之间的绑定和解绑
RabbitMQ中支持两个不同的交换器之间进行绑定和解除绑定,绑定方法依赖于Channel的exchangeBind
方法,解除绑定依赖于Channel的exchangeUnbind
方法:
Exchange.BindOk exchangeBind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
参数如下:
- destination:目标交换器名称。
- source:来源交换器名称。
- routingKey:路由键。
- arguments:参数,K-V结构。
我们先预先建立一个Fanout类型的交换器,命名为throwable.exchange.fanout
,接着,我们把Fanout类型的交换器throwable.exchange.fanout
作为来源交换器,绑定到Direct类型的目标交换器throwable.exchange.direct
上:
public class ExchangeBindMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDeclare("throwable.exchange.fanout", BuiltinExchangeType.FANOUT, true, false, null);
channel.exchangeBind("throwable.exchange.direct", "throwable.exchange.fanout", "exchange.routingKey");
});
}
}
现在可以查看交换器throwable.exchange.direct
的Bindings一栏的信息:
这就是现在我们通过交换器throwable.exchange.fanout
发送消息,消息会先到达交换器throwable.exchange.direct
,然后再路由到队列throwable.queue
中。多重绑定的交换器在一些复杂的场景有重要的作用,但是目前来看还没有碰到使用场景(一般来说,存在即合理)。
接着举个例子进行交换器之间的绑定解除:
代码语言:javascript复制public class ExchangeUnbindMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeUnbind("throwable.exchange.direct", "throwable.exchange.fanout", "exchange.routingKey");
});
}
}
小结
一旦队列和交换器之间的绑定关系声明完毕,我们可以通过交换器和可选的路由键向队列中发送消息,可以注册消费者到队列中获取消息。RabbitMQ中的队列、交换器和绑定有个特点:组件的声明只承认第一次,也就是队列名、交换器名是唯一的,组件可以反复声明,不过声明所使用的参数必须和首次声明的参数一致。
(本文完 c-3-d e-a-20181125)
本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650060