对activemq消息订阅模式来说有两种:持久订阅/非持久订阅。
非持久订阅consumer只能消费在该consumer激活状态时传送给对应topic的消息才能被该consumer消费,一旦该consumer 挂掉到下次启动期间发布到该topic的消息不能被该consumer重新恢复时使用!!!
持久订阅:订阅之后,无论消息是否是在该consumer激活或者down掉期间发送的,最终都会被该consumer接收到,直到被显示取消持久订阅(session.unscribe(“topic名字”))!!!
那么持久订阅到底是如何实现的呢,笔者在这里将展现其中的奥秘:
先来看下TopicRegion的addConsumer方法
代码语言:javascript复制 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.isDurable()) {
//看该消息是否是持久化订阅
ActiveMQDestination destination = info.getDestination();
if (!destination.isPattern()) {
// Make sure the destination is created.
lookup(context, destination,true);
}
String clientId = context.getClientId();
String subscriptionName = info.getSubscriptionName();
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
DurableTopicSubscription sub = durableSubscriptions.get(key);
if (sub != null) {
if (sub.isActive()) {
throw new JMSException("Durable consumer is in use for client: " clientId " and subscriptionName: " subscriptionName);
}
// 看下该订阅者的消息筛选项是否变化
if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
// 如果变化了那么首先移除该订阅者对应的DurableTopicSubscription,然后再追加最新创建的DurableTopicSubscription
durableSubscriptions.remove(key);
destinationsLock.readLock().lock();
try {
for (Destination dest : destinations.values()) {
//Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
}
}
} finally {
destinationsLock.readLock().unlock();
}
super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info);
sub = durableSubscriptions.get(key);
} else {
// 如果消息筛选项没有变化,那么直接将刚恢复连接的订阅者id与之前的DurableTopicSubscription 关联起来
if (sub.getConsumerInfo().getConsumerId() != null) {
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
}
subscriptions.put(info.getConsumerId(), sub);
}
} else {
super.addConsumer(context, info);
sub = durableSubscriptions.get(key);
if (sub == null) {
throw new JMSException("Cannot use the same consumerId: " info.getConsumerId() " for two different durable subscriptions clientID: " key.getClientId()
" subscriberName: " key.getSubscriptionName());
}
}
sub.activate(usageManager, context, info, broker);
return sub;
} else {
return super.addConsumer(context, info);
}
}
上面代码是订阅者连接到消息提供者时的处理代码,下面看下更核心的持久订阅与消息提供者断开连接时的处理:
代码语言:javascript复制 @Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.isDurable()) {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key);
if (sub != null) {
sub.deactivate(keepDurableSubsActive);
}
} else {
super.removeConsumer(context, info);
}
}
从上面代码可以看到,针对持久订阅者来说,当其与消息提供者断开连接时,provider并没有将该连接移除,仅仅是将断开连接者对应的DurableTopicSubscription状态设置为非激活状态,改状态不影响provider将发送到该topic的消息保存下来,非持久订阅者则在与provider失去连接这段期间无法接收该时间段发送的消息!